udp服务停止问题调整

This commit is contained in:
walker 2024-01-24 13:25:00 +08:00
parent 6b16afc7b3
commit 9abb18e3b6
8 changed files with 48 additions and 29 deletions

@ -1 +1 @@
Subproject commit bc70ae80a0ed73ec417884710e02996103841bf9 Subproject commit 7b637f0f519e0c6f5fa6817546f70000ac22fd10

@ -1 +1 @@
Subproject commit fa7211d7bdf68be1df571f222a047ddd6cd764b0 Subproject commit 247003d00b1d1c37e467242ac9b3c35e012b0eb8

View File

@ -97,6 +97,7 @@ func (d *dynamics) Name() string {
// 解码列车信息并处理 // 解码列车信息并处理
func (d *dynamics) handleDynamicsTrainInfo(b []byte) { func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
d.udpDelayRecorder.RecordInterval() d.udpDelayRecorder.RecordInterval()
// slog.Debug("动力学列车信息近期消息间隔", "intervals", d.udpDelayRecorder.GetIntervals())
trainInfo := &message.DynamicsTrainInfo{} trainInfo := &message.DynamicsTrainInfo{}
err := trainInfo.Decode(b) err := trainInfo.Decode(b)
if err != nil { if err != nil {
@ -230,12 +231,16 @@ func (d *dynamics) Start(manager DynamicsMessageManager) error {
} }
d.manager = manager d.manager = manager
// 初始化客户端信息 // 初始化客户端信息
d.initDynamics() err := d.initDynamics()
if err != nil {
d.Stop()
return err
}
// 初始化运行资源 // 初始化运行资源
err := d.initDynamicsRunRepository() err = d.initDynamicsRunRepository()
if err != nil { if err != nil {
d.Stop() // 发送错误后将信息销毁 d.Stop() // 发送错误后将信息销毁
panic(err) return err
} }
ctx, cancle := context.WithCancel(context.Background()) ctx, cancle := context.WithCancel(context.Background())
go d.sendTurnoutStateTask(ctx) go d.sendTurnoutStateTask(ctx)
@ -246,13 +251,13 @@ func (d *dynamics) Start(manager DynamicsMessageManager) error {
} }
// 初始化客户端、服务等信息 // 初始化客户端、服务等信息
func (d *dynamics) initDynamics() { func (d *dynamics) initDynamics() error {
d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort)) d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort))
d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort)) d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort))
d.baseUrl = getUrlBase(d.runConfig) d.baseUrl = getUrlBase(d.runConfig)
d.httpClient = &http.Client{Timeout: time.Second * 5} d.httpClient = &http.Client{Timeout: time.Second * 5}
d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo) d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo)
d.trainInfoUdpServer.Listen() return d.trainInfoUdpServer.Listen()
} }
// 动力学运行所需数据 // 动力学运行所需数据
@ -268,23 +273,27 @@ func (d *dynamics) initDynamicsRunRepository() error {
func (d *dynamics) Stop() { func (d *dynamics) Stop() {
initMutex.Lock() initMutex.Lock()
defer initMutex.Unlock() defer initMutex.Unlock()
slog.Debug("动力学服务停止")
// 停止网络监听 // 停止网络监听
d.udpDelayRecorder.Stop() d.udpDelayRecorder.Stop()
if d.turnoutTaskCancel != nil {
d.turnoutTaskCancel()
}
if d.httpClient != nil { if d.httpClient != nil {
d.requestStopSimulation() d.requestStopSimulation()
d.httpClient = nil d.httpClient = nil
} }
if d.turnoutStateUdpClient != nil { if d.turnoutStateUdpClient != nil {
d.turnoutStateUdpClient.Close() d.turnoutStateUdpClient.Close()
d.turnoutStateUdpClient = nil
} }
if d.trainControlUdpClient != nil { if d.trainControlUdpClient != nil {
d.trainControlUdpClient.Close() d.trainControlUdpClient.Close()
d.trainControlUdpClient = nil
} }
if d.trainInfoUdpServer != nil { if d.trainInfoUdpServer != nil {
d.trainInfoUdpServer.Close() d.trainInfoUdpServer.Close()
} d.trainInfoUdpServer = nil
if d.turnoutTaskCancel != nil {
d.turnoutTaskCancel()
} }
d.manager = nil d.manager = nil
d.updateState(tpapi.ThirdPartyState_Closed) d.updateState(tpapi.ThirdPartyState_Closed)

View File

@ -2,7 +2,6 @@ package semi_physical_train
import ( import (
"fmt" "fmt"
"log/slog"
"sync" "sync"
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
@ -55,7 +54,7 @@ func (s *semiPhysicalTrainImpl) Name() string {
func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) {
s.udpDelayRecorder.RecordInterval() s.udpDelayRecorder.RecordInterval()
slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals())) // slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals()))
handler := s.manager handler := s.manager
if handler != nil { if handler != nil {
handler.HandleSemiPhysicalTrainControlMsg(b) handler.HandleSemiPhysicalTrainControlMsg(b)

View File

@ -28,8 +28,9 @@ func convertServiceName(name string) state_proto.SimulationThirdPartyApiService_
return state_proto.SimulationThirdPartyApiService_Dynamics return state_proto.SimulationThirdPartyApiService_Dynamics
case semi_physical_train.Name: case semi_physical_train.Name:
return state_proto.SimulationThirdPartyApiService_SemiPhysicalTrain return state_proto.SimulationThirdPartyApiService_SemiPhysicalTrain
default:
return state_proto.SimulationThirdPartyApiService_Undefined
} }
return state_proto.SimulationThirdPartyApiService_Undefined
} }
func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService { func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService {
@ -38,6 +39,7 @@ func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService {
t := convertServiceName(tpas.Name()) t := convertServiceName(tpas.Name())
if t == state_proto.SimulationThirdPartyApiService_Undefined { if t == state_proto.SimulationThirdPartyApiService_Undefined {
slog.Error("未知的第三方接口服务类型", "name", tpas.Name()) slog.Error("未知的第三方接口服务类型", "name", tpas.Name())
continue
} }
switch tpas.State() { switch tpas.State() {
case tpapi.ThirdPartyState_Normal: case tpapi.ThirdPartyState_Normal:

View File

@ -6,8 +6,8 @@ import (
) )
type UdpClient interface { type UdpClient interface {
SendMsg(msg UdpMessageEncoder) SendMsg(msg UdpMessageEncoder) error
Send(b []byte) Send(b []byte) error
Close() Close()
} }
@ -62,20 +62,24 @@ func NewClientWithLocalAddr(remoteAddr string, localAddr string) UdpClient {
return c return c
} }
func (c *client) SendMsg(msg UdpMessageEncoder) { func (c *client) SendMsg(msg UdpMessageEncoder) error {
b := msg.Encode() b := msg.Encode()
_, err := c.conn.Write(b) _, err := c.conn.Write(b)
if err != nil { if err != nil {
slog.Error("udp client send error", "error", err) slog.Error("udp client send error", "error", err)
return err
} }
return nil
// slog.Debug("udp client send", "size", n) // slog.Debug("udp client send", "size", n)
} }
func (c *client) Send(b []byte) { func (c *client) Send(b []byte) error {
_, err := c.conn.Write(b) _, err := c.conn.Write(b)
if err != nil { if err != nil {
slog.Error("udp client send error", "error", err) slog.Error("udp client send error", "error", err)
return err
} }
return nil
// slog.Debug("udp client send", "size", n) // slog.Debug("udp client send", "size", n)
} }

View File

@ -8,7 +8,7 @@ import (
) )
type UdpServer interface { type UdpServer interface {
Listen() Listen() error
Close() Close()
} }
@ -19,44 +19,49 @@ type server struct {
conn *net.UDPConn conn *net.UDPConn
handler UdpMsgHandler handler UdpMsgHandler
cancelFn context.CancelFunc cancelFn context.CancelFunc
done chan struct{} // 服务协程退出信号
} }
// NewServer creates a new instance of UdpServer. // NewServer creates a new instance of UdpServer.
func NewServer(addr string, handler UdpMsgHandler) UdpServer { func NewServer(addr string, handler UdpMsgHandler) UdpServer {
return &server{addr: addr, handler: handler} return &server{addr: addr, handler: handler, done: make(chan struct{})}
} }
func (s *server) Listen() { func (s *server) Listen() error {
udpAddr, err := net.ResolveUDPAddr("udp", s.addr) udpAddr, err := net.ResolveUDPAddr("udp", s.addr)
if err != nil { if err != nil {
panic(err) return err
} }
conn, err := net.ListenUDP("udp", udpAddr) conn, err := net.ListenUDP("udp", udpAddr)
if err != nil { if err != nil {
panic(err) return err
} }
s.conn = conn s.conn = conn
ctx, cfn := context.WithCancel(context.Background()) ctx, cfn := context.WithCancel(context.Background())
// 启动监听处理 // 启动监听处理
go s.listenAndHandle(ctx) go s.listenAndHandle(ctx)
s.cancelFn = cfn s.cancelFn = cfn
return nil
} }
func (s *server) Close() { func (s *server) Close() {
err := s.conn.Close()
if err != nil {
slog.Error("udp server close error", "error", err)
}
s.cancelFn() s.cancelFn()
// err := s.conn.Close() <-s.done
// if err != nil {
// slog.Error("udp server close error", "error", err)
// }
} }
func (s *server) listenAndHandle(ctx context.Context) { func (s *server) listenAndHandle(ctx context.Context) {
defer close(s.done)
defer s.conn.Close() defer s.conn.Close()
mainLoop:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
slog.Info("udp server listen 关闭", "addr", s.addr) // slog.Info("udp server listen 关闭", "addr", s.addr)
return break mainLoop
default: default:
} }
buf := make([]byte, 1024) buf := make([]byte, 1024)

View File

@ -84,7 +84,7 @@ func DestroySimulation(simulationId string) {
// 停止ecs world // 停止ecs world
simulationInfo.World.Close() simulationInfo.World.Close()
message_server.Close(simulationInfo) message_server.Close(simulationInfo)
// 发布销毁消息 // 确保发布销毁消息
message_server.PubSimulationDestroyMsg(simulationId) message_server.PubSimulationDestroyMsg(simulationId)
// 停止第三方 // 停止第三方
stopThirdParty(simulationInfo) stopThirdParty(simulationInfo)