From 1ff7a8f28ce10d47c2c2144544f284397a33c779 Mon Sep 17 00:00:00 2001 From: weizhihong Date: Wed, 20 Dec 2023 10:37:54 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9MQTT=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=8F=91=E9=80=81=E9=80=BB=E8=BE=91=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- message_server/ibp_ms.go | 86 ++++--------- message_server/ms_api/api.go | 100 +++++++++++---- message_server/ms_manage/manage.go | 74 ----------- message_server/psl_ms.go | 68 +++------- message_server/rcc_ms.go | 71 +++-------- message_server/sfp_ms.go | 171 +++++++++++--------------- message_server/simulation.go | 68 ++-------- message_server/simulation_state_ms.go | 65 ++++------ mqtt/client.go | 154 ++++++++++++++--------- mqtt/config.go | 72 +++++++++++ mqtt/topic.go | 75 +++++++++++ tmp/proto_test.go | 86 +++++++++++++ 12 files changed, 574 insertions(+), 516 deletions(-) delete mode 100644 message_server/ms_manage/manage.go create mode 100644 mqtt/config.go create mode 100644 mqtt/topic.go create mode 100644 tmp/proto_test.go diff --git a/message_server/ibp_ms.go b/message_server/ibp_ms.go index b6632a3..d36f1c5 100644 --- a/message_server/ibp_ms.go +++ b/message_server/ibp_ms.go @@ -1,11 +1,10 @@ package message_server import ( - "fmt" "time" - "google.golang.org/protobuf/proto" "joylink.club/bj-rtsts-server/message_server/ms_api" + "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/protos/state" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" @@ -15,68 +14,42 @@ import ( ) // 综合后备盘IBP消息服务 -type IbpMs struct { - vs *memory.VerifySimulation - mapId int32 -} - -func NewIBPMs(vs *memory.VerifySimulation, mapId int32) *IbpMs { - return &IbpMs{vs: vs, mapId: mapId} -} - -func (ms *IbpMs) GetChannel() string { - return SimulationIbpTopic - // return "simulation-ibp-%s_%d_%s-status" -} - -func (ms *IbpMs) GetInterval() time.Duration { - return 200 * time.Millisecond -} - -func (ms *IbpMs) OnTick() ([]*ms_api.TopicMsg, error) { - var msgArr []*ms_api.TopicMsg - mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId) - for _, station := range mapData.Stations { - sid := memory.GetMapElementId(station.Common) - channel := ms.handlerIBPChannelName(sid) - stationIbpState, err := ms.collectStationIbpState(station) - if err != nil { - return nil, err +func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { + mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) + return ms_api.NewScheduleTask("综合后备盘IBP", func() error { + for _, station := range mapData.Stations { + sid := memory.GetMapElementId(station.Common) + stationIbpState, err := collectStationIbpState(mapId, vs.World, station) + if err != nil { + return err + } + mqtt.PubIBPState(vs.SimulationId, mapId, sid, stationIbpState) } - b, err := proto.Marshal(stationIbpState) - if err != nil { - return nil, fmt.Errorf("IBP设备状态消息服务数据序列化失败: %s", err) - } - msgArr = append(msgArr, ms_api.NewTopicMsg(channel, b)) - } - return msgArr, nil + return nil + }, 200*time.Millisecond) } -// 当发生错误时执行的逻辑 -func (ms *IbpMs) OnError(err error) { -} - -func (ms *IbpMs) collectStationIbpState(station *graphicData.Station) (*state.PushedDevicesStatus, error) { +func collectStationIbpState(mapId int32, world ecs.World, station *graphicData.Station) (*state.PushedDevicesStatus, error) { if station.RefIbpMapCode == "" { return nil, nil } sid := memory.GetMapElementId(station.Common) - stationUid := memory.QueryUidByMidAndComId(ms.mapId, sid, &graphicData.Station{}) + stationUid := memory.QueryUidByMidAndComId(mapId, sid, &graphicData.Station{}) ibpMapId, ibpStorage := memory.QueryGiDataByName[*graphicData.IBPGraphicStorage](station.RefIbpMapCode) ibpUidsMap := memory.QueryUidStructure[*memory.IBPUidStructure](ibpMapId) - buttonStates, err := ms.collectIBPButtonState(stationUid, ibpUidsMap, ibpStorage.IbpButtons) + buttonStates, err := collectIBPButtonState(world, stationUid, ibpUidsMap, ibpStorage.IbpButtons) if err != nil { return nil, err } - alarmStates, err := ms.collectIBPAlarmState(stationUid, ibpUidsMap, ibpStorage.IbpAlarms) + alarmStates, err := collectIBPAlarmState(world, stationUid, ibpUidsMap, ibpStorage.IbpAlarms) if err != nil { return nil, err } - lightStates, err := ms.collectIBPLightState(stationUid, ibpUidsMap, ibpStorage.IbpLights) + lightStates, err := collectIBPLightState(world, stationUid, ibpUidsMap, ibpStorage.IbpLights) if err != nil { return nil, err } - keyStates, err := ms.collectIBPKeyState(stationUid, ibpUidsMap, ibpStorage.IbpKeys) + keyStates, err := collectIBPKeyState(world, stationUid, ibpUidsMap, ibpStorage.IbpKeys) if err != nil { return nil, err } @@ -92,12 +65,12 @@ func (ms *IbpMs) collectStationIbpState(station *graphicData.Station) (*state.Pu } // 收集IBP按钮状态 -func (ms *IbpMs) collectIBPButtonState(stationUid string, uidsMap *memory.IBPUidStructure, ibpButtons []*graphicData.IBPButton) ([]*state.ButtonState, error) { +func collectIBPButtonState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpButtons []*graphicData.IBPButton) ([]*state.ButtonState, error) { var states []*state.ButtonState for _, data := range ibpButtons { // 按钮 did := memory.GetMapElementId(data.Common) uid := stationUid + "_" + uidsMap.IbpButtonIds[did].Uid - entry, ok := entity.GetEntityByUid(ms.vs.World, uid) + entry, ok := entity.GetEntityByUid(world, uid) if !ok { continue } @@ -122,12 +95,12 @@ func getIBPButtonState(id uint32, entry *ecs.Entry) *state.ButtonState { } // 收集报警器状态 -func (ms *IbpMs) collectIBPAlarmState(stationUid string, uidsMap *memory.IBPUidStructure, ibpAlarms []*graphicData.IbpAlarm) ([]*state.AlarmState, error) { +func collectIBPAlarmState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpAlarms []*graphicData.IbpAlarm) ([]*state.AlarmState, error) { var states []*state.AlarmState for _, data := range ibpAlarms { // 报警器 did := memory.GetMapElementId(data.Common) uid := stationUid + "_" + uidsMap.IbpAlarmIds[did].Uid - entry, ok := entity.GetEntityByUid(ms.vs.World, uid) + entry, ok := entity.GetEntityByUid(world, uid) if !ok { continue } @@ -139,12 +112,12 @@ func (ms *IbpMs) collectIBPAlarmState(stationUid string, uidsMap *memory.IBPUidS } // 收集灯状态信息 -func (ms *IbpMs) collectIBPLightState(stationUid string, uidsMap *memory.IBPUidStructure, ibpLights []*graphicData.IbpLight) ([]*state.LightState, error) { +func collectIBPLightState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpLights []*graphicData.IbpLight) ([]*state.LightState, error) { var states []*state.LightState for _, data := range ibpLights { // 指示灯 did := memory.GetMapElementId(data.Common) uid := stationUid + "_" + uidsMap.IbpLightIds[did].Uid - entry, ok := entity.GetEntityByUid(ms.vs.World, uid) + entry, ok := entity.GetEntityByUid(world, uid) if !ok { continue } @@ -156,12 +129,12 @@ func (ms *IbpMs) collectIBPLightState(stationUid string, uidsMap *memory.IBPUidS } // 收集钥匙状态 -func (ms *IbpMs) collectIBPKeyState(stationUid string, uidsMap *memory.IBPUidStructure, ibpKeys []*graphicData.IbpKey) ([]*state.KeyState, error) { +func collectIBPKeyState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpKeys []*graphicData.IbpKey) ([]*state.KeyState, error) { var states []*state.KeyState for _, data := range ibpKeys { // 钥匙 did := memory.GetMapElementId(data.Common) uid := stationUid + "_" + uidsMap.IbpKeyIds[did].Uid - entry, ok := entity.GetEntityByUid(ms.vs.World, uid) + entry, ok := entity.GetEntityByUid(world, uid) if !ok { continue } @@ -171,8 +144,3 @@ func (ms *IbpMs) collectIBPKeyState(stationUid string, uidsMap *memory.IBPUidStr } return states, nil } - -// 处理订阅通道名称 -func (ms *IbpMs) handlerIBPChannelName(stationId uint32) string { - return fmt.Sprintf(SimulationIbpTopic, ms.vs.SimulationId, ms.mapId, stationId) -} diff --git a/message_server/ms_api/api.go b/message_server/ms_api/api.go index 88f3e4e..d274ba7 100644 --- a/message_server/ms_api/api.go +++ b/message_server/ms_api/api.go @@ -1,34 +1,86 @@ package ms_api -import "time" +import ( + "context" + "fmt" + "time" -// 消息服务 -type IMsgServer interface { - // 获取消息服务名 - GetChannel() string + "joylink.club/bj-rtsts-server/sys_error" +) - // 发送消息间隔时间,单位ms - GetInterval() time.Duration - - // 构造定时发送的消息 - OnTick() ([]*TopicMsg, error) - - // 当发生错误时执行的逻辑 - OnError(err error) +type MsgTask interface { + // 停止 + Stop() } -// 消息实体 -type TopicMsg struct { - // 通道 - Channel string - - // 消息内容 - Data []byte +// 监控型任务 +type msgMonitorTask struct { + name string + fn func() } -func NewTopicMsg(channel string, data []byte) *TopicMsg { - return &TopicMsg{ - Channel: channel, - Data: data, +// 监控型任务停止 +func (t *msgMonitorTask) Stop() { + fmt.Printf("【%s】处理任务线程退出", t.name) +} + +// 定时型任务 +type msgScheduleTask struct { + name string + fn func() error + interval time.Duration + cancel context.CancelFunc + done chan struct{} // 服务协程退出信号 +} + +// Stop +func (t *msgScheduleTask) Stop() { + t.cancel() + <-t.done + fmt.Printf("【%s】处理任务线程退出", t.name) +} + +// 定时任务运行 +func (t *msgScheduleTask) run(ctx context.Context) { + defer close(t.done) +mainLoop: + for { + select { + case <-ctx.Done(): + break mainLoop + default: + } + err := t.fn() + if err != nil { + panic(sys_error.New(fmt.Sprintf("仿真任务【%s】状态消息收集异常", t.name), err)) + } + time.Sleep(t.interval) } } + +// 创建定时任务 +func NewScheduleTask(name string, run func() error, interval time.Duration) MsgTask { + if interval <= 0 { + interval = time.Second + } + task := &msgScheduleTask{ + name: name, + fn: run, + interval: interval, + done: make(chan struct{}), + } + ctx, cancel := context.WithCancel(context.Background()) + go task.run(ctx) + task.cancel = cancel + return task +} + +// 创建监控任务 +func NewMonitorTask(name string, run func()) MsgTask { + task := &msgMonitorTask{ + name: name, + fn: run, + } + go task.fn() + return task +} diff --git a/message_server/ms_manage/manage.go b/message_server/ms_manage/manage.go deleted file mode 100644 index 4b8dd25..0000000 --- a/message_server/ms_manage/manage.go +++ /dev/null @@ -1,74 +0,0 @@ -package ms_manage - -import ( - "context" - "log/slog" - "runtime/debug" - "time" - - "joylink.club/bj-rtsts-server/message_server/ms_api" - "joylink.club/bj-rtsts-server/mqtt" -) - -type MsgServer struct { - ms_api.IMsgServer - ctx context.Context - cancelFn context.CancelFunc -} - -// 消息服务管理map -var servers map[string]*MsgServer = make(map[string]*MsgServer) - -// 注册消息服务 -func Register(server ms_api.IMsgServer) *MsgServer { - ms := &MsgServer{ - IMsgServer: server, - } - ctx, cancelFn := context.WithCancel(context.Background()) - ms.ctx = ctx - ms.cancelFn = cancelFn - go run(ms) - servers[server.GetChannel()] = ms - return ms -} - -// 注销消息服务 -func Unregister(server ms_api.IMsgServer) { - if server == nil { - return - } - s := servers[server.GetChannel()] - s.cancelFn() - delete(servers, server.GetChannel()) -} - -// 消息服务运行 -func run(server *MsgServer) { - defer func() { - if err := recover(); err != nil { - slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack())) - debug.PrintStack() - } - }() - for { - select { - case <-server.ctx.Done(): - slog.Info("消息服务退出", "channel", server.GetChannel()) - return - default: - } - topicMsgs, err := server.OnTick() - if err != nil { - server.OnError(err) - slog.Error("消息服务构建定时发送消息错误,服务退出", "channel", server.GetChannel(), "error", err) - break - } - if len(topicMsgs) > 0 { - for _, msg := range topicMsgs { - // apiproto.PublishMsg(msg.Channel, msg.Data) - mqtt.Publish(msg.Channel, msg.Data) - } - } - time.Sleep(server.GetInterval()) - } -} diff --git a/message_server/psl_ms.go b/message_server/psl_ms.go index 0562af1..bcd60d9 100644 --- a/message_server/psl_ms.go +++ b/message_server/psl_ms.go @@ -4,66 +4,34 @@ import ( "fmt" "time" - "joylink.club/ecs" - "joylink.club/rtsssimulation/component" - "joylink.club/rtsssimulation/entity" - - "google.golang.org/protobuf/proto" "joylink.club/bj-rtsts-server/message_server/ms_api" + "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/protos/state" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" + "joylink.club/ecs" + "joylink.club/rtsssimulation/component" + "joylink.club/rtsssimulation/entity" ) -// PSL或门控箱消息服务 -type PslMs struct { - vs *memory.VerifySimulation - mapId int32 -} - -func NewPSLMs(vs *memory.VerifySimulation, mapId int32) *PslMs { - return &PslMs{vs: vs, mapId: mapId} -} - -func (p *PslMs) GetChannel() string { - return SimulationPslTopic - // return "simulation-psl-%s_%d_%s-status" -} - -func (p *PslMs) GetInterval() time.Duration { - return 200 * time.Millisecond -} - -func (p *PslMs) OnTick() ([]*ms_api.TopicMsg, error) { - var msgArr []*ms_api.TopicMsg - mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](p.mapId) - for _, box := range mapData.GateBoxs { - did := memory.GetMapElementId(box.Common) - channel := p.handlerPSLChannelName(did) - state, err := p.collectGateBoxPSLState(box) - if err != nil { - return nil, err +func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { + mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) + return ms_api.NewScheduleTask("综合后备盘IBP", func() error { + for _, box := range mapData.GateBoxs { + did := memory.GetMapElementId(box.Common) + state, err := collectGateBoxPSLState(vs.World, mapId, box) + if err != nil { + return err + } + mqtt.PubPSLState(vs.SimulationId, mapId, did, state) } - b, err := proto.Marshal(state) - if err != nil { - return nil, fmt.Errorf("PSL设备状态消息服务数据序列化失败: %s", err) - } - msgArr = append(msgArr, ms_api.NewTopicMsg(channel, b)) - } - return msgArr, nil + return nil + }, 200*time.Millisecond) } -func (p *PslMs) OnError(err error) {} - -// 处理订阅通道名称 -func (p *PslMs) handlerPSLChannelName(gateBoxId uint32) string { - return fmt.Sprintf(SimulationPslTopic, p.vs.SimulationId, p.mapId, gateBoxId) -} - -func (p *PslMs) collectGateBoxPSLState(box *graphicData.GatedBox) (*state.PushedDevicesStatus, error) { - world := p.vs.World +func collectGateBoxPSLState(world ecs.World, mapId int32, box *graphicData.GatedBox) (*state.PushedDevicesStatus, error) { did := memory.GetMapElementId(box.Common) - uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](p.mapId) + uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](mapId) boxUid := uidStructure.GateBoxIds[did].Uid mkxEntry, ok := entity.GetEntityByUid(world, boxUid) if !ok { diff --git a/message_server/rcc_ms.go b/message_server/rcc_ms.go index bb64012..42a70ad 100644 --- a/message_server/rcc_ms.go +++ b/message_server/rcc_ms.go @@ -1,73 +1,42 @@ package message_server import ( - "fmt" "time" - "google.golang.org/protobuf/proto" "joylink.club/bj-rtsts-server/message_server/ms_api" + "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/protos/state" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" + "joylink.club/ecs" "joylink.club/rtsssimulation/component" "joylink.club/rtsssimulation/entity" ) -// 继电器组合柜布置图消息服务 -type RccMs struct { - vs *memory.VerifySimulation - mapId int32 - channel string -} - -func NewRccMs(vs *memory.VerifySimulation, mapId int32) *RccMs { - return &RccMs{ - vs: vs, - mapId: mapId, - channel: fmt.Sprintf(SimulationRccTopic, vs.SimulationId, mapId), - } -} - -// 获取消息服务名 -func (r *RccMs) GetChannel() string { - return r.channel -} - -// 发送消息间隔时间,单位ms -func (r *RccMs) GetInterval() time.Duration { - return 200 * time.Millisecond -} - -// 构造定时发送的消息 -func (r *RccMs) OnTick() ([]*ms_api.TopicMsg, error) { - relayStates, err := r.collectRelayState() - if err != nil { - return nil, err - } - ststes := &state.PushedDevicesStatus{ - All: true, - AllStatus: &state.AllDevicesStatus{ - ReplyState: relayStates, - }, - } - b, err := proto.Marshal(ststes) - if err != nil { - return nil, fmt.Errorf("信号布置图设备状态消息服务数据序列化失败, %s", err) - } - return []*ms_api.TopicMsg{ms_api.NewTopicMsg(r.channel, b)}, nil -} - -// 当发生错误时执行的逻辑 -func (r *RccMs) OnError(err error) { +func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { + return ms_api.NewScheduleTask("继电器状态", func() error { + relayStates, err := collectRelayState(vs.World, mapId) + if err != nil { + return err + } + ststes := &state.PushedDevicesStatus{ + All: true, + AllStatus: &state.AllDevicesStatus{ + ReplyState: relayStates, + }, + } + mqtt.PubRCCState(vs.SimulationId, mapId, ststes) + return nil + }, 200*time.Millisecond) } // 获取仿真地图的继电器状态,前端推送 -func (r *RccMs) collectRelayState() ([]*state.ReplyState, error) { +func collectRelayState(world ecs.World, mapId int32) ([]*state.ReplyState, error) { // 获取本地图下的继电器信息 - uidMap := memory.QueryMapUidMapByType(r.mapId, &graphicData.Relay{}) + uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Relay{}) var replyStateArr []*state.ReplyState for _, u := range uidMap { - entry, ok := entity.GetEntityByUid(r.vs.World, u.Uid) + entry, ok := entity.GetEntityByUid(world, u.Uid) if !ok { // 暂时注释,很多继电器都没初始化 //return nil, fmt.Errorf("继电器实体不存在: World id=%d, uid=%s", r.vs.World.Id(), u.Uid) diff --git a/message_server/sfp_ms.go b/message_server/sfp_ms.go index ec26cc9..17086de 100644 --- a/message_server/sfp_ms.go +++ b/message_server/sfp_ms.go @@ -5,8 +5,8 @@ import ( "strings" "time" - "google.golang.org/protobuf/proto" "joylink.club/bj-rtsts-server/message_server/ms_api" + "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/protos/state" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" @@ -16,88 +16,58 @@ import ( ) // 信号平面布置图消息服务 -type SfpMs struct { - vs *memory.VerifySimulation - mapId int32 - channel string -} -func NewSfpMs(vs *memory.VerifySimulation, mapId int32) *SfpMs { - return &SfpMs{ - vs: vs, - mapId: mapId, - channel: fmt.Sprintf(SimulationSfpTopic, vs.SimulationId, mapId), - } -} - -// 获取通道名 -func (ms *SfpMs) GetChannel() string { - return ms.channel -} - -// 发送消息间隔时间,单位ms -func (ms *SfpMs) GetInterval() time.Duration { - return 200 * time.Millisecond -} - -// 定时发送的消息 -func (ms *SfpMs) OnTick() ([]*ms_api.TopicMsg, error) { - turnoutStates, err := ms.collectTurnoutStates() - if err != nil { - return nil, err - } - signalStates, err := ms.collectSignalStates() - if err != nil { - return nil, err - } - buttonStates, err := ms.collectStationButtonStates() - if err != nil { - return nil, err - } - psdStates, err := ms.collectPsdStates() - if err != nil { - return nil, err - } - sectionStates, err := ms.collectSectionStates() - if err != nil { - return nil, err - } - platformStates, err := ms.collectPlatformStates() - if err != nil { - return nil, err - } - trainState, err := ms.collectTrainStates() - if err != nil { - return nil, err - } - ststes := &state.PushedDevicesStatus{ - All: true, - AllStatus: &state.AllDevicesStatus{ - TrainState: trainState, - SwitchState: turnoutStates, - SignalState: signalStates, - ButtonState: buttonStates, - PsdState: psdStates, - SectionState: sectionStates, - PlatformState: platformStates, - }, - } - b, err := proto.Marshal(ststes) - if err != nil { - return nil, fmt.Errorf("信号布置图设备状态消息服务数据序列化失败, %s", err) - } - return []*ms_api.TopicMsg{ms_api.NewTopicMsg(ms.channel, b)}, nil -} - -func (ms *SfpMs) OnError(err error) { - // TODO: 错误处理 +func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask { + return ms_api.NewScheduleTask("信号平面布置图", func() error { + turnoutStates, err := collectTurnoutStates(vs, mapId) + if err != nil { + return err + } + signalStates, err := collectSignalStates(vs.World, mapId) + if err != nil { + return err + } + buttonStates, err := collectStationButtonStates(vs.World, mapId) + if err != nil { + return err + } + psdStates, err := collectPsdStates(vs.World, mapId) + if err != nil { + return err + } + sectionStates, err := collectSectionStates(vs.World, mapId) + if err != nil { + return err + } + platformStates, err := collectPlatformStates(vs.World, mapId) + if err != nil { + return err + } + trainState, err := collectTrainStates(vs) + if err != nil { + return err + } + ststes := &state.PushedDevicesStatus{ + All: true, + AllStatus: &state.AllDevicesStatus{ + TrainState: trainState, + SwitchState: turnoutStates, + SignalState: signalStates, + ButtonState: buttonStates, + PsdState: psdStates, + SectionState: sectionStates, + PlatformState: platformStates, + }, + } + mqtt.PubSfpState(vs.SimulationId, mapId, ststes) + return nil + }, 200*time.Millisecond) } // 收集屏蔽门状态 -func (ms *SfpMs) collectPsdStates() ([]*state.PsdState, error) { - world := ms.vs.World - uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](ms.mapId) - data := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId) +func collectPsdStates(world ecs.World, mapId int32) ([]*state.PsdState, error) { + uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](mapId) + data := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) var psdStateArr []*state.PsdState for _, door := range data.ScreenDoors { did := memory.GetMapElementId(door.Common) @@ -126,11 +96,11 @@ func (ms *SfpMs) collectPsdStates() ([]*state.PsdState, error) { } // 收集区段状态 -func (ms *SfpMs) collectSectionStates() ([]*state.SectionState, error) { - uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Section{}) +func collectSectionStates(world ecs.World, mapId int32) ([]*state.SectionState, error) { + uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Section{}) var sectionArr []*state.SectionState for _, u := range uidMap { - s := handlerSectionState(ms.vs.World, u.Uid) + s := handlerSectionState(world, u.Uid) if s == nil { continue } @@ -157,14 +127,14 @@ func handlerSectionState(w ecs.World, uid string) *state.SectionState { } // 收集车站按钮状态 -func (ms *SfpMs) collectStationButtonStates() ([]*state.ButtonState, error) { +func collectStationButtonStates(world ecs.World, mapId int32) ([]*state.ButtonState, error) { // 获取地图上的按钮状态 - uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.EsbButton{}) + uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.EsbButton{}) var btnStateArr []*state.ButtonState for _, u := range uidMap { - entry, ok := entity.GetEntityByUid(ms.vs.World, u.Uid) + entry, ok := entity.GetEntityByUid(world, u.Uid) if !ok { - return nil, fmt.Errorf("ESB按钮实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), u.Uid) + return nil, fmt.Errorf("ESB按钮实体不存在: World id=%d, uid=%s", world.Id(), u.Uid) } if entry.HasComponent(component.ButtonTag) { // 按钮 bit := component.BitStateType.Get(entry) @@ -175,11 +145,11 @@ func (ms *SfpMs) collectStationButtonStates() ([]*state.ButtonState, error) { } // 收集信号机状态 -func (ms *SfpMs) collectSignalStates() ([]*state.SignalState, error) { - uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Signal{}) +func collectSignalStates(world ecs.World, mapId int32) ([]*state.SignalState, error) { + uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Signal{}) var signalArr []*state.SignalState for _, u := range uidMap { - s, err := handlerSignalState(ms.vs.World, u.Uid) + s, err := handlerSignalState(world, u.Uid) if err != nil { return nil, err } @@ -238,8 +208,8 @@ func handlerSignalState(w ecs.World, uid string) (*state.SignalState, error) { } // 收集列车状态 -func (ms *SfpMs) collectTrainStates() ([]*state.TrainMapState, error) { - allTrainMap := &ms.vs.Memory.Status.TrainStateMap +func collectTrainStates(vs *memory.VerifySimulation) ([]*state.TrainMapState, error) { + allTrainMap := &vs.Memory.Status.TrainStateMap var trainArr []*state.TrainMapState allTrainMap.Range(func(_, v any) bool { trainArr = append(trainArr, convertTrainState(v.(*state.TrainState))) @@ -318,9 +288,8 @@ func convertTrainState(v *state.TrainState) *state.TrainMapState { } // 收集道岔状态 -func (ms *SfpMs) collectTurnoutStates() ([]*state.SwitchState, error) { - sim := ms.vs - uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Turnout{}) +func collectTurnoutStates(sim *memory.VerifySimulation, mapId int32) ([]*state.SwitchState, error) { + uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Turnout{}) wd := entity.GetWorldData(sim.World) var switchArr []*state.SwitchState for _, u := range uidMap { @@ -394,10 +363,10 @@ func (ms *SfpMs) collectTurnoutStates() ([]*state.SwitchState, error) { } // 收集站台状态 -func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) { +func collectPlatformStates(world ecs.World, mapId int32) ([]*state.PlatformState, error) { var states []*state.PlatformState - mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId) - uidsMap := memory.QueryUidStructure[*memory.StationUidStructure](ms.mapId) + mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId) + uidsMap := memory.QueryUidStructure[*memory.StationUidStructure](mapId) platformScreenDoorMap := wrapScreenDoorToPlatform(mapData) for _, platform := range mapData.Platforms { pid := memory.GetMapElementId(platform.Common) @@ -409,9 +378,9 @@ func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) { if uidInfo == nil { return nil, fmt.Errorf("车站实体不存在uid映射:id=%v", stationCommonId) } - entry, ok := entity.GetEntityByUid(ms.vs.World, uidInfo.Uid) + entry, ok := entity.GetEntityByUid(world, uidInfo.Uid) if !ok { - return nil, fmt.Errorf("车站实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), uidInfo.Uid) + return nil, fmt.Errorf("车站实体不存在: World id=%d, uid=%s", world.Id(), uidInfo.Uid) } sta := &state.PlatformState{Id: pid} isX := strings.Contains(platform.Code, "下行站台") //下行站台 @@ -440,9 +409,9 @@ func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) { if !ok { continue } - psdEntry, ok := entity.GetEntityByUid(ms.vs.World, psdUid.Uid) + psdEntry, ok := entity.GetEntityByUid(world, psdUid.Uid) if !ok { - return nil, fmt.Errorf("屏蔽门实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), psdUid.Uid) + return nil, fmt.Errorf("屏蔽门实体不存在: World id=%d, uid=%s", world.Id(), psdUid.Uid) } if psdEntry.HasComponent(component.PlatformMkxCircuitType) { mkxCircuit := component.PlatformMkxCircuitType.Get(psdEntry) diff --git a/message_server/simulation.go b/message_server/simulation.go index 5f0c06e..326d828 100644 --- a/message_server/simulation.go +++ b/message_server/simulation.go @@ -2,36 +2,19 @@ package message_server import ( "sync" - "time" - "joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/message_server/ms_api" - "joylink.club/bj-rtsts-server/message_server/ms_manage" "joylink.club/bj-rtsts-server/ts/protos/graphicData" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" ) -const ( - SimulationTopicPrefix = "/" + config.SystemName + "/simulation/%s/" - // 仿真状态消息topic - SimulationStateTopic = SimulationTopicPrefix + "state" - // 信号布置图设备状态消息topic - SimulationSfpTopic = SimulationTopicPrefix + "sfp/%d" - // 继电器组合柜布置图设备状态消息topic - SimulationRccTopic = SimulationTopicPrefix + "rcc/%d" - // PSL设备状态消息topic - SimulationPslTopic = SimulationTopicPrefix + "psl/%d/%d" - // IBP设备状态消息topic - SimulationIbpTopic = SimulationTopicPrefix + "ibp/%d/%d" -) - var smsMap sync.Map // 仿真消息服务 // 管理仿真消息服务,整体可以作为一个消息服务,也可以每个消息子服务各自作为一个消息服务,暂时先按整体作为一个消息服务的方式使用 type SimulationMs struct { - vs *memory.VerifySimulation - mss []ms_api.IMsgServer + vs *memory.VerifySimulation + tasks []ms_api.MsgTask } // 启动仿真所需的消息服务 @@ -39,22 +22,22 @@ func Start(vs *memory.VerifySimulation) { _, ok := smsMap.Load(vs.SimulationId) if !ok { sms := &SimulationMs{ - vs: vs, - mss: []ms_api.IMsgServer{}, + vs: vs, + tasks: []ms_api.MsgTask{}, + //mss: []ms_api.IMsgServer{}, } for _, mapId := range vs.MapIds { t := memory.QueryGiType(mapId) switch t { case graphicData.PictureType_StationLayout: // 平面布置图 // 添加车站关联的平面布置图、IBP、PSL信息 - sms.mss = append(sms.mss, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) + sms.tasks = append(sms.tasks, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) case graphicData.PictureType_RelayCabinetLayout: // 继电器柜 - sms.mss = append(sms.mss, NewRccMs(vs, mapId)) + sms.tasks = append(sms.tasks, NewRccMs(vs, mapId)) } } // 启动仿真状态服务 - NewSimulationStateMs(vs) - ms_manage.Register(sms) + sms.tasks = append(sms.tasks, NewStateMs(vs)) smsMap.Store(vs.SimulationId, sms) } } @@ -63,36 +46,11 @@ func Start(vs *memory.VerifySimulation) { func Close(vs *memory.VerifySimulation) { sms, ok := smsMap.Load(vs.SimulationId) if ok { - ms_manage.Unregister(sms.(*SimulationMs)) + ms := sms.(*SimulationMs) + for _, task := range ms.tasks { + task.Stop() + } + ms.tasks = nil smsMap.Delete(vs.SimulationId) } } - -// 获取通道 -func (sms *SimulationMs) GetChannel() string { - return sms.vs.SimulationId -} - -// 发送消息间隔时间,单位ms -func (sms *SimulationMs) GetInterval() time.Duration { - return 200 * time.Millisecond -} - -// 构造定时发送的消息 -func (sms *SimulationMs) OnTick() ([]*ms_api.TopicMsg, error) { - var tmList []*ms_api.TopicMsg - for _, ms := range sms.mss { - tm, err := ms.OnTick() - if err != nil { - return nil, err - } - if len(tm) > 0 { - tmList = append(tmList, tm...) - } - } - return tmList, nil -} - -func (sms *SimulationMs) OnError(err error) { - // TODO: 仿真消息错误处理 -} diff --git a/message_server/simulation_state_ms.go b/message_server/simulation_state_ms.go index 78f888d..54f416e 100644 --- a/message_server/simulation_state_ms.go +++ b/message_server/simulation_state_ms.go @@ -1,54 +1,33 @@ package message_server import ( - "fmt" - - "google.golang.org/protobuf/proto" + "joylink.club/bj-rtsts-server/message_server/ms_api" "joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/ts/protos/state" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" "joylink.club/ecs" ) -// 世界状态变化消息服务 -type SimulationStateMs struct { - vs *memory.VerifySimulation - channel string -} - -func NewSimulationStateMs(vs *memory.VerifySimulation) *SimulationStateMs { - ms := &SimulationStateMs{ - vs: vs, - channel: fmt.Sprintf(SimulationStateTopic, vs.SimulationId), - } - ecs.WorldStateChangeEvent.Subscribe(ms.vs.World, func(_ ecs.World, e ecs.WorldStateChange) { - switch e.NewState { - case ecs.WorldClose: - doSendState(ms.channel, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_DESTROY, - }) - case ecs.WorldError: - doSendState(ms.channel, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_ERROR, - }) - case ecs.WorldPause: - doSendState(ms.channel, &state.SimulationStatus{ - SimulationId: vs.SimulationId, - State: state.SimulationStatus_PAUSE, - }) - } +func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask { + return ms_api.NewMonitorTask("仿真状态", func() { + ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) { + switch e.NewState { + case ecs.WorldClose: + mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ + SimulationId: vs.SimulationId, + State: state.SimulationStatus_DESTROY, + }) + case ecs.WorldError: + mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ + SimulationId: vs.SimulationId, + State: state.SimulationStatus_ERROR, + }) + case ecs.WorldPause: + mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{ + SimulationId: vs.SimulationId, + State: state.SimulationStatus_PAUSE, + }) + } + }) }) - return ms -} - -func doSendState(channel string, status *state.SimulationStatus) error { - b, err := proto.Marshal(status) - if err != nil { - return fmt.Errorf("仿真状态消息服务数据序列化失败, %s", err) - } - mqtt.Publish(channel, b) - // apiproto.PublishMsg(channel, b) - return nil } diff --git a/mqtt/client.go b/mqtt/client.go index aab695a..21282c6 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -1,52 +1,69 @@ package mqtt import ( + "context" "fmt" - "log/slog" "math/rand" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" "github.com/google/uuid" + "github.com/sagikazarmark/slog-shim" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "joylink.club/bj-rtsts-server/ts/protos/state" ) -// MQTT客户端连接配置 -type MqttOptions struct { - // MQTT Broker 代理 - Broker string - // 认证用户名 - Username string - // 认证密码 - Password string -} - -func NewMqttOptions(address, username, password string) MqttOptions { - return MqttOptions{ - Broker: address, - Username: username, - Password: password, - } -} - -var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { - slog.Debug("MQTT收到消息", "topic", msg.Topic(), "msg", string(msg.Payload())) -} - -var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { - or := client.OptionsReader() - slog.Info("MQTT连接成功", "ClientID", or.ClientID()) - // subs := make(map[string]byte) - // subs["$SYS/brokers/+/clients/+/+"] = 0 - // client.SubscribeMultiple(subs, messagePubHandler) -} - -var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { - slog.Error("MQTT连接断开", "error", err) -} - -var mqttClient mqtt.Client +var mqttClient *MqttClient var clientId string +// 客户端 +type MqttClient struct { + cmc *MqttOptions + cc *autopaho.ClientConfig + cm *autopaho.ConnectionManager +} + +// 初始化并启动MQTT客户端服务 +func Startup(cmc *MqttOptions) error { + initClientId() + cmc.ClientId = clientId + if err := checkConfig(cmc); err != nil { + return err + } + cc, err := cmc.tryInto() + if err != nil { + return err + } + cm, err := autopaho.NewConnection(context.Background(), *cc) + if err != nil { + return err + } + mqttClient = &MqttClient{cmc: cmc, cc: cc, cm: cm} + return nil +} + +// 检查配置信息 +func checkConfig(cmc *MqttOptions) error { + if cmc.AppId == "" { + return fmt.Errorf("应用编号不能为空") + } + if cmc.ClientId == "" { + return fmt.Errorf("客户端编号不能为空") + } + if cmc.BrokerUrl == "" { + return fmt.Errorf("MQTT代理服务地址不能为空") + } + if cmc.Username == "" { + return fmt.Errorf("MQTT用户名不能为空") + } + if cmc.Password == "" { + return fmt.Errorf("MQTT密码不能为空") + } + return nil +} + // 初始化MQTT客户端id func initClientId() { if clientId == "" { @@ -62,29 +79,48 @@ func GetClientId() string { return clientId } -// 启动MQTT -func Startup(options MqttOptions) { - initClientId() - opts := mqtt.NewClientOptions() - opts.AddBroker(options.Broker) - opts.SetClientID(clientId) - opts.SetUsername(options.Username) - opts.SetPassword(options.Password) - opts.SetDefaultPublishHandler(messagePubHandler) - opts.OnConnect = connectHandler - opts.OnConnectionLost = connectLostHandler - client := mqtt.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) +// 发布数据 +func pub(topic string, data protoreflect.ProtoMessage) error { + if data == nil { + return fmt.Errorf("发布数据引用为nil") } - mqttClient = client + b, err := proto.Marshal(data) + if err != nil { + return err + } + if !MatchTopic(topic) { + slog.Error("未知发布主题", "topic", topic, "data", data) + return fmt.Errorf("未知发布主题: topic=%s", topic) + } + _, err = mqttClient.cm.Publish(context.Background(), &paho.Publish{ + Topic: topic, + QoS: 0, + Payload: b, + }) + return err } -// 发布消息 -func Publish(dest string, data any) error { - token := mqttClient.Publish(dest, 0, false, data) - if token.Wait() && token.Error() != nil { - return token.Error() - } - return nil +// 发送仿真状态数据 +func PubSimulationState(simulationId string, msg *state.SimulationStatus) error { + return pub(GetStateTopic(simulationId), msg) +} + +// 发送IBP状态数据 +func PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error { + return pub(GetIbpTopic(simulationId, mapId, stationId), msg) +} + +// 发送PSL状态数据 +func PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error { + return pub(GetPslTopic(simulationId, mapId, boxId), msg) +} + +// 发送继电器状态数据 +func PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { + return pub(GetRccTopic(simulationId, mapId), msg) +} + +// 发送站场图状态数据 +func PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error { + return pub(GetSfpTopic(simulationId, mapId), msg) } diff --git a/mqtt/config.go b/mqtt/config.go new file mode 100644 index 0000000..4d98f41 --- /dev/null +++ b/mqtt/config.go @@ -0,0 +1,72 @@ +package mqtt + +import ( + "fmt" + "log/slog" + "net/url" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "joylink.club/bj-rtsts-server/config" +) + +type MqttOptions struct { + AppId string // 所属应用编号 + BrokerUrl string // Broker地址 + ClientId string // 客户端ID + Username string // 用户名 + Password string // 密码 + KeepAlive uint16 // 保活时间间隔,单位s,默认为60 + ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3 + ConnectTimeout uint16 // 连接操作超时,单位s,默认为3 +} + +func NewMqttOptions(address, username, password string) *MqttOptions { + return &MqttOptions{ + AppId: config.SystemName, + BrokerUrl: address, + Username: username, + Password: password, + } +} + +func (c *MqttOptions) tryInto() (*autopaho.ClientConfig, error) { + addr, err := url.Parse(c.BrokerUrl) + if err != nil { + return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err) + } + if c.KeepAlive == 0 { + c.KeepAlive = 60 + } + if c.ConnectRetryDelay == 0 { + c.ConnectRetryDelay = 3 + } + if c.ConnectTimeout == 0 { + c.ConnectTimeout = 3 + } + cc := &autopaho.ClientConfig{ + BrokerUrls: []*url.URL{ + addr, + }, + KeepAlive: c.KeepAlive, + ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second, + ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second, + OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { + slog.Info("MQTT连接成功") + }, + OnConnectError: func(err error) { + slog.Error("MQTT连接失败", "error", err) + }, + ClientConfig: paho.ClientConfig{ + ClientID: c.ClientId, + Router: paho.NewStandardRouter(), + OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) }, + OnServerDisconnect: func(d *paho.Disconnect) { + fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties) + }, + }, + } + cc.SetUsernamePassword(c.Username, []byte(c.Password)) + return cc, nil +} diff --git a/mqtt/topic.go b/mqtt/topic.go new file mode 100644 index 0000000..7b1eef9 --- /dev/null +++ b/mqtt/topic.go @@ -0,0 +1,75 @@ +package mqtt + +import ( + "fmt" + "strings" + + "joylink.club/bj-rtsts-server/config" +) + +const ( + topicPrefix = "/" + config.SystemName + "/simulation/%s/" + stateTopic = topicPrefix + "state" + sfpTopic = topicPrefix + "sfp/%d" + rccTopic = topicPrefix + "rcc/%d" + pslTopic = topicPrefix + "psl/%d/%d" + ibpTopic = topicPrefix + "ibp/%d/%d" +) + +var topicMap = map[string]string{ + "state": stateTopic, + "sfp": sfpTopic, + "rcc": rccTopic, + "psl": pslTopic, + "ibp": ibpTopic, +} + +// 检测topic是否合法 +func MatchTopic(topic string) bool { + topicArr := strings.Split(topic, "/") + for k, v := range topicMap { + result := strings.Contains(topic, "/"+k) + if result { + fmtArr := strings.Split(v, "/") + for i, f := range fmtArr { + if f == "%s" || f == "%d" { + continue + } else { + result = topicArr[i] == f + } + if !result { + break + } + } + } + if result { + return true + } + } + return false +} + +// 仿真状态消息topic +func GetStateTopic(simulationId string) string { + return fmt.Sprintf(stateTopic, simulationId) +} + +// 信号布置图设备状态消息topic +func GetSfpTopic(simulationId string, mapId int32) string { + return fmt.Sprintf(sfpTopic, simulationId, mapId) +} + +// 继电器组合柜布置图设备状态消息topic +func GetRccTopic(simulationId string, mapId int32) string { + return fmt.Sprintf(rccTopic, simulationId, mapId) +} + +// PSL设备状态消息topic +func GetPslTopic(simulationId string, mapId int32, boxId uint32) string { + return fmt.Sprintf(pslTopic, simulationId, mapId, boxId) +} + +// IBP设备状态消息topic +func GetIbpTopic(simulationId string, mapId int32, stationId uint32) string { + return fmt.Sprintf(ibpTopic, simulationId, mapId, stationId) +} diff --git a/tmp/proto_test.go b/tmp/proto_test.go new file mode 100644 index 0000000..1eadcd9 --- /dev/null +++ b/tmp/proto_test.go @@ -0,0 +1,86 @@ +package memory_test + +import ( + "fmt" + "log/slog" + "testing" + "time" + + "google.golang.org/protobuf/proto" + "joylink.club/bj-rtsts-server/ts/protos/state" +) + +func TestTrainProto(t *testing.T) { + st := &state.TrainState{ + Id: "1", + TrainLength: 96000, + Show: true, + HeadDeviceId: 682, + HeadOffset: 1017073, + PointTo: true, + RunDirection: true, + HeadDirection: true, + DynamicState: &state.TrainDynamicState{ + Heartbeat: 24512, + HeadLinkId: "9", + HeadLinkOffset: 1095653, + TailLinkId: "9", + TailLinkOffset: 999653, + Slope: 5290, + RunningUp: true, + RunningResistanceSum: 1.787, + AirResistance: 9.136, + RampResistance: -8.295, + CurveResistance: 0.947, + Speed: 8011, + HeadSensorSpeed1: 8011, + HeadSensorSpeed2: 8011, + TailSensorSpeed1: 8011, + TailSensorSpeed2: 8011, + Acceleration: -0.011171325, + }, + VobcState: &state.TrainVobcState{ + LifeSignal: 23457, + Tc2Active: true, + TractionForce: 9040, + BrakeForce: 9040, + TrainLoad: 16000, + UpdateTime: 1702534082337, + }, + TrainKilometer: 126984703, + ControlDelayTime: 22, + WheelDiameter: 800, + } + stopTime := time.Now().Add(5 * time.Second) + for { + if stopTime.Before(time.Now()) { + break + } + st.VobcState.UpdateTime = time.Now().Unix() + d, err := proto.Marshal(st) + if err != nil { + slog.Error("转换出错", err) + } + fmt.Println(d) + + time.Sleep(20 * time.Millisecond) + } + + // st2 := &state.TrainState{} + // dd := []byte{ + // 10, 1, 49, 32, 128, 238, 5, 40, 1, 48, 170, 5, 56, 241, 137, + // 62, 72, 1, 80, 1, 88, 1, 98, 67, 8, 192, 191, 1, 18, 1, 57, 24, + // 229, 239, 66, 34, 1, 57, 40, 229, 129, 61, 56, 170, 41, 72, 1, + // 85, 106, 188, 228, 63, 93, 14, 45, 18, 65, 101, 82, 184, 4, 193, + // 109, 152, 110, 114, 63, 112, 203, 62, 120, 203, 62, 128, 1, 203, + // 62, 136, 1, 203, 62, 144, 1, 203, 62, 181, 1, 239, 7, 55, 188, 106, + // 35, 8, 161, 183, 1, 24, 1, 32, 1, 72, 1, 112, 1, 128, 1, 1, 136, 1, 208, + // 70, 144, 1, 208, 70, 152, 1, 128, 125, 208, 1, 247, 223, 193, 183, + // 198, 49, 112, 255, 195, 198, 60, 120, 22, 128, 1, 160, 6, + // } + // err2 := proto.Unmarshal(dd, st2) + // if err2 != nil { + // slog.Error("转换出错", err2) + // } + // fmt.Println(st2) +}