diff --git a/api/simulation.go b/api/simulation.go index eaf04ea..8dac600 100644 --- a/api/simulation.go +++ b/api/simulation.go @@ -82,6 +82,7 @@ func createByProjectId(c *gin.Context) { if err := c.ShouldBind(&req); nil != err { panic(sys_error.New("测试启动失败,请求参数异常", err)) } + // 地图信息 mapInfos := service.QueryProjectPublishedGi(req.ProjectId) if len(mapInfos) == 0 { panic(sys_error.New("测试启动失败,项目未关联发布图")) @@ -90,11 +91,17 @@ func createByProjectId(c *gin.Context) { for i, mapInfo := range mapInfos { mapIds[i] = mapInfo.ID } - rsp := dto.SimulationCreateRspDto{ProjectId: req.ProjectId, MapId: mapIds[0], MapIds: mapIds} - simulationId, err := simulation.CreateSimulation(req.ProjectId, mapIds) + // 运行环境配置 + var runConfigStr string + runConfig := service.QueryRunConfig(req.ProjectRunConfigId) + if runConfig != nil { + runConfigStr = runConfig.ConfigContent + } + simulationId, err := simulation.CreateSimulation(req.ProjectId, mapIds, runConfigStr) if err != nil { panic(sys_error.New("测试启动失败", err)) } + rsp := dto.SimulationCreateRspDto{ProjectId: req.ProjectId, MapId: mapIds[0], MapIds: mapIds} rsp.SimulationId = simulationId c.JSON(http.StatusOK, &rsp) } diff --git a/ats/verify/simulation/simulation_manage.go b/ats/verify/simulation/simulation_manage.go index b19cb08..f502b41 100644 --- a/ats/verify/simulation/simulation_manage.go +++ b/ats/verify/simulation/simulation_manage.go @@ -7,8 +7,6 @@ import ( "joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory" "joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/sys_error" - "joylink.club/bj-rtsts-server/third_party/dynamics" - "joylink.club/bj-rtsts-server/third_party/semi_physical_train" "joylink.club/bj-rtsts-server/dto" ) @@ -27,31 +25,18 @@ func IsExistSimulation() bool { } // 创建仿真对象 -func CreateSimulation(projectId int32, mapIds []int32) (string, error) { +func CreateSimulation(projectId int32, mapIds []int32, runConfig string) (string, error) { simulationId := createSimulationId(projectId) _, e := simulationMap.Load(simulationId) if !e && IsExistSimulation() { return "", sys_error.New("一套环境同时只能运行一个仿真") } if !e { - verifySimulation, err := memory.CreateSimulation(projectId, mapIds) + verifySimulation, err := memory.CreateSimulation(projectId, mapIds, runConfig) if err != nil { return "", err } verifySimulation.SimulationId = simulationId - if config.Config.Dynamics.Open { - // 动力学接口调用 - lineBaseInfo := verifySimulation.BuildLineBaseInfo() - err := dynamics.Default().RequestStartSimulation(lineBaseInfo) - if err != nil { - return "", err - } - dynamics.Default().Start(verifySimulation) - } - if config.Config.Vobc.Open { - // 半实物系统接口功能启动 - semi_physical_train.Default().Start(verifySimulation) - } simulationMap.Store(simulationId, verifySimulation) // 全部成功,启动仿真 verifySimulation.World.StartUp() @@ -65,18 +50,9 @@ func DestroySimulation(simulationId string) { if !e { return } - simulationInfo := s.(*memory.VerifySimulation) simulationMap.Delete(simulationId) - // 停止ecs world - simulationInfo.World.Close() - if config.Config.Dynamics.Open { - // 停止动力学接口功能 - dynamics.Default().Stop() - dynamics.Default().RequestStopSimulation() - } - if config.Config.Vobc.Open { - semi_physical_train.Default().Stop() - } + simulationInfo := s.(*memory.VerifySimulation) + simulationInfo.StopSimulation() } func createSimulationId(projectId int32) string { diff --git a/ats/verify/simulation/wayside/memory/wayside_simulation.go b/ats/verify/simulation/wayside/memory/wayside_simulation.go index bca18b9..76fecbf 100644 --- a/ats/verify/simulation/wayside/memory/wayside_simulation.go +++ b/ats/verify/simulation/wayside/memory/wayside_simulation.go @@ -1,6 +1,7 @@ package memory import ( + "encoding/json" "fmt" "log/slog" "math" @@ -15,9 +16,11 @@ import ( "joylink.club/bj-rtsts-server/ats/verify/protos/graphicData" "joylink.club/bj-rtsts-server/ats/verify/protos/state" + "joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/dto" "joylink.club/bj-rtsts-server/sys_error" "joylink.club/bj-rtsts-server/third_party/dynamics" + "joylink.club/bj-rtsts-server/third_party/interlock" "joylink.club/bj-rtsts-server/third_party/message" "joylink.club/bj-rtsts-server/third_party/semi_physical_train" "joylink.club/ecs" @@ -38,11 +41,12 @@ type VerifySimulation struct { Memory *WaysideMemory //模型仓库 Repo *repository.Repository - //Rtss仿真世界的id - WorldId ecs.WorldId - World ecs.World + //Rtss仿真世界的 + World ecs.World //设备UID映射 uidMap map[string]*elementIdStructure + // 运行环境配置 + runConfig *config.ThridPartyConfig } // 轨旁仿真内存模型 @@ -89,51 +93,32 @@ func NewWaysideMemory() *WaysideMemory { } // 创建仿真对象 -func CreateSimulation(projectId int32, mapIds []int32) (*VerifySimulation, error) { - //构建Repository +func CreateSimulation(projectId int32, mapIds []int32, runConfig string) (*VerifySimulation, error) { + // 地图信息 sort.Slice(mapIds, func(i, j int) bool { return mapIds[i] < mapIds[j] }) - var mapIdStrSlice []string - for _, id := range mapIds { - mapIdStrSlice = append(mapIdStrSlice, strconv.Itoa(int(id))) - } - repoId := strings.Join(mapIdStrSlice, "|") - repoVersion := "0.1" - repo := repository.FindRepository(repoId, repoVersion) - if repo == nil { - protoRepo, err := buildProtoRepository(mapIds) - if err != nil { - return nil, sys_error.New("数据错误", err) - } - protoRepo.Id, protoRepo.Version = repoId, repoVersion - newRepo, err := repository.BuildRepository(protoRepo) - if err != nil { - return nil, sys_error.New("数据错误", err) - } - repo = newRepo - } - // 构建所有UID映射关系, - allUidMap := buildRepositoryAllUidsMap(mapIds, repo) - //创建仿真 - w, err := rtss_simulation.NewSimulation(repo) + verifySimulation := &VerifySimulation{ProjectId: projectId, MapIds: mapIds} + // 设置运行环境 + err := verifySimulation.initRunConfig(runConfig) if err != nil { - return nil, sys_error.New("仿真创建失败", err) + return nil, err } - verifySimulation := &VerifySimulation{ - MapIds: mapIds, - ProjectId: projectId, - Memory: NewWaysideMemory(), - Repo: repo, - World: w, - WorldId: w.Id(), - uidMap: allUidMap, + // 构建Repository + err = verifySimulation.initRepository() + if err != nil { + return nil, err + } + // 创建world + err = verifySimulation.initWorld() + if err != nil { + return nil, err + } + // 运行第三方服务 + err = verifySimulation.runThirdParty() + if err != nil { + return nil, err } - // 保证World关闭 - runtime.SetFinalizer(verifySimulation, func(verifySimulation *VerifySimulation) { - slog.Info("---关闭仿真World---") - verifySimulation.World.Close() - }) return verifySimulation, nil } @@ -221,71 +206,6 @@ func (s *VerifySimulation) GetComIdByUid(uid string) string { return es[uid].CommonId } -func (sim *VerifySimulation) BuildLineBaseInfo() *message.LineBaseInfo { - info := &message.LineBaseInfo{} - for _, model := range sim.Repo.LinkList() { - id, _ := strconv.Atoi(model.Id()) - link := &message.Link{ - ID: int32(id), - Len: int32(model.Length()), - } - info.LinkList = append(info.LinkList, link) - if model.ARelation() != nil { - turnoutId, _ := strconv.Atoi(sim.GetComIdByUid(model.ARelation().Device().Id())) - link.ARelTurnoutId = int32(turnoutId) - switch model.ARelation().Port() { - case proto.Port_A: - link.ARelTurnoutPoint = "A" - case proto.Port_B: - link.ARelTurnoutPoint = "B" - case proto.Port_C: - link.ARelTurnoutPoint = "C" - } - } - if model.BRelation() != nil { - turnoutId, _ := strconv.Atoi(sim.GetComIdByUid(model.BRelation().Device().Id())) - link.BRelTurnoutId = int32(turnoutId) - switch model.BRelation().Port() { - case proto.Port_A: - link.BRelTurnoutPoint = "A" - case proto.Port_B: - link.BRelTurnoutPoint = "B" - case proto.Port_C: - link.BRelTurnoutPoint = "C" - } - } - } - for _, model := range sim.Repo.SlopeList() { - id, _ := strconv.Atoi(sim.GetComIdByUid(model.Id())) - slope := &message.Slope{ - ID: int32(id), - StartLinkOffset: int32(model.StartLinkPosition().Offset()), - EndLinkOffset: int32(model.EndLinkPosition().Offset()), - DegreeTrig: model.Degree(), - } - info.SlopeList = append(info.SlopeList, slope) - startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id()) - slope.StartLinkId = int32(startLinkId) - endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id()) - slope.EndLinkId = int32(endLinkId) - } - for _, model := range sim.Repo.SectionalCurvatureList() { - id, _ := strconv.Atoi(sim.GetComIdByUid(model.Id())) - curve := &message.Curve{ - ID: int32(id), - StartLinkOffset: int32(model.StartLinkPosition().Offset()), - EndLinkOffset: int32(model.EndLinkPosition().Offset()), - Curvature: model.Radius(), - } - info.CurveList = append(info.CurveList, curve) - startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id()) - curve.StartLinkId = int32(startLinkId) - endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id()) - curve.EndLinkId = int32(endLinkId) - } - return info -} - // 采集动力学道岔状态 func (s *VerifySimulation) CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo { stateSlice := GetAllTurnoutState(s) @@ -363,6 +283,81 @@ func convert(info *message.DynamicsTrainInfo, sta *state.TrainState, simulation return sta } +// 获取动力学配置信息 +func (s *VerifySimulation) GetDynamicsRunConfig() *config.DynamicsConfig { + // TODO:目前为了兼容当前配置方式,做成查询方式后删除IP判断 + if config.Config.Dynamics.Ip != "" { + return &config.Config.Dynamics + } + return &s.runConfig.Dynamics +} + +// 获取动力学运行资源 +func (s *VerifySimulation) GetDynamicsRunRepository() *message.LineBaseInfo { + info := &message.LineBaseInfo{} + for _, model := range s.Repo.LinkList() { + id, _ := strconv.Atoi(model.Id()) + link := &message.Link{ + ID: int32(id), + Len: int32(model.Length()), + } + info.LinkList = append(info.LinkList, link) + if model.ARelation() != nil { + turnoutId, _ := strconv.Atoi(s.GetComIdByUid(model.ARelation().Device().Id())) + link.ARelTurnoutId = int32(turnoutId) + switch model.ARelation().Port() { + case proto.Port_A: + link.ARelTurnoutPoint = "A" + case proto.Port_B: + link.ARelTurnoutPoint = "B" + case proto.Port_C: + link.ARelTurnoutPoint = "C" + } + } + if model.BRelation() != nil { + turnoutId, _ := strconv.Atoi(s.GetComIdByUid(model.BRelation().Device().Id())) + link.BRelTurnoutId = int32(turnoutId) + switch model.BRelation().Port() { + case proto.Port_A: + link.BRelTurnoutPoint = "A" + case proto.Port_B: + link.BRelTurnoutPoint = "B" + case proto.Port_C: + link.BRelTurnoutPoint = "C" + } + } + } + for _, model := range s.Repo.SlopeList() { + id, _ := strconv.Atoi(s.GetComIdByUid(model.Id())) + slope := &message.Slope{ + ID: int32(id), + StartLinkOffset: int32(model.StartLinkPosition().Offset()), + EndLinkOffset: int32(model.EndLinkPosition().Offset()), + DegreeTrig: model.Degree(), + } + info.SlopeList = append(info.SlopeList, slope) + startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id()) + slope.StartLinkId = int32(startLinkId) + endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id()) + slope.EndLinkId = int32(endLinkId) + } + for _, model := range s.Repo.SectionalCurvatureList() { + id, _ := strconv.Atoi(s.GetComIdByUid(model.Id())) + curve := &message.Curve{ + ID: int32(id), + StartLinkOffset: int32(model.StartLinkPosition().Offset()), + EndLinkOffset: int32(model.EndLinkPosition().Offset()), + Curvature: model.Radius(), + } + info.CurveList = append(info.CurveList, curve) + startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id()) + curve.StartLinkId = int32(startLinkId) + endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id()) + curve.EndLinkId = int32(endLinkId) + } + return info +} + // 发送给前端的速度格式化 func speedParse(speed float32) int32 { return int32(math.Abs(float64(speed * 3.6 * 100))) @@ -390,6 +385,15 @@ func (s *VerifySimulation) HandleSemiPhysicalTrainControlMsg(b []byte) { }) } +// 获取半实物运行配置信息 +func (s *VerifySimulation) GetSemiPhysicalRunConfig() *config.VobcConfig { + // TODO:目前为了兼容当前配置方式,做成查询方式后删除IP判断 + if config.Config.Vobc.Ip != "" { + return &config.Config.Vobc + } + return &s.runConfig.Vobc +} + // 处理接到的联锁消息 func (s *VerifySimulation) HandleDriverInfo(b []byte) { driverMsg := message.NewInterlockReceiveMsgPkg(0, 128, 8*131) @@ -402,6 +406,15 @@ func (s *VerifySimulation) HandleDriverInfo(b []byte) { } } +// 获取联锁配置 +func (s *VerifySimulation) GetInterlockRunConfig() *config.InterlockConfig { + // TODO:目前为了兼容当前配置方式,做成查询方式后删除IP判断 + if config.Config.Interlock.Ip != "" { + return &config.Config.Interlock + } + return &s.runConfig.Interlock +} + // 采集联锁中的继电器消息 func (s *VerifySimulation) CollectRelayInfo() *message.InterlockSendMsgPkg { msg := &message.InterlockSendMsgPkg{} @@ -417,6 +430,91 @@ func (s *VerifySimulation) CollectRelayInfo() *message.InterlockSendMsgPkg { return msg } +// 初始化仿真运行配置 +func (s *VerifySimulation) initRunConfig(configStr string) error { + if configStr == "" { + return nil + } + var configMap config.ThridPartyConfig + err := json.Unmarshal([]byte(configStr), &configMap) + if err != nil { + return sys_error.New("配置信息格式错误", err) + } + s.runConfig = &configMap + return nil +} + +// 初始化运行资源 +func (s *VerifySimulation) initRepository() error { + // 构建Repository + var mapIdStrSlice []string + for _, id := range s.MapIds { + mapIdStrSlice = append(mapIdStrSlice, strconv.Itoa(int(id))) + } + repoId := strings.Join(mapIdStrSlice, "|") + repoVersion := "0.1" + repo := repository.FindRepository(repoId, repoVersion) + if repo == nil { + protoRepo, err := buildProtoRepository(s.MapIds) + if err != nil { + return sys_error.New("数据错误", err) + } + protoRepo.Id, protoRepo.Version = repoId, repoVersion + newRepo, err := repository.BuildRepository(protoRepo) + if err != nil { + return sys_error.New("数据错误", err) + } + repo = newRepo + } + s.Repo = repo + s.Memory = NewWaysideMemory() + // 构建所有UID映射关系, + s.uidMap = buildRepositoryAllUidsMap(s.MapIds, s.Repo) + return nil +} + +// 创建world +func (s *VerifySimulation) initWorld() error { + //创建仿真 + w, err := rtss_simulation.NewSimulation(s.Repo) + if err != nil { + return sys_error.New("仿真创建失败", err) + } + s.World = w + // 保证World关闭 + runtime.SetFinalizer(s, func(verifySimulation *VerifySimulation) { + slog.Info("---关闭仿真World---") + verifySimulation.World.Close() + }) + return nil +} + +// 运行仿真第三方模块 +func (s *VerifySimulation) runThirdParty() error { + // 动力学启动 + err := dynamics.Default().Start(s) + if err != nil { + return err + } + // 半实物启动 + semi_physical_train.Default().Start(s) + // 联锁启动 + interlock.Default().Start(s) + return nil +} + +// 停止仿真 +func (s *VerifySimulation) StopSimulation() { + // 停止ecs world + s.World.Close() + // 停止动力学接口功能 + dynamics.Default().Stop() + // 停止半实物 + semi_physical_train.Default().Stop() + // 联锁启动 + interlock.Default().Stop() +} + func buildProtoRepository(mapIds []int32) (*proto.Repository, error) { repo := &proto.Repository{} var exceptStationGiMapIds []int32 diff --git a/config/config.go b/config/config.go index 4767b1a..00b9a53 100644 --- a/config/config.go +++ b/config/config.go @@ -18,9 +18,9 @@ type AppConfig struct { Datasource datasource Logging log Messaging messaging - Dynamics dynamics - Vobc vobc - Interlock interlock + Dynamics DynamicsConfig + Vobc VobcConfig + Interlock InterlockConfig } type server struct { Port int @@ -49,26 +49,32 @@ type centrifugo struct { ApiEndpoint string Address string } -type dynamics struct { - Ip string - UdpLocalPort int - UdpRemotePort int - UdpRemoteTrainPort int - HttpPort int - Open bool -} -type vobc struct { - Ip string - LocalPort int - RemotePort int - Open bool -} -type interlock struct { - Ip string - LocalPort int - RemotePort int - Open bool +// 第三方配置结构 +type ThridPartyConfig struct { + Dynamics DynamicsConfig `json:"dynamics"` + Vobc VobcConfig `json:"vobc"` + Interlock InterlockConfig `json:"interlock"` +} +type DynamicsConfig struct { + Ip string `json:"ip"` + UdpLocalPort int `json:"udpLocalPort"` + UdpRemotePort int `json:"udpRemotePort"` + UdpRemoteTrainPort int `json:"udpRemoteTrainPort"` + HttpPort int `json:"httpPort"` + Open bool `json:"open"` +} +type VobcConfig struct { + Ip string `json:"ip"` + LocalPort int `json:"localPort"` + RemotePort int `json:"remotePort"` + Open bool `json:"open"` +} +type InterlockConfig struct { + Ip string `json:"ip"` + LocalPort int `json:"localPort"` + RemotePort int `json:"remotePort"` + Open bool `json:"open"` } var Config AppConfig diff --git a/service/projectRunConfig.go b/service/projectRunConfig.go index 0b4a490..0f10e6f 100644 --- a/service/projectRunConfig.go +++ b/service/projectRunConfig.go @@ -58,6 +58,21 @@ func QueryProjectRunConfig(id int32) *dto.ProjectRunConfigDto { return dto.ConvertToRunConfigDto(query) } +// 查询项目运行环境 +func QueryRunConfig(id int32) *dto.ProjectRunConfigDto { + if id == 0 { + return nil + } + query, err := dbquery.ProjectRunConfig.Where(dbquery.ProjectRunConfig.ID.Eq(id)).Find() + if err != nil { + panic(sys_error.New("查询失败", err)) + } + if len(query) == 0 { + return nil + } + return dto.ConvertToRunConfigDto(query[0]) +} + // 更新项目运行环境 func UpdateProjectRunConfig(id int32, dd *dto.ProjectRunConfigReqDto) bool { findOldQuery := dbquery.ProjectRunConfig diff --git a/third_party/dynamics/dynamics.go b/third_party/dynamics/dynamics.go index 552c210..f951394 100644 --- a/third_party/dynamics/dynamics.go +++ b/third_party/dynamics/dynamics.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "runtime/debug" + "sync" "time" "joylink.club/bj-rtsts-server/config" @@ -19,21 +20,19 @@ import ( type DynamicsMessageManager interface { CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo) + GetDynamicsRunConfig() *config.DynamicsConfig + GetDynamicsRunRepository() *message.LineBaseInfo } // 动力学接口 type Dynamics interface { - // 请求启动仿真 - RequestStartSimulation(base *message.LineBaseInfo) error - // 请求停止仿真 - RequestStopSimulation() error // 请求添加列车 RequestAddTrain(info *message.InitTrainInfo) error // 请求移除列车 RequestRemoveTrain(req *message.RemoveTrainReq) error // 启动动力学消息功能 - Start(manager DynamicsMessageManager) + Start(manager DynamicsMessageManager) error // 停止动力学消息功能 Stop() @@ -42,22 +41,17 @@ type Dynamics interface { } var _default Dynamics +var initMutex sync.Mutex func Default() Dynamics { - if !config.Config.Dynamics.Open { - panic("动力学接口模块未开启") + initMutex.Lock() + defer initMutex.Unlock() + if _default == nil { + _default = &dynamics{} } return _default } -func Init() { - if !config.Config.Dynamics.Open { - return - } - slog.Info("初始化动力学接口模块") - _default = newDynamics() -} - type dynamics struct { trainInfoUdpServer udp.UdpServer turnoutStateUdpClient udp.UdpClient @@ -67,20 +61,7 @@ type dynamics struct { httpClient *http.Client manager DynamicsMessageManager turnoutTaskCancel context.CancelFunc -} - -func newDynamics() Dynamics { - d := &dynamics{ - turnoutStateUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemotePort)), - trainControlUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemoteTrainPort)), - baseUrl: getUrlBase(), - httpClient: &http.Client{ - Timeout: time.Second * 5, - }, - } - d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Dynamics.UdpLocalPort), d.handleDynamicsTrainInfo) - d.trainInfoUdpServer.Listen() - return d + runConfig *config.DynamicsConfig } // 解码列车信息并处理 @@ -96,11 +77,11 @@ func (d *dynamics) handleDynamicsTrainInfo(b []byte) { } } -func getUrlBase() string { - ip := config.Config.Dynamics.Ip +func getUrlBase(c *config.DynamicsConfig) string { + ip := c.Ip var port string - if config.Config.Dynamics.HttpPort != 0 { - port = fmt.Sprintf(":%d", config.Config.Dynamics.HttpPort) + if c.HttpPort != 0 { + port = fmt.Sprintf(":%d", c.HttpPort) } urlBase := "http://" + ip + port return urlBase @@ -110,8 +91,8 @@ func (d *dynamics) buildUrl(uri string) string { return d.baseUrl + uri } -func (d *dynamics) RequestStartSimulation(base *message.LineBaseInfo) error { - if !config.Config.Dynamics.Open { +func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error { + if !d.runConfig.Open { return nil } url := d.buildUrl("/api/start/") @@ -130,8 +111,8 @@ func (d *dynamics) RequestStartSimulation(base *message.LineBaseInfo) error { return nil } -func (d *dynamics) RequestStopSimulation() error { - if !config.Config.Dynamics.Open { +func (d *dynamics) requestStopSimulation() error { + if !d.runConfig.Open { return nil } url := d.buildUrl("/api/end/") @@ -149,7 +130,7 @@ func (d *dynamics) RequestStopSimulation() error { } func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error { - if !config.Config.Dynamics.Open { + if !d.runConfig.Open { return nil } url := d.buildUrl("/api/aerodynamics/init/train/") @@ -168,7 +149,7 @@ func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error { } func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error { - if !config.Config.Dynamics.Open { + if !d.runConfig.Open { return nil } url := d.buildUrl("/api/aerodynamics/remove/train/") @@ -186,20 +167,66 @@ func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error { return nil } -func (d *dynamics) Start(manager DynamicsMessageManager) { +func (d *dynamics) Start(manager DynamicsMessageManager) error { if manager == nil { panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil") } if d.manager != nil { panic("启动动力学消息服务错误: 存在正在运行的任务") } + d.runConfig = manager.GetDynamicsRunConfig() + if d.runConfig == nil || d.runConfig.Ip == "" || !d.runConfig.Open { + return nil + } d.manager = manager + // 初始化客户端信息 + d.initDynamics() + // 初始化运行资源 + err := d.initDynamicsRunRepository() + if err != nil { + panic("启动动力学消息服务错误: 存在正在运行的任务") + } ctx, cancle := context.WithCancel(context.Background()) go d.sendTurnoutStateTask(ctx) d.turnoutTaskCancel = cancle + return nil +} + +// 初始化客户端、服务等信息 +func (d *dynamics) initDynamics() { + d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort)) + d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort)) + d.baseUrl = getUrlBase(d.runConfig) + d.httpClient = &http.Client{Timeout: time.Second * 5} + d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo) + d.trainInfoUdpServer.Listen() +} + +// 动力学运行所需数据 +func (d *dynamics) initDynamicsRunRepository() error { + // 动力学接口调用 + lineBaseInfo := d.manager.GetDynamicsRunRepository() + err := d.requestStartSimulation(lineBaseInfo) + if err != nil { + return err + } + return nil } func (d *dynamics) Stop() { + if d.httpClient != nil { + d.requestStopSimulation() + d.httpClient = nil + } + if d.turnoutStateUdpClient != nil { + d.turnoutStateUdpClient.Close() + } + if d.trainControlUdpClient != nil { + d.trainControlUdpClient.Close() + } + if d.trainInfoUdpServer != nil { + d.trainInfoUdpServer.Close() + } if d.turnoutTaskCancel != nil { d.turnoutTaskCancel() d.manager = nil diff --git a/third_party/interlock/interlock.go b/third_party/interlock/interlock.go index 234a001..8487bc1 100644 --- a/third_party/interlock/interlock.go +++ b/third_party/interlock/interlock.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "runtime/debug" + "sync" "time" "joylink.club/bj-rtsts-server/config" @@ -16,6 +17,7 @@ import ( type InterlockMessageManager interface { CollectRelayInfo() *message.InterlockSendMsgPkg HandleDriverInfo(b []byte) + GetInterlockRunConfig() *config.InterlockConfig } // 联锁接口 @@ -29,10 +31,13 @@ type InterlockProxy interface { } var _default InterlockProxy +var initMutex sync.Mutex func Default() InterlockProxy { - if !config.Config.Interlock.Open { // TODO - panic("联锁接口模块未开启") + initMutex.Lock() + defer initMutex.Unlock() + if _default == nil { // TODO + _default = &interlockProxy{} } return _default } @@ -43,27 +48,34 @@ type interlockProxy struct { manager InterlockMessageManager collectInfoTaskCancel context.CancelFunc + runConfig *config.InterlockConfig } // 驱动信息进行转发 -func (d *interlockProxy) handleDriverInfo(b []byte) { - handler := d.manager +func (i *interlockProxy) handleDriverInfo(b []byte) { + handler := i.manager if handler != nil { handler.HandleDriverInfo(b) } } -func (d *interlockProxy) Start(manager InterlockMessageManager) { +func (i *interlockProxy) Start(manager InterlockMessageManager) { if manager == nil { panic("启动联锁消息服务错误: InterlockMessageManager不能为nil") } - if d.manager != nil { + if i.manager != nil { panic("启动联锁消息服务错误: 存在正在运行的任务") } - d.manager = manager + i.runConfig = manager.GetInterlockRunConfig() + if i.runConfig == nil || i.runConfig.Ip == "" || !i.runConfig.Open { + return + } + i.manager = manager + // 初始化客户端、服务端 + i.initInterlockProxy() ctx, cancle := context.WithCancel(context.Background()) - go d.collectInfoStateTask(ctx) - d.collectInfoTaskCancel = cancle + go i.collectInfoStateTask(ctx) + i.collectInfoTaskCancel = cancle } const ( @@ -72,7 +84,7 @@ const ( ) // 定时发送采集电路状态任务 -func (d *interlockProxy) collectInfoStateTask(ctx context.Context) { +func (i *interlockProxy) collectInfoStateTask(ctx context.Context) { defer func() { if err := recover(); err != nil { slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack())) @@ -85,36 +97,31 @@ func (d *interlockProxy) collectInfoStateTask(ctx context.Context) { return default: } - collectInfoStates := d.manager.CollectRelayInfo() - d.sendCollectUdpClient.SendMsg(collectInfoStates) + collectInfoStates := i.manager.CollectRelayInfo() + i.sendCollectUdpClient.SendMsg(collectInfoStates) time.Sleep(time.Millisecond * InterlockMessageSendInterval) } } -func (d *interlockProxy) Stop() { - if d.collectInfoTaskCancel != nil { - d.collectInfoTaskCancel() - d.manager = nil +func (i *interlockProxy) Stop() { + if i.sendCollectUdpClient != nil { + i.sendCollectUdpClient.Close() + } + if i.driveInfoUdpServer != nil { + i.driveInfoUdpServer.Close() + } + if i.collectInfoTaskCancel != nil { + i.collectInfoTaskCancel() + i.manager = nil } } -func (d *interlockProxy) SendCollectMessage(b []byte) { - d.sendCollectUdpClient.Send(b) +func (i *interlockProxy) SendCollectMessage(b []byte) { + i.sendCollectUdpClient.Send(b) } -func newInterlockProxy() *interlockProxy { - d := &interlockProxy{ - sendCollectUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Interlock.Ip, config.Config.Interlock.RemotePort)), - } - d.driveInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Interlock.LocalPort), d.handleDriverInfo) - d.driveInfoUdpServer.Listen() - return d -} - -func Init() { - if !config.Config.Interlock.Open { // TODO - return - } - slog.Info("初始化联锁接口模块") - _default = newInterlockProxy() +func (i *interlockProxy) initInterlockProxy() { + i.sendCollectUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", i.runConfig.Ip, i.runConfig.RemotePort)) + i.driveInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", i.runConfig.LocalPort), i.handleDriverInfo) + i.driveInfoUdpServer.Listen() } diff --git a/third_party/semi_physical_train/semi_physical_train.go b/third_party/semi_physical_train/semi_physical_train.go index 480efc7..909e3cd 100644 --- a/third_party/semi_physical_train/semi_physical_train.go +++ b/third_party/semi_physical_train/semi_physical_train.go @@ -2,6 +2,7 @@ package semi_physical_train import ( "fmt" + "sync" "joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/third_party/message" @@ -21,13 +22,16 @@ type SemiPhysicalTrain interface { type SemiPhysicalMessageManager interface { // 处理半实物仿真列车控制消息 HandleSemiPhysicalTrainControlMsg(b []byte) + // 获取半实物启动参数 + GetSemiPhysicalRunConfig() *config.VobcConfig } type semiPhysicalTrainImpl struct { trainControlUdpServer udp.UdpServer trainSpeedInfoUdpClient udp.UdpClient - manager SemiPhysicalMessageManager + manager SemiPhysicalMessageManager + runConfig *config.VobcConfig } func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { @@ -38,10 +42,28 @@ func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { } func (s *semiPhysicalTrainImpl) Start(manager SemiPhysicalMessageManager) { + if manager == nil { + panic("启动半实物消息服务错误: SemiPhysicalMessageManager不能为nil") + } + if s.manager != nil { + panic("启动半实物消息服务错误: 存在正在运行的任务") + } + s.runConfig = manager.GetSemiPhysicalRunConfig() + if s.runConfig == nil || s.runConfig.Ip == "" || !s.runConfig.Open { + return + } + // 初始化客户端、服务端 + s.initSemiPhysical() s.manager = manager } func (s *semiPhysicalTrainImpl) Stop() { + if s.trainControlUdpServer != nil { + s.trainControlUdpServer.Close() + } + if s.trainSpeedInfoUdpClient != nil { + s.trainSpeedInfoUdpClient.Close() + } s.manager = nil } @@ -51,27 +73,20 @@ func (s *semiPhysicalTrainImpl) SendTrainControlMessage(info *message.DynamicsTr s.trainSpeedInfoUdpClient.Send(sendMsg.Encode()) } -func newSemiPhysicalTrain() SemiPhysicalTrain { - s := &semiPhysicalTrainImpl{ - trainSpeedInfoUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Vobc.Ip, config.Config.Vobc.RemotePort)), - } - s.trainControlUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Vobc.LocalPort), s.handleTrainControlMsg) +func (s *semiPhysicalTrainImpl) initSemiPhysical() { + s.trainSpeedInfoUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", s.runConfig.Ip, s.runConfig.RemotePort)) + s.trainControlUdpServer = udp.NewServer(fmt.Sprintf(":%d", s.runConfig.LocalPort), s.handleTrainControlMsg) s.trainControlUdpServer.Listen() - return s } var _default SemiPhysicalTrain +var initMutex sync.Mutex func Default() SemiPhysicalTrain { - if !config.Config.Vobc.Open { - panic("半实物仿真接口模块未开启") + initMutex.Lock() + defer initMutex.Unlock() + if _default == nil { + _default = &semiPhysicalTrainImpl{} } return _default } - -func Init() { - if !config.Config.Vobc.Open { - return - } - _default = newSemiPhysicalTrain() -} diff --git a/third_party/third_party.go b/third_party/third_party.go index d491530..52df27f 100644 --- a/third_party/third_party.go +++ b/third_party/third_party.go @@ -1,11 +1,5 @@ package third_party -import ( - "joylink.club/bj-rtsts-server/third_party/dynamics" - "joylink.club/bj-rtsts-server/third_party/semi_physical_train" -) - func Init() { - dynamics.Init() - semi_physical_train.Init() + } diff --git a/third_party/udp/udp_client.go b/third_party/udp/udp_client.go index ddb3f94..a7e5d56 100644 --- a/third_party/udp/udp_client.go +++ b/third_party/udp/udp_client.go @@ -8,6 +8,7 @@ import ( type UdpClient interface { SendMsg(msg UdpMessageEncoder) Send(b []byte) + Close() } type client struct { @@ -77,3 +78,10 @@ func (c *client) Send(b []byte) { } // slog.Debug("udp client send", "size", n) } + +func (c *client) Close() { + err := c.conn.Close() + if err != nil { + slog.Error("udp client close error", "error", err) + } +} diff --git a/third_party/udp/udp_server.go b/third_party/udp/udp_server.go index 9566798..aea18ba 100644 --- a/third_party/udp/udp_server.go +++ b/third_party/udp/udp_server.go @@ -8,6 +8,7 @@ import ( type UdpServer interface { Listen() + Close() } type UdpMsgHandler func(b []byte) @@ -37,6 +38,13 @@ func (s *server) Listen() { go s.listenAndHandle() } +func (s *server) Close() { + err := s.conn.Close() + if err != nil { + slog.Error("udp server close error", "error", err) + } +} + func (s *server) listenAndHandle() { defer s.conn.Close() for {