diff --git a/message_server/ibp_ms.go b/message_server/ibp_ms.go index 22f36ed..b0f45b5 100644 --- a/message_server/ibp_ms.go +++ b/message_server/ibp_ms.go @@ -1,6 +1,7 @@ package message_server import ( + "fmt" "time" "joylink.club/bj-rtsts-server/message_server/ms_api" @@ -16,7 +17,7 @@ import ( // 综合后备盘IBP消息服务 func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) - return ms_api.NewScheduleTask("综合后备盘IBP", func() error { + return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]综合后备盘IBP", mapId), func() error { for _, station := range mapData.Stations { sid := memory.GetMapElementId(station.Common) stationIbpState, err := collectStationIbpState(mapId, vs.World, station) diff --git a/message_server/psl_ms.go b/message_server/psl_ms.go index 116cc2c..0bd7069 100644 --- a/message_server/psl_ms.go +++ b/message_server/psl_ms.go @@ -16,7 +16,7 @@ import ( func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) - return ms_api.NewScheduleTask("综合后备盘IBP", func() error { + return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]综合门控箱按钮状态", mapId), func() error { for _, box := range mapData.GateBoxs { did := memory.GetMapElementId(box.Common) state, err := collectGateBoxPSLState(vs.World, mapId, box) diff --git a/message_server/rcc_ms.go b/message_server/rcc_ms.go index 95494d3..d82dfbe 100644 --- a/message_server/rcc_ms.go +++ b/message_server/rcc_ms.go @@ -1,6 +1,7 @@ package message_server import ( + "fmt" "time" "joylink.club/bj-rtsts-server/message_server/ms_api" @@ -14,7 +15,7 @@ import ( ) func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { - return ms_api.NewScheduleTask("继电器状态", func() error { + return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]继电器状态", mapId), func() error { relayStates, err := collectRelayState(vs.World, mapId) if err != nil { return err diff --git a/message_server/sfp_ms.go b/message_server/sfp_ms.go index 64d9482..95c3a55 100644 --- a/message_server/sfp_ms.go +++ b/message_server/sfp_ms.go @@ -18,7 +18,7 @@ import ( // 信号平面布置图消息服务 func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { - return ms_api.NewScheduleTask("信号平面布置图", func() error { + return ms_api.NewScheduleTask(fmt.Sprintf("信号平面布置图[%d]状态", mapId), func() error { turnoutStates, err := collectTurnoutStates(vs, mapId) if err != nil { return err diff --git a/message_server/simulation.go b/message_server/simulation.go index f258ef4..690cc0c 100644 --- a/message_server/simulation.go +++ b/message_server/simulation.go @@ -13,14 +13,14 @@ func Start(vs *memory.VerifySimulation) { switch t { case graphicData.PictureType_StationLayout: // 平面布置图 // 添加车站关联的平面布置图、IBP、PSL信息 - mqtt.GetMsgClient().PublishTask(NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) + mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) case graphicData.PictureType_RelayCabinetLayout: // 继电器柜 - mqtt.GetMsgClient().PublishTask(NewRccMs(vs, mapId)) + mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewRccMs(vs, mapId)) } } } // 关闭仿真消息服务 func Close(vs *memory.VerifySimulation) { - mqtt.GetMsgClient().CloseTask() + mqtt.GetMsgClient().CloseTask(vs.SimulationId) } diff --git a/message_server/simulation_state_ms.go b/message_server/simulation_state_ms.go index d897576..ecd855d 100644 --- a/message_server/simulation_state_ms.go +++ b/message_server/simulation_state_ms.go @@ -1,6 +1,8 @@ package message_server import ( + "fmt" + "joylink.club/bj-rtsts-server/message_server/ms_api" "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/state" @@ -9,7 +11,7 @@ import ( ) func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask { - return ms_api.NewMonitorTask("仿真状态", func() { + return ms_api.NewMonitorTask(fmt.Sprintf("仿真[%s]状态", vs.SimulationId), func() { ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) { s := &state.SimulationStatus{SimulationId: vs.SimulationId} switch e.NewState { diff --git a/mqtt/client.go b/mqtt/client.go index 9137443..59bc704 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -19,7 +19,7 @@ var mqttClient *MqttClient type MqttClient struct { cc *autopaho.ClientConfig cm *autopaho.ConnectionManager - tasks []ms_api.MsgTask + tasks map[string][]ms_api.MsgTask } // 初始化并启动MQTT客户端服务 @@ -35,7 +35,7 @@ func Startup(cmc *MqttOptions) error { if err != nil { return err } - mqttClient = &MqttClient{cc: cc, cm: cm} + mqttClient = &MqttClient{cc: cc, cm: cm, tasks: make(map[string][]ms_api.MsgTask)} return nil } @@ -90,16 +90,20 @@ func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) erro } // 发布任务 -func (client *MqttClient) PublishTask(tasks ...ms_api.MsgTask) { - client.tasks = append(client.tasks, tasks...) +func (client *MqttClient) PublishTask(simulationId string, tasks ...ms_api.MsgTask) { + client.tasks[simulationId] = append(client.tasks[simulationId], tasks...) } // 停止任务 -func (client *MqttClient) CloseTask() { - for _, task := range client.tasks { +func (client *MqttClient) CloseTask(simulationId string) { + tasks, ok := client.tasks[simulationId] + if !ok { + return + } + for _, task := range tasks { task.Stop() } - client.tasks = nil + client.tasks[simulationId] = nil } // 发送仿真状态数据 diff --git a/mqtt/topic.go b/mqtt/topic.go index 7b1eef9..b684d78 100644 --- a/mqtt/topic.go +++ b/mqtt/topic.go @@ -8,12 +8,12 @@ import ( ) const ( - topicPrefix = "/" + config.SystemName + "/simulation/%s/" - stateTopic = topicPrefix + "state" - sfpTopic = topicPrefix + "sfp/%d" - rccTopic = topicPrefix + "rcc/%d" - pslTopic = topicPrefix + "psl/%d/%d" - ibpTopic = topicPrefix + "ibp/%d/%d" + topicPrefix = "/" + config.SystemName + "/simulation/%s/" // 公共部分 仿真ID + stateTopic = topicPrefix + "state" // 仿真状态topic + sfpTopic = topicPrefix + "sfp/%d" // 平面布置图设备状态topic 地图ID + rccTopic = topicPrefix + "rcc/%d" // 继电器柜继电器状态topic 地图ID + pslTopic = topicPrefix + "psl/%d/%d" // psl状态topic 地图ID/门控箱ID + ibpTopic = topicPrefix + "ibp/%d/%d" // ibp盘状态topic 地图ID/车站ID ) var topicMap = map[string]string{