重构消息服务代码结构

实现部分信号布置图的状态消息采集(其他都待重构)
This commit is contained in:
walker 2023-10-26 16:41:18 +08:00
parent cdb9654799
commit eb86063724
17 changed files with 479 additions and 11 deletions

View File

@ -14,10 +14,8 @@ import (
"joylink.club/bj-rtsts-server/ats/verify/protos/state"
"joylink.club/bj-rtsts-server/ats/verify/simulation"
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/dto"
"joylink.club/bj-rtsts-server/dto/request_proto"
apiproto "joylink.club/bj-rtsts-server/grpcproto"
"joylink.club/bj-rtsts-server/middleware"
"joylink.club/bj-rtsts-server/service"
"joylink.club/bj-rtsts-server/sys_error"
@ -44,10 +42,10 @@ func InitSimulationRouter(api *gin.RouterGroup, authMiddleware *jwt.GinJWTMiddle
// 初始化地图信息
initPublishMapInfo()
apiproto.Register(&apiproto.SimulationServer{})
apiproto.Register(&apiproto.SimulationIBPServer{})
apiproto.Register(&apiproto.SimulationPSLServer{})
apiproto.Register(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)})
// apiproto.Register(&apiproto.SimulationServer{})
// apiproto.Register(&apiproto.SimulationIBPServer{})
// apiproto.Register(&apiproto.SimulationPSLServer{})
// apiproto.Register(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)})
}
func initPublishMapInfo() {
@ -427,6 +425,9 @@ func pslBtnOperation(c *gin.Context) {
// @Security JwtAuth
//
// @Description 获取仿真信息更新通道名称
//
// Deprecated: 此接口废弃,使用轮询方式更新启动的测试列表
//
// @Tags ATS测试仿真Api
// @Accept json
// @Produce json
@ -436,7 +437,8 @@ func pslBtnOperation(c *gin.Context) {
// @Failure 500 {object} dto.ErrorDto
// @Router /api/v1/simulation/getDataChannelName [get]
func getDataChannelName(c *gin.Context) {
c.JSON(http.StatusOK, config.SimulationId_prefix+apiproto.MemoryChangeServerSuffix)
// c.JSON(http.StatusOK, config.SimulationId_prefix+apiproto.MemoryChangeServerSuffix)
panic(sys_error.New("接口已废弃", fmt.Errorf("此接口废弃,使用轮询方式更新启动的测试列表")))
}
// 获取仿真地图的公里标范围

View File

@ -6,6 +6,7 @@ import (
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/message_server"
"joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/dynamics"
"joylink.club/bj-rtsts-server/third_party/semi_physical_train"
@ -55,6 +56,8 @@ func CreateSimulation(projectId int32, mapIds []int32) (string, error) {
simulationMap.Store(simulationId, verifySimulation)
// 全部成功,启动仿真
verifySimulation.World.StartUp()
// 启动仿真消息服务
message_server.Start(verifySimulation)
}
return simulationId, nil
}
@ -69,6 +72,7 @@ func DestroySimulation(simulationId string) {
simulationMap.Delete(simulationId)
// 停止ecs world
simulationInfo.World.Close()
message_server.Close(simulationInfo)
if config.Config.Dynamics.Open {
// 停止动力学接口功能
dynamics.Default().Stop()

View File

@ -297,7 +297,7 @@ func buildRepositoryAllUidsMap(mapIds []int32, repo *repository.Repository) map[
func queryUidStructure[T *stationUidStructure | *relayUidStructure](mapId int32) T {
uidData, ok := giUidMap.Load(mapId)
if !ok {
panic(&dto.ErrorDto{Code: dto.DataNotExist, Message: fmt.Sprintf("地图【id:%d】不存在uid缓存", mapId)})
panic(fmt.Errorf("id=%d的发布图没有uid缓存数据", mapId))
}
return uidData.(T)
}

View File

@ -77,9 +77,9 @@ func run(server *MsgServer) {
}
topicMsgs := server.onTick()
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
// for _, msg := range topicMsgs {
// PublishMsg(msg.channalName, msg.data)
// }
}
time.Sleep(server.getInterval())
}

61
message_server/ibp_ms.go Normal file
View File

@ -0,0 +1,61 @@
package message_server
import (
"fmt"
"strconv"
"strings"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/ats/verify/protos/graphicData"
"joylink.club/bj-rtsts-server/ats/verify/protos/state"
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/message_server/ms_api"
)
// 综合后备盘IBP消息服务
type IbpMs struct {
vs *memory.VerifySimulation
mapId int32
}
func (ms *IbpMs) GetChannel() string {
return "simulation-ibp-{sid}_{mid}_{station}-status"
}
func (ms *IbpMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
func (ms *IbpMs) OnTick() ([]*ms_api.TopicMsg, error) {
var msgArr []*ms_api.TopicMsg
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId)
idStr := strconv.Itoa(int(ms.mapId))
for _, station := range mapData.Stations {
channel := handlerIBPChannelName(ms.vs.SimulationId, idStr, station.Common.Id, ms.GetChannel())
stationIbpState, err := ms.collectStationIbpState(station)
if err != nil {
return nil, err
}
b, err := proto.Marshal(stationIbpState)
if err != nil {
return nil, fmt.Errorf("IBP设备状态消息服务数据序列化失败: %s", err)
}
msgArr = append(msgArr, ms_api.NewTopicMsg(channel, b))
}
return msgArr, nil
}
func (ms *IbpMs) collectStationIbpState(station *graphicData.Station) (*state.PushedDevicesStatus, error) {
// TODO: 重构之前的消息服务
return nil, nil
}
// 处理订阅通道名称
func handlerIBPChannelName(sid, mapId, station string, format string) string {
var channelName string
channelName = strings.Replace(format, "{sid}", sid, 1)
channelName = strings.Replace(channelName, "{mid}", mapId, 1)
channelName = strings.Replace(channelName, "{station}", station, 1)
return channelName
}

View File

@ -0,0 +1,31 @@
package ms_api
import "time"
// 消息服务
type IMsgServer interface {
// 获取消息服务名
GetChannel() string
// 发送消息间隔时间,单位ms
GetInterval() time.Duration
// 构造定时发送的消息
OnTick() ([]*TopicMsg, error)
}
// 消息实体
type TopicMsg struct {
// 通道
Channel string
// 消息内容
Data []byte
}
func NewTopicMsg(channel string, data []byte) *TopicMsg {
return &TopicMsg{
Channel: channel,
Data: data,
}
}

View File

@ -0,0 +1,72 @@
package ms_manage
import (
"context"
"log/slog"
"runtime/debug"
"time"
apiproto "joylink.club/bj-rtsts-server/grpcproto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
)
type MsgServer struct {
ms_api.IMsgServer
ctx context.Context
cancelFn context.CancelFunc
}
// 消息服务管理map
var servers map[string]*MsgServer = make(map[string]*MsgServer)
// 注册消息服务
func Register(server ms_api.IMsgServer) *MsgServer {
ms := &MsgServer{
IMsgServer: server,
}
ctx, cancelFn := context.WithCancel(context.Background())
ms.ctx = ctx
ms.cancelFn = cancelFn
go run(ms)
servers[server.GetChannel()] = ms
return ms
}
// 注销消息服务
func Unregister(server ms_api.IMsgServer) {
if server == nil {
return
}
s := servers[server.GetChannel()]
s.cancelFn()
delete(servers, server.GetChannel())
}
// 消息服务运行
func run(server *MsgServer) {
defer func() {
if err := recover(); err != nil {
slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-server.ctx.Done():
slog.Info("消息服务退出", "channel", server.GetChannel())
return
default:
}
topicMsgs, err := server.OnTick()
if err != nil {
slog.Error("消息服务构建定时发送消息错误", "channel", server.GetChannel(), "error", err)
continue
}
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
apiproto.PublishMsg(msg.Channel, msg.Data)
}
}
time.Sleep(server.GetInterval())
}
}

5
message_server/psl_ms.go Normal file
View File

@ -0,0 +1,5 @@
package message_server
// PSL或门控箱消息服务
type PslMs struct {
}

5
message_server/rcc_ms.go Normal file
View File

@ -0,0 +1,5 @@
package message_server
// 继电器组合柜布置图消息服务
type RccMs struct {
}

215
message_server/sfp_ms.go Normal file
View File

@ -0,0 +1,215 @@
package message_server
import (
"fmt"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/ats/verify/protos/graphicData"
"joylink.club/bj-rtsts-server/ats/verify/protos/state"
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/ecs"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
)
// 信号平面布置图消息服务
type SfpMs struct {
vs *memory.VerifySimulation
mapId int32
channel string
}
func NewSfpMs(vs *memory.VerifySimulation) *SfpMs {
mapId := int32(0)
return &SfpMs{
vs: vs,
mapId: mapId,
channel: fmt.Sprintf("simulation-%s_%d-devices-status", vs.SimulationId, mapId),
}
}
// 获取通道名
func (ms *SfpMs) GetChannel() string {
return ms.channel
}
// 发送消息间隔时间,单位ms
func (ms *SfpMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
// 定时发送的消息
func (ms *SfpMs) OnTick() ([]*ms_api.TopicMsg, error) {
turnoutStates, err := ms.collectTurnoutStates()
if err != nil {
return nil, err
}
trainStates, err := ms.collectTrainStates()
if err != nil {
return nil, err
}
signalStates, err := ms.collectSignalStates()
if err != nil {
return nil, err
}
buttonStates, err := ms.collectStationButtonStates()
if err != nil {
return nil, err
}
psdStates, err := ms.collectPsdStates()
if err != nil {
return nil, err
}
sectionStates, err := ms.collectSectionStates()
if err != nil {
return nil, err
}
ststes := &state.PushedDevicesStatus{
All: true,
AllStatus: &state.AllDevicesStatus{
SwitchState: turnoutStates,
TrainState: trainStates,
SignalState: signalStates,
ButtonState: buttonStates,
PsdState: psdStates,
SectionState: sectionStates,
},
}
b, err := proto.Marshal(ststes)
if err != nil {
return nil, fmt.Errorf("信号布置图设备状态消息服务数据序列化失败, %s", err)
}
return []*ms_api.TopicMsg{ms_api.NewTopicMsg(ms.channel, b)}, nil
}
// 收集屏蔽门状态
func (ms *SfpMs) collectPsdStates() ([]*state.PsdState, error) {
// TODO: 重构之前的消息服务
return nil, nil
}
// 收集区段状态
func (ms *SfpMs) collectSectionStates() ([]*state.SectionState, error) {
// TODO: 重构之前的消息服务
return nil, nil
}
// 收集车站按钮状态
func (ms *SfpMs) collectStationButtonStates() ([]*state.ButtonState, error) {
// 获取地图上的按钮状态
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.EsbButton{})
var btnStateArr []*state.ButtonState
for _, u := range uidMap {
entry, ok := entity.GetEntityByUid(ms.vs.World, u.Uid)
if !ok {
return nil, fmt.Errorf("ESB按钮实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), u.Uid)
}
if entry.HasComponent(component.ButtonTag) { // 按钮
bit := component.BitStateType.Get(entry)
btnStateArr = append(btnStateArr, &state.ButtonState{Id: u.CommonId, Down: bit.Val})
}
}
return btnStateArr, nil
}
// 收集信号机状态
func (ms *SfpMs) collectSignalStates() ([]*state.SignalState, error) {
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Signal{})
var signalArr []*state.SignalState
for _, u := range uidMap {
s, err := handlerSignalState(ms.vs.World, u.Uid)
if err != nil {
return nil, err
}
s.Id = u.CommonId
signalArr = append(signalArr, s)
}
return signalArr, nil
}
func handlerSignalState(w ecs.World, uid string) (*state.SignalState, error) {
entry, ok := entity.GetEntityByUid(w, uid)
if !ok {
return nil, fmt.Errorf("信号机不存在: World id=%d, 信号机id=%s", w.Id(), uid)
}
if !entry.HasComponent(component.SignalLightsType) { //信号机灯列表
return nil, fmt.Errorf("信号机没有SignalLights组件: World id=%d, 信号机id=%s", w.Id(), uid)
}
signalState := &state.SignalState{}
lights := component.SignalLightsType.Get(entry)
isL := false
isH := false
isU := false
isA := false
isB := false
for _, light := range lights.Lights {
switch {
case light.HasComponent(component.LdTag):
isL = component.BitStateType.Get(light).Val
case light.HasComponent(component.HdTag):
isH = component.BitStateType.Get(light).Val
case light.HasComponent(component.UdTag):
isU = component.BitStateType.Get(light).Val
case light.HasComponent(component.BdTag):
isB = component.BitStateType.Get(light).Val
case light.HasComponent(component.AdTag):
isA = component.BitStateType.Get(light).Val
}
}
if isH && isU {
signalState.Aspect = state.Signal_HU
} else {
switch {
case isL:
signalState.Aspect = state.Signal_L
case isH:
signalState.Aspect = state.Signal_H
case isU:
signalState.Aspect = state.Signal_U
case isB:
signalState.Aspect = state.Signal_B
case isA:
signalState.Aspect = state.Signal_A
}
}
return signalState, nil
}
// 收集列车状态
func (ms *SfpMs) collectTrainStates() ([]*state.TrainState, error) {
allTrainMap := &ms.vs.Memory.Status.TrainStateMap
var trainArr []*state.TrainState
allTrainMap.Range(func(_, v any) bool {
trainArr = append(trainArr, v.(*state.TrainState))
return true
})
return trainArr, nil
}
// 收集道岔状态
func (ms *SfpMs) collectTurnoutStates() ([]*state.SwitchState, error) {
s := ms.vs
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Turnout{})
var switchArr []*state.SwitchState
for _, u := range uidMap {
entry, ok := entity.GetEntityByUid(s.World, u.Uid)
if !ok {
return nil, fmt.Errorf("道岔不存在: World id=%d,道岔id=%s", s.World.Id(), u.Uid)
}
if !entry.HasComponent(component.TurnoutPositionType) {
return nil, fmt.Errorf("道岔没有TurnoutPosition组件: World id=%d,道岔id=%s", s.World.Id(), u.Uid)
}
pos := component.TurnoutPositionType.Get(entry)
s := &state.SwitchState{
Id: u.CommonId,
Normal: pos.Db,
Reverse: pos.Fb,
Dw: pos.Dw,
Fw: pos.Fw,
}
switchArr = append(switchArr, s)
}
return switchArr, nil
}

View File

@ -0,0 +1,68 @@
package message_server
import (
"sync"
"time"
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/message_server/ms_manage"
)
var smsMap sync.Map
// 仿真消息服务
// 管理仿真消息服务,整体可以作为一个消息服务,也可以每个消息子服务各自作为一个消息服务,暂时先按整体作为一个消息服务的方式使用
type SimulationMs struct {
vs *memory.VerifySimulation
mss []ms_api.IMsgServer
}
// 启动仿真所需的消息服务
func Start(vs *memory.VerifySimulation) {
_, ok := smsMap.Load(vs.SimulationId)
if !ok {
sms := &SimulationMs{
vs: vs,
mss: []ms_api.IMsgServer{
NewSfpMs(vs),
},
}
ms_manage.Register(sms)
smsMap.Store(vs.SimulationId, sms)
}
}
// 关闭仿真消息服务
func Close(vs *memory.VerifySimulation) {
sms, ok := smsMap.Load(vs.SimulationId)
if ok {
ms_manage.Unregister(sms.(*SimulationMs))
smsMap.Delete(vs.SimulationId)
}
}
// 获取通道
func (sms *SimulationMs) GetChannel() string {
return sms.vs.SimulationId
}
// 发送消息间隔时间,单位ms
func (sms *SimulationMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
// 构造定时发送的消息
func (sms *SimulationMs) OnTick() ([]*ms_api.TopicMsg, error) {
var tmList []*ms_api.TopicMsg
for _, ms := range sms.mss {
tm, err := ms.OnTick()
if err != nil {
return nil, err
}
if len(tm) > 0 {
tmList = append(tmList, tm...)
}
}
return tmList, nil
}

View File

@ -0,0 +1,5 @@
package message_server
// 世界状态变化消息服务
type SimulationStateMs struct {
}