【增加消息topic注释】

This commit is contained in:
weizhihong 2023-12-26 13:27:09 +08:00
parent 96cf5e844b
commit bafff0315d
8 changed files with 29 additions and 21 deletions

View File

@ -1,6 +1,7 @@
package message_server
import (
"fmt"
"time"
"joylink.club/bj-rtsts-server/message_server/ms_api"
@ -16,7 +17,7 @@ import (
// 综合后备盘IBP消息服务
func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
return ms_api.NewScheduleTask("综合后备盘IBP", func() error {
return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]综合后备盘IBP", mapId), func() error {
for _, station := range mapData.Stations {
sid := memory.GetMapElementId(station.Common)
stationIbpState, err := collectStationIbpState(mapId, vs.World, station)

View File

@ -16,7 +16,7 @@ import (
func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
return ms_api.NewScheduleTask("综合后备盘IBP", func() error {
return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]综合门控箱按钮状态", mapId), func() error {
for _, box := range mapData.GateBoxs {
did := memory.GetMapElementId(box.Common)
state, err := collectGateBoxPSLState(vs.World, mapId, box)

View File

@ -1,6 +1,7 @@
package message_server
import (
"fmt"
"time"
"joylink.club/bj-rtsts-server/message_server/ms_api"
@ -14,7 +15,7 @@ import (
)
func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
return ms_api.NewScheduleTask("继电器状态", func() error {
return ms_api.NewScheduleTask(fmt.Sprintf("地图[%d]继电器状态", mapId), func() error {
relayStates, err := collectRelayState(vs.World, mapId)
if err != nil {
return err

View File

@ -18,7 +18,7 @@ import (
// 信号平面布置图消息服务
func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
return ms_api.NewScheduleTask("信号平面布置图", func() error {
return ms_api.NewScheduleTask(fmt.Sprintf("信号平面布置图[%d]状态", mapId), func() error {
turnoutStates, err := collectTurnoutStates(vs, mapId)
if err != nil {
return err

View File

@ -13,14 +13,14 @@ func Start(vs *memory.VerifySimulation) {
switch t {
case graphicData.PictureType_StationLayout: // 平面布置图
// 添加车站关联的平面布置图、IBP、PSL信息
mqtt.GetMsgClient().PublishTask(NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
case graphicData.PictureType_RelayCabinetLayout: // 继电器柜
mqtt.GetMsgClient().PublishTask(NewRccMs(vs, mapId))
mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewRccMs(vs, mapId))
}
}
}
// 关闭仿真消息服务
func Close(vs *memory.VerifySimulation) {
mqtt.GetMsgClient().CloseTask()
mqtt.GetMsgClient().CloseTask(vs.SimulationId)
}

View File

@ -1,6 +1,8 @@
package message_server
import (
"fmt"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/state"
@ -9,7 +11,7 @@ import (
)
func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask {
return ms_api.NewMonitorTask("仿真状态", func() {
return ms_api.NewMonitorTask(fmt.Sprintf("仿真[%s]状态", vs.SimulationId), func() {
ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) {
s := &state.SimulationStatus{SimulationId: vs.SimulationId}
switch e.NewState {

View File

@ -19,7 +19,7 @@ var mqttClient *MqttClient
type MqttClient struct {
cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager
tasks []ms_api.MsgTask
tasks map[string][]ms_api.MsgTask
}
// 初始化并启动MQTT客户端服务
@ -35,7 +35,7 @@ func Startup(cmc *MqttOptions) error {
if err != nil {
return err
}
mqttClient = &MqttClient{cc: cc, cm: cm}
mqttClient = &MqttClient{cc: cc, cm: cm, tasks: make(map[string][]ms_api.MsgTask)}
return nil
}
@ -90,16 +90,20 @@ func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) erro
}
// 发布任务
func (client *MqttClient) PublishTask(tasks ...ms_api.MsgTask) {
client.tasks = append(client.tasks, tasks...)
func (client *MqttClient) PublishTask(simulationId string, tasks ...ms_api.MsgTask) {
client.tasks[simulationId] = append(client.tasks[simulationId], tasks...)
}
// 停止任务
func (client *MqttClient) CloseTask() {
for _, task := range client.tasks {
func (client *MqttClient) CloseTask(simulationId string) {
tasks, ok := client.tasks[simulationId]
if !ok {
return
}
for _, task := range tasks {
task.Stop()
}
client.tasks = nil
client.tasks[simulationId] = nil
}
// 发送仿真状态数据

View File

@ -8,12 +8,12 @@ import (
)
const (
topicPrefix = "/" + config.SystemName + "/simulation/%s/"
stateTopic = topicPrefix + "state"
sfpTopic = topicPrefix + "sfp/%d"
rccTopic = topicPrefix + "rcc/%d"
pslTopic = topicPrefix + "psl/%d/%d"
ibpTopic = topicPrefix + "ibp/%d/%d"
topicPrefix = "/" + config.SystemName + "/simulation/%s/" // 公共部分 仿真ID
stateTopic = topicPrefix + "state" // 仿真状态topic
sfpTopic = topicPrefix + "sfp/%d" // 平面布置图设备状态topic 地图ID
rccTopic = topicPrefix + "rcc/%d" // 继电器柜继电器状态topic 地图ID
pslTopic = topicPrefix + "psl/%d/%d" // psl状态topic 地图ID/门控箱ID
ibpTopic = topicPrefix + "ibp/%d/%d" // ibp盘状态topic 地图ID/车站ID
)
var topicMap = map[string]string{