From 7e9c9c969d96e460e2c08a10c230104ea7e8dc7f Mon Sep 17 00:00:00 2001 From: weizhihong Date: Mon, 25 Dec 2023 14:15:22 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BB=BF=E7=9C=9F=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- message_server/ibp_ms.go | 2 +- message_server/psl_ms.go | 2 +- message_server/rcc_ms.go | 2 +- message_server/sfp_ms.go | 2 +- message_server/simulation.go | 49 +++++--------------- message_server/simulation_state_ms.go | 19 +++----- mqtt/client.go | 64 ++++++++++++++------------- mqtt/config.go | 8 ++++ 8 files changed, 62 insertions(+), 86 deletions(-) diff --git a/message_server/ibp_ms.go b/message_server/ibp_ms.go index d36f1c5..22f36ed 100644 --- a/message_server/ibp_ms.go +++ b/message_server/ibp_ms.go @@ -23,7 +23,7 @@ func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { if err != nil { return err } - mqtt.PubIBPState(vs.SimulationId, mapId, sid, stationIbpState) + mqtt.GetMsgClient().PubIBPState(vs.SimulationId, mapId, sid, stationIbpState) } return nil }, 200*time.Millisecond) diff --git a/message_server/psl_ms.go b/message_server/psl_ms.go index bcd60d9..116cc2c 100644 --- a/message_server/psl_ms.go +++ b/message_server/psl_ms.go @@ -23,7 +23,7 @@ func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { if err != nil { return err } - mqtt.PubPSLState(vs.SimulationId, mapId, did, state) + mqtt.GetMsgClient().PubPSLState(vs.SimulationId, mapId, did, state) } return nil }, 200*time.Millisecond) diff --git a/message_server/rcc_ms.go b/message_server/rcc_ms.go index 42a70ad..95494d3 100644 --- a/message_server/rcc_ms.go +++ b/message_server/rcc_ms.go @@ -25,7 +25,7 @@ func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { ReplyState: relayStates, }, } - mqtt.PubRCCState(vs.SimulationId, mapId, ststes) + mqtt.GetMsgClient().PubRCCState(vs.SimulationId, mapId, ststes) return nil }, 200*time.Millisecond) } diff --git a/message_server/sfp_ms.go b/message_server/sfp_ms.go index 17086de..64d9482 100644 --- a/message_server/sfp_ms.go +++ b/message_server/sfp_ms.go @@ -59,7 +59,7 @@ func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { PlatformState: platformStates, }, } - mqtt.PubSfpState(vs.SimulationId, mapId, ststes) + mqtt.GetMsgClient().PubSfpState(vs.SimulationId, mapId, ststes) return nil }, 200*time.Millisecond) } diff --git a/message_server/simulation.go b/message_server/simulation.go index d069459..f258ef4 100644 --- a/message_server/simulation.go +++ b/message_server/simulation.go @@ -1,55 +1,26 @@ package message_server import ( - "sync" - - "joylink.club/bj-rtsts-server/message_server/ms_api" + "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" ) -var smsMap sync.Map - -// 仿真消息服务 -// 管理仿真消息服务,整体可以作为一个消息服务,也可以每个消息子服务各自作为一个消息服务,暂时先按整体作为一个消息服务的方式使用 -type SimulationMs struct { - vs *memory.VerifySimulation - tasks []ms_api.MsgTask -} - // 启动仿真所需的消息服务 func Start(vs *memory.VerifySimulation) { - _, ok := smsMap.Load(vs.SimulationId) - if !ok { - sms := &SimulationMs{ - vs: vs, - tasks: []ms_api.MsgTask{}, + for _, mapId := range vs.MapIds { + t := memory.QueryGiType(mapId) + switch t { + case graphicData.PictureType_StationLayout: // 平面布置图 + // 添加车站关联的平面布置图、IBP、PSL信息 + mqtt.GetMsgClient().PublishTask(NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) + case graphicData.PictureType_RelayCabinetLayout: // 继电器柜 + mqtt.GetMsgClient().PublishTask(NewRccMs(vs, mapId)) } - for _, mapId := range vs.MapIds { - t := memory.QueryGiType(mapId) - switch t { - case graphicData.PictureType_StationLayout: // 平面布置图 - // 添加车站关联的平面布置图、IBP、PSL信息 - sms.tasks = append(sms.tasks, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) - case graphicData.PictureType_RelayCabinetLayout: // 继电器柜 - sms.tasks = append(sms.tasks, NewRccMs(vs, mapId)) - } - } - // 启动仿真状态服务 - sms.tasks = append(sms.tasks, NewStateMs(vs)) - smsMap.Store(vs.SimulationId, sms) } } // 关闭仿真消息服务 func Close(vs *memory.VerifySimulation) { - sms, ok := smsMap.Load(vs.SimulationId) - if ok { - ms := sms.(*SimulationMs) - for _, task := range ms.tasks { - task.Stop() - } - ms.tasks = nil - smsMap.Delete(vs.SimulationId) - } + mqtt.GetMsgClient().CloseTask() } diff --git a/message_server/simulation_state_ms.go b/message_server/simulation_state_ms.go index 54f416e..18fb726 100644 --- a/message_server/simulation_state_ms.go +++ b/message_server/simulation_state_ms.go @@ -11,23 +11,18 @@ import ( func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask { return ms_api.NewMonitorTask("仿真状态", func() { ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) { + s := &state.SimulationStatus{SimulationId: vs.SimulationId} switch e.NewState { case ecs.WorldClose: - mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_DESTROY, - }) + s.State = state.SimulationStatus_DESTROY case ecs.WorldError: - mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_ERROR, - }) + s.State = state.SimulationStatus_ERROR case ecs.WorldPause: - mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_PAUSE, - }) + s.State = state.SimulationStatus_PAUSE + default: + return } + mqtt.GetMsgClient().PubSimulationState(vs.SimulationId, s) }) }) } diff --git a/mqtt/client.go b/mqtt/client.go index 21282c6..9137443 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -3,32 +3,27 @@ package mqtt import ( "context" "fmt" - "math/rand" - "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" - "github.com/google/uuid" "github.com/sagikazarmark/slog-shim" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" + "joylink.club/bj-rtsts-server/message_server/ms_api" "joylink.club/bj-rtsts-server/ts/protos/state" ) var mqttClient *MqttClient -var clientId string // 客户端 type MqttClient struct { - cmc *MqttOptions - cc *autopaho.ClientConfig - cm *autopaho.ConnectionManager + cc *autopaho.ClientConfig + cm *autopaho.ConnectionManager + tasks []ms_api.MsgTask } // 初始化并启动MQTT客户端服务 func Startup(cmc *MqttOptions) error { - initClientId() - cmc.ClientId = clientId if err := checkConfig(cmc); err != nil { return err } @@ -40,7 +35,7 @@ func Startup(cmc *MqttOptions) error { if err != nil { return err } - mqttClient = &MqttClient{cmc: cmc, cc: cc, cm: cm} + mqttClient = &MqttClient{cc: cc, cm: cm} return nil } @@ -64,23 +59,17 @@ func checkConfig(cmc *MqttOptions) error { return nil } -// 初始化MQTT客户端id -func initClientId() { - if clientId == "" { - us := uuid.New().String() - usl := len(us) - sufix5 := us[usl-5 : usl] - clientId = fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000) - } +func GetMsgClient() *MqttClient { + return mqttClient } // 获取MQTT客户端id func GetClientId() string { - return clientId + return mqttClient.cc.ClientConfig.ClientID } // 发布数据 -func pub(topic string, data protoreflect.ProtoMessage) error { +func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) error { if data == nil { return fmt.Errorf("发布数据引用为nil") } @@ -92,7 +81,7 @@ func pub(topic string, data protoreflect.ProtoMessage) error { slog.Error("未知发布主题", "topic", topic, "data", data) return fmt.Errorf("未知发布主题: topic=%s", topic) } - _, err = mqttClient.cm.Publish(context.Background(), &paho.Publish{ + _, err = client.cm.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: 0, Payload: b, @@ -100,27 +89,40 @@ func pub(topic string, data protoreflect.ProtoMessage) error { return err } +// 发布任务 +func (client *MqttClient) PublishTask(tasks ...ms_api.MsgTask) { + client.tasks = append(client.tasks, tasks...) +} + +// 停止任务 +func (client *MqttClient) CloseTask() { + for _, task := range client.tasks { + task.Stop() + } + client.tasks = nil +} + // 发送仿真状态数据 -func PubSimulationState(simulationId string, msg *state.SimulationStatus) error { - return pub(GetStateTopic(simulationId), msg) +func (client *MqttClient) PubSimulationState(simulationId string, msg *state.SimulationStatus) error { + return client.pub(GetStateTopic(simulationId), msg) } // 发送IBP状态数据 -func PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error { - return pub(GetIbpTopic(simulationId, mapId, stationId), msg) +func (client *MqttClient) PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error { + return client.pub(GetIbpTopic(simulationId, mapId, stationId), msg) } // 发送PSL状态数据 -func PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error { - return pub(GetPslTopic(simulationId, mapId, boxId), msg) +func (client *MqttClient) PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error { + return client.pub(GetPslTopic(simulationId, mapId, boxId), msg) } // 发送继电器状态数据 -func PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { - return pub(GetRccTopic(simulationId, mapId), msg) +func (client *MqttClient) PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { + return client.pub(GetRccTopic(simulationId, mapId), msg) } // 发送站场图状态数据 -func PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { - return pub(GetSfpTopic(simulationId, mapId), msg) +func (client *MqttClient) PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { + return client.pub(GetSfpTopic(simulationId, mapId), msg) } diff --git a/mqtt/config.go b/mqtt/config.go index 4d98f41..fe795e1 100644 --- a/mqtt/config.go +++ b/mqtt/config.go @@ -3,11 +3,13 @@ package mqtt import ( "fmt" "log/slog" + "math/rand" "net/url" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" + "github.com/google/uuid" "joylink.club/bj-rtsts-server/config" ) @@ -28,6 +30,12 @@ func NewMqttOptions(address, username, password string) *MqttOptions { BrokerUrl: address, Username: username, Password: password, + ClientId: (func() string { // 初始化MQTT客户端id + us := uuid.New().String() + usl := len(us) + sufix5 := us[usl-5 : usl] + return fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000) + })(), } }