【项目运行时增加第三方运行环境逻辑】

This commit is contained in:
weizhihong 2023-10-26 15:06:26 +08:00
parent 9357c17ce4
commit 472bbb2fd1
11 changed files with 416 additions and 255 deletions

View File

@ -82,6 +82,7 @@ func createByProjectId(c *gin.Context) {
if err := c.ShouldBind(&req); nil != err { if err := c.ShouldBind(&req); nil != err {
panic(sys_error.New("测试启动失败,请求参数异常", err)) panic(sys_error.New("测试启动失败,请求参数异常", err))
} }
// 地图信息
mapInfos := service.QueryProjectPublishedGi(req.ProjectId) mapInfos := service.QueryProjectPublishedGi(req.ProjectId)
if len(mapInfos) == 0 { if len(mapInfos) == 0 {
panic(sys_error.New("测试启动失败,项目未关联发布图")) panic(sys_error.New("测试启动失败,项目未关联发布图"))
@ -90,11 +91,17 @@ func createByProjectId(c *gin.Context) {
for i, mapInfo := range mapInfos { for i, mapInfo := range mapInfos {
mapIds[i] = mapInfo.ID mapIds[i] = mapInfo.ID
} }
rsp := dto.SimulationCreateRspDto{ProjectId: req.ProjectId, MapId: mapIds[0], MapIds: mapIds} // 运行环境配置
simulationId, err := simulation.CreateSimulation(req.ProjectId, mapIds) var runConfigStr string
runConfig := service.QueryRunConfig(req.ProjectRunConfigId)
if runConfig != nil {
runConfigStr = runConfig.ConfigContent
}
simulationId, err := simulation.CreateSimulation(req.ProjectId, mapIds, runConfigStr)
if err != nil { if err != nil {
panic(sys_error.New("测试启动失败", err)) panic(sys_error.New("测试启动失败", err))
} }
rsp := dto.SimulationCreateRspDto{ProjectId: req.ProjectId, MapId: mapIds[0], MapIds: mapIds}
rsp.SimulationId = simulationId rsp.SimulationId = simulationId
c.JSON(http.StatusOK, &rsp) c.JSON(http.StatusOK, &rsp)
} }

View File

@ -7,8 +7,6 @@ import (
"joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory" "joylink.club/bj-rtsts-server/ats/verify/simulation/wayside/memory"
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/sys_error" "joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/dynamics"
"joylink.club/bj-rtsts-server/third_party/semi_physical_train"
"joylink.club/bj-rtsts-server/dto" "joylink.club/bj-rtsts-server/dto"
) )
@ -27,31 +25,18 @@ func IsExistSimulation() bool {
} }
// 创建仿真对象 // 创建仿真对象
func CreateSimulation(projectId int32, mapIds []int32) (string, error) { func CreateSimulation(projectId int32, mapIds []int32, runConfig string) (string, error) {
simulationId := createSimulationId(projectId) simulationId := createSimulationId(projectId)
_, e := simulationMap.Load(simulationId) _, e := simulationMap.Load(simulationId)
if !e && IsExistSimulation() { if !e && IsExistSimulation() {
return "", sys_error.New("一套环境同时只能运行一个仿真") return "", sys_error.New("一套环境同时只能运行一个仿真")
} }
if !e { if !e {
verifySimulation, err := memory.CreateSimulation(projectId, mapIds) verifySimulation, err := memory.CreateSimulation(projectId, mapIds, runConfig)
if err != nil { if err != nil {
return "", err return "", err
} }
verifySimulation.SimulationId = simulationId verifySimulation.SimulationId = simulationId
if config.Config.Dynamics.Open {
// 动力学接口调用
lineBaseInfo := verifySimulation.BuildLineBaseInfo()
err := dynamics.Default().RequestStartSimulation(lineBaseInfo)
if err != nil {
return "", err
}
dynamics.Default().Start(verifySimulation)
}
if config.Config.Vobc.Open {
// 半实物系统接口功能启动
semi_physical_train.Default().Start(verifySimulation)
}
simulationMap.Store(simulationId, verifySimulation) simulationMap.Store(simulationId, verifySimulation)
// 全部成功,启动仿真 // 全部成功,启动仿真
verifySimulation.World.StartUp() verifySimulation.World.StartUp()
@ -65,18 +50,9 @@ func DestroySimulation(simulationId string) {
if !e { if !e {
return return
} }
simulationInfo := s.(*memory.VerifySimulation)
simulationMap.Delete(simulationId) simulationMap.Delete(simulationId)
// 停止ecs world simulationInfo := s.(*memory.VerifySimulation)
simulationInfo.World.Close() simulationInfo.StopSimulation()
if config.Config.Dynamics.Open {
// 停止动力学接口功能
dynamics.Default().Stop()
dynamics.Default().RequestStopSimulation()
}
if config.Config.Vobc.Open {
semi_physical_train.Default().Stop()
}
} }
func createSimulationId(projectId int32) string { func createSimulationId(projectId int32) string {

View File

@ -1,6 +1,7 @@
package memory package memory
import ( import (
"encoding/json"
"fmt" "fmt"
"log/slog" "log/slog"
"math" "math"
@ -15,9 +16,11 @@ import (
"joylink.club/bj-rtsts-server/ats/verify/protos/graphicData" "joylink.club/bj-rtsts-server/ats/verify/protos/graphicData"
"joylink.club/bj-rtsts-server/ats/verify/protos/state" "joylink.club/bj-rtsts-server/ats/verify/protos/state"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/dto" "joylink.club/bj-rtsts-server/dto"
"joylink.club/bj-rtsts-server/sys_error" "joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/dynamics" "joylink.club/bj-rtsts-server/third_party/dynamics"
"joylink.club/bj-rtsts-server/third_party/interlock"
"joylink.club/bj-rtsts-server/third_party/message" "joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/semi_physical_train" "joylink.club/bj-rtsts-server/third_party/semi_physical_train"
"joylink.club/ecs" "joylink.club/ecs"
@ -38,11 +41,12 @@ type VerifySimulation struct {
Memory *WaysideMemory Memory *WaysideMemory
//模型仓库 //模型仓库
Repo *repository.Repository Repo *repository.Repository
//Rtss仿真世界的id //Rtss仿真世界的
WorldId ecs.WorldId World ecs.World
World ecs.World
//设备UID映射 //设备UID映射
uidMap map[string]*elementIdStructure uidMap map[string]*elementIdStructure
// 运行环境配置
runConfig *config.ThridPartyConfig
} }
// 轨旁仿真内存模型 // 轨旁仿真内存模型
@ -89,51 +93,32 @@ func NewWaysideMemory() *WaysideMemory {
} }
// 创建仿真对象 // 创建仿真对象
func CreateSimulation(projectId int32, mapIds []int32) (*VerifySimulation, error) { func CreateSimulation(projectId int32, mapIds []int32, runConfig string) (*VerifySimulation, error) {
//构建Repository // 地图信息
sort.Slice(mapIds, func(i, j int) bool { sort.Slice(mapIds, func(i, j int) bool {
return mapIds[i] < mapIds[j] return mapIds[i] < mapIds[j]
}) })
var mapIdStrSlice []string verifySimulation := &VerifySimulation{ProjectId: projectId, MapIds: mapIds}
for _, id := range mapIds { // 设置运行环境
mapIdStrSlice = append(mapIdStrSlice, strconv.Itoa(int(id))) err := verifySimulation.initRunConfig(runConfig)
}
repoId := strings.Join(mapIdStrSlice, "|")
repoVersion := "0.1"
repo := repository.FindRepository(repoId, repoVersion)
if repo == nil {
protoRepo, err := buildProtoRepository(mapIds)
if err != nil {
return nil, sys_error.New("数据错误", err)
}
protoRepo.Id, protoRepo.Version = repoId, repoVersion
newRepo, err := repository.BuildRepository(protoRepo)
if err != nil {
return nil, sys_error.New("数据错误", err)
}
repo = newRepo
}
// 构建所有UID映射关系
allUidMap := buildRepositoryAllUidsMap(mapIds, repo)
//创建仿真
w, err := rtss_simulation.NewSimulation(repo)
if err != nil { if err != nil {
return nil, sys_error.New("仿真创建失败", err) return nil, err
} }
verifySimulation := &VerifySimulation{ // 构建Repository
MapIds: mapIds, err = verifySimulation.initRepository()
ProjectId: projectId, if err != nil {
Memory: NewWaysideMemory(), return nil, err
Repo: repo, }
World: w, // 创建world
WorldId: w.Id(), err = verifySimulation.initWorld()
uidMap: allUidMap, if err != nil {
return nil, err
}
// 运行第三方服务
err = verifySimulation.runThirdParty()
if err != nil {
return nil, err
} }
// 保证World关闭
runtime.SetFinalizer(verifySimulation, func(verifySimulation *VerifySimulation) {
slog.Info("---关闭仿真World---")
verifySimulation.World.Close()
})
return verifySimulation, nil return verifySimulation, nil
} }
@ -221,71 +206,6 @@ func (s *VerifySimulation) GetComIdByUid(uid string) string {
return es[uid].CommonId return es[uid].CommonId
} }
func (sim *VerifySimulation) BuildLineBaseInfo() *message.LineBaseInfo {
info := &message.LineBaseInfo{}
for _, model := range sim.Repo.LinkList() {
id, _ := strconv.Atoi(model.Id())
link := &message.Link{
ID: int32(id),
Len: int32(model.Length()),
}
info.LinkList = append(info.LinkList, link)
if model.ARelation() != nil {
turnoutId, _ := strconv.Atoi(sim.GetComIdByUid(model.ARelation().Device().Id()))
link.ARelTurnoutId = int32(turnoutId)
switch model.ARelation().Port() {
case proto.Port_A:
link.ARelTurnoutPoint = "A"
case proto.Port_B:
link.ARelTurnoutPoint = "B"
case proto.Port_C:
link.ARelTurnoutPoint = "C"
}
}
if model.BRelation() != nil {
turnoutId, _ := strconv.Atoi(sim.GetComIdByUid(model.BRelation().Device().Id()))
link.BRelTurnoutId = int32(turnoutId)
switch model.BRelation().Port() {
case proto.Port_A:
link.BRelTurnoutPoint = "A"
case proto.Port_B:
link.BRelTurnoutPoint = "B"
case proto.Port_C:
link.BRelTurnoutPoint = "C"
}
}
}
for _, model := range sim.Repo.SlopeList() {
id, _ := strconv.Atoi(sim.GetComIdByUid(model.Id()))
slope := &message.Slope{
ID: int32(id),
StartLinkOffset: int32(model.StartLinkPosition().Offset()),
EndLinkOffset: int32(model.EndLinkPosition().Offset()),
DegreeTrig: model.Degree(),
}
info.SlopeList = append(info.SlopeList, slope)
startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id())
slope.StartLinkId = int32(startLinkId)
endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id())
slope.EndLinkId = int32(endLinkId)
}
for _, model := range sim.Repo.SectionalCurvatureList() {
id, _ := strconv.Atoi(sim.GetComIdByUid(model.Id()))
curve := &message.Curve{
ID: int32(id),
StartLinkOffset: int32(model.StartLinkPosition().Offset()),
EndLinkOffset: int32(model.EndLinkPosition().Offset()),
Curvature: model.Radius(),
}
info.CurveList = append(info.CurveList, curve)
startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id())
curve.StartLinkId = int32(startLinkId)
endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id())
curve.EndLinkId = int32(endLinkId)
}
return info
}
// 采集动力学道岔状态 // 采集动力学道岔状态
func (s *VerifySimulation) CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo { func (s *VerifySimulation) CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo {
stateSlice := GetAllTurnoutState(s) stateSlice := GetAllTurnoutState(s)
@ -363,6 +283,81 @@ func convert(info *message.DynamicsTrainInfo, sta *state.TrainState, simulation
return sta return sta
} }
// 获取动力学配置信息
func (s *VerifySimulation) GetDynamicsRunConfig() *config.DynamicsConfig {
// TODO:目前为了兼容当前配置方式做成查询方式后删除IP判断
if config.Config.Dynamics.Ip != "" {
return &config.Config.Dynamics
}
return &s.runConfig.Dynamics
}
// 获取动力学运行资源
func (s *VerifySimulation) GetDynamicsRunRepository() *message.LineBaseInfo {
info := &message.LineBaseInfo{}
for _, model := range s.Repo.LinkList() {
id, _ := strconv.Atoi(model.Id())
link := &message.Link{
ID: int32(id),
Len: int32(model.Length()),
}
info.LinkList = append(info.LinkList, link)
if model.ARelation() != nil {
turnoutId, _ := strconv.Atoi(s.GetComIdByUid(model.ARelation().Device().Id()))
link.ARelTurnoutId = int32(turnoutId)
switch model.ARelation().Port() {
case proto.Port_A:
link.ARelTurnoutPoint = "A"
case proto.Port_B:
link.ARelTurnoutPoint = "B"
case proto.Port_C:
link.ARelTurnoutPoint = "C"
}
}
if model.BRelation() != nil {
turnoutId, _ := strconv.Atoi(s.GetComIdByUid(model.BRelation().Device().Id()))
link.BRelTurnoutId = int32(turnoutId)
switch model.BRelation().Port() {
case proto.Port_A:
link.BRelTurnoutPoint = "A"
case proto.Port_B:
link.BRelTurnoutPoint = "B"
case proto.Port_C:
link.BRelTurnoutPoint = "C"
}
}
}
for _, model := range s.Repo.SlopeList() {
id, _ := strconv.Atoi(s.GetComIdByUid(model.Id()))
slope := &message.Slope{
ID: int32(id),
StartLinkOffset: int32(model.StartLinkPosition().Offset()),
EndLinkOffset: int32(model.EndLinkPosition().Offset()),
DegreeTrig: model.Degree(),
}
info.SlopeList = append(info.SlopeList, slope)
startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id())
slope.StartLinkId = int32(startLinkId)
endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id())
slope.EndLinkId = int32(endLinkId)
}
for _, model := range s.Repo.SectionalCurvatureList() {
id, _ := strconv.Atoi(s.GetComIdByUid(model.Id()))
curve := &message.Curve{
ID: int32(id),
StartLinkOffset: int32(model.StartLinkPosition().Offset()),
EndLinkOffset: int32(model.EndLinkPosition().Offset()),
Curvature: model.Radius(),
}
info.CurveList = append(info.CurveList, curve)
startLinkId, _ := strconv.Atoi(model.StartLinkPosition().Link().Id())
curve.StartLinkId = int32(startLinkId)
endLinkId, _ := strconv.Atoi(model.EndLinkPosition().Link().Id())
curve.EndLinkId = int32(endLinkId)
}
return info
}
// 发送给前端的速度格式化 // 发送给前端的速度格式化
func speedParse(speed float32) int32 { func speedParse(speed float32) int32 {
return int32(math.Abs(float64(speed * 3.6 * 100))) return int32(math.Abs(float64(speed * 3.6 * 100)))
@ -390,6 +385,15 @@ func (s *VerifySimulation) HandleSemiPhysicalTrainControlMsg(b []byte) {
}) })
} }
// 获取半实物运行配置信息
func (s *VerifySimulation) GetSemiPhysicalRunConfig() *config.VobcConfig {
// TODO:目前为了兼容当前配置方式做成查询方式后删除IP判断
if config.Config.Vobc.Ip != "" {
return &config.Config.Vobc
}
return &s.runConfig.Vobc
}
// 处理接到的联锁消息 // 处理接到的联锁消息
func (s *VerifySimulation) HandleDriverInfo(b []byte) { func (s *VerifySimulation) HandleDriverInfo(b []byte) {
driverMsg := message.NewInterlockReceiveMsgPkg(0, 128, 8*131) driverMsg := message.NewInterlockReceiveMsgPkg(0, 128, 8*131)
@ -402,6 +406,15 @@ func (s *VerifySimulation) HandleDriverInfo(b []byte) {
} }
} }
// 获取联锁配置
func (s *VerifySimulation) GetInterlockRunConfig() *config.InterlockConfig {
// TODO:目前为了兼容当前配置方式做成查询方式后删除IP判断
if config.Config.Interlock.Ip != "" {
return &config.Config.Interlock
}
return &s.runConfig.Interlock
}
// 采集联锁中的继电器消息 // 采集联锁中的继电器消息
func (s *VerifySimulation) CollectRelayInfo() *message.InterlockSendMsgPkg { func (s *VerifySimulation) CollectRelayInfo() *message.InterlockSendMsgPkg {
msg := &message.InterlockSendMsgPkg{} msg := &message.InterlockSendMsgPkg{}
@ -417,6 +430,91 @@ func (s *VerifySimulation) CollectRelayInfo() *message.InterlockSendMsgPkg {
return msg return msg
} }
// 初始化仿真运行配置
func (s *VerifySimulation) initRunConfig(configStr string) error {
if configStr == "" {
return nil
}
var configMap config.ThridPartyConfig
err := json.Unmarshal([]byte(configStr), &configMap)
if err != nil {
return sys_error.New("配置信息格式错误", err)
}
s.runConfig = &configMap
return nil
}
// 初始化运行资源
func (s *VerifySimulation) initRepository() error {
// 构建Repository
var mapIdStrSlice []string
for _, id := range s.MapIds {
mapIdStrSlice = append(mapIdStrSlice, strconv.Itoa(int(id)))
}
repoId := strings.Join(mapIdStrSlice, "|")
repoVersion := "0.1"
repo := repository.FindRepository(repoId, repoVersion)
if repo == nil {
protoRepo, err := buildProtoRepository(s.MapIds)
if err != nil {
return sys_error.New("数据错误", err)
}
protoRepo.Id, protoRepo.Version = repoId, repoVersion
newRepo, err := repository.BuildRepository(protoRepo)
if err != nil {
return sys_error.New("数据错误", err)
}
repo = newRepo
}
s.Repo = repo
s.Memory = NewWaysideMemory()
// 构建所有UID映射关系
s.uidMap = buildRepositoryAllUidsMap(s.MapIds, s.Repo)
return nil
}
// 创建world
func (s *VerifySimulation) initWorld() error {
//创建仿真
w, err := rtss_simulation.NewSimulation(s.Repo)
if err != nil {
return sys_error.New("仿真创建失败", err)
}
s.World = w
// 保证World关闭
runtime.SetFinalizer(s, func(verifySimulation *VerifySimulation) {
slog.Info("---关闭仿真World---")
verifySimulation.World.Close()
})
return nil
}
// 运行仿真第三方模块
func (s *VerifySimulation) runThirdParty() error {
// 动力学启动
err := dynamics.Default().Start(s)
if err != nil {
return err
}
// 半实物启动
semi_physical_train.Default().Start(s)
// 联锁启动
interlock.Default().Start(s)
return nil
}
// 停止仿真
func (s *VerifySimulation) StopSimulation() {
// 停止ecs world
s.World.Close()
// 停止动力学接口功能
dynamics.Default().Stop()
// 停止半实物
semi_physical_train.Default().Stop()
// 联锁启动
interlock.Default().Stop()
}
func buildProtoRepository(mapIds []int32) (*proto.Repository, error) { func buildProtoRepository(mapIds []int32) (*proto.Repository, error) {
repo := &proto.Repository{} repo := &proto.Repository{}
var exceptStationGiMapIds []int32 var exceptStationGiMapIds []int32

View File

@ -18,9 +18,9 @@ type AppConfig struct {
Datasource datasource Datasource datasource
Logging log Logging log
Messaging messaging Messaging messaging
Dynamics dynamics Dynamics DynamicsConfig
Vobc vobc Vobc VobcConfig
Interlock interlock Interlock InterlockConfig
} }
type server struct { type server struct {
Port int Port int
@ -49,26 +49,32 @@ type centrifugo struct {
ApiEndpoint string ApiEndpoint string
Address string Address string
} }
type dynamics struct {
Ip string
UdpLocalPort int
UdpRemotePort int
UdpRemoteTrainPort int
HttpPort int
Open bool
}
type vobc struct {
Ip string
LocalPort int
RemotePort int
Open bool
}
type interlock struct { // 第三方配置结构
Ip string type ThridPartyConfig struct {
LocalPort int Dynamics DynamicsConfig `json:"dynamics"`
RemotePort int Vobc VobcConfig `json:"vobc"`
Open bool Interlock InterlockConfig `json:"interlock"`
}
type DynamicsConfig struct {
Ip string `json:"ip"`
UdpLocalPort int `json:"udpLocalPort"`
UdpRemotePort int `json:"udpRemotePort"`
UdpRemoteTrainPort int `json:"udpRemoteTrainPort"`
HttpPort int `json:"httpPort"`
Open bool `json:"open"`
}
type VobcConfig struct {
Ip string `json:"ip"`
LocalPort int `json:"localPort"`
RemotePort int `json:"remotePort"`
Open bool `json:"open"`
}
type InterlockConfig struct {
Ip string `json:"ip"`
LocalPort int `json:"localPort"`
RemotePort int `json:"remotePort"`
Open bool `json:"open"`
} }
var Config AppConfig var Config AppConfig

View File

@ -58,6 +58,21 @@ func QueryProjectRunConfig(id int32) *dto.ProjectRunConfigDto {
return dto.ConvertToRunConfigDto(query) return dto.ConvertToRunConfigDto(query)
} }
// 查询项目运行环境
func QueryRunConfig(id int32) *dto.ProjectRunConfigDto {
if id == 0 {
return nil
}
query, err := dbquery.ProjectRunConfig.Where(dbquery.ProjectRunConfig.ID.Eq(id)).Find()
if err != nil {
panic(sys_error.New("查询失败", err))
}
if len(query) == 0 {
return nil
}
return dto.ConvertToRunConfigDto(query[0])
}
// 更新项目运行环境 // 更新项目运行环境
func UpdateProjectRunConfig(id int32, dd *dto.ProjectRunConfigReqDto) bool { func UpdateProjectRunConfig(id int32, dd *dto.ProjectRunConfigReqDto) bool {
findOldQuery := dbquery.ProjectRunConfig findOldQuery := dbquery.ProjectRunConfig

View File

@ -8,6 +8,7 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"runtime/debug" "runtime/debug"
"sync"
"time" "time"
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
@ -19,21 +20,19 @@ import (
type DynamicsMessageManager interface { type DynamicsMessageManager interface {
CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo
HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo) HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
GetDynamicsRunConfig() *config.DynamicsConfig
GetDynamicsRunRepository() *message.LineBaseInfo
} }
// 动力学接口 // 动力学接口
type Dynamics interface { type Dynamics interface {
// 请求启动仿真
RequestStartSimulation(base *message.LineBaseInfo) error
// 请求停止仿真
RequestStopSimulation() error
// 请求添加列车 // 请求添加列车
RequestAddTrain(info *message.InitTrainInfo) error RequestAddTrain(info *message.InitTrainInfo) error
// 请求移除列车 // 请求移除列车
RequestRemoveTrain(req *message.RemoveTrainReq) error RequestRemoveTrain(req *message.RemoveTrainReq) error
// 启动动力学消息功能 // 启动动力学消息功能
Start(manager DynamicsMessageManager) Start(manager DynamicsMessageManager) error
// 停止动力学消息功能 // 停止动力学消息功能
Stop() Stop()
@ -42,22 +41,17 @@ type Dynamics interface {
} }
var _default Dynamics var _default Dynamics
var initMutex sync.Mutex
func Default() Dynamics { func Default() Dynamics {
if !config.Config.Dynamics.Open { initMutex.Lock()
panic("动力学接口模块未开启") defer initMutex.Unlock()
if _default == nil {
_default = &dynamics{}
} }
return _default return _default
} }
func Init() {
if !config.Config.Dynamics.Open {
return
}
slog.Info("初始化动力学接口模块")
_default = newDynamics()
}
type dynamics struct { type dynamics struct {
trainInfoUdpServer udp.UdpServer trainInfoUdpServer udp.UdpServer
turnoutStateUdpClient udp.UdpClient turnoutStateUdpClient udp.UdpClient
@ -67,20 +61,7 @@ type dynamics struct {
httpClient *http.Client httpClient *http.Client
manager DynamicsMessageManager manager DynamicsMessageManager
turnoutTaskCancel context.CancelFunc turnoutTaskCancel context.CancelFunc
} runConfig *config.DynamicsConfig
func newDynamics() Dynamics {
d := &dynamics{
turnoutStateUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemotePort)),
trainControlUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemoteTrainPort)),
baseUrl: getUrlBase(),
httpClient: &http.Client{
Timeout: time.Second * 5,
},
}
d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Dynamics.UdpLocalPort), d.handleDynamicsTrainInfo)
d.trainInfoUdpServer.Listen()
return d
} }
// 解码列车信息并处理 // 解码列车信息并处理
@ -96,11 +77,11 @@ func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
} }
} }
func getUrlBase() string { func getUrlBase(c *config.DynamicsConfig) string {
ip := config.Config.Dynamics.Ip ip := c.Ip
var port string var port string
if config.Config.Dynamics.HttpPort != 0 { if c.HttpPort != 0 {
port = fmt.Sprintf(":%d", config.Config.Dynamics.HttpPort) port = fmt.Sprintf(":%d", c.HttpPort)
} }
urlBase := "http://" + ip + port urlBase := "http://" + ip + port
return urlBase return urlBase
@ -110,8 +91,8 @@ func (d *dynamics) buildUrl(uri string) string {
return d.baseUrl + uri return d.baseUrl + uri
} }
func (d *dynamics) RequestStartSimulation(base *message.LineBaseInfo) error { func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
if !config.Config.Dynamics.Open { if !d.runConfig.Open {
return nil return nil
} }
url := d.buildUrl("/api/start/") url := d.buildUrl("/api/start/")
@ -130,8 +111,8 @@ func (d *dynamics) RequestStartSimulation(base *message.LineBaseInfo) error {
return nil return nil
} }
func (d *dynamics) RequestStopSimulation() error { func (d *dynamics) requestStopSimulation() error {
if !config.Config.Dynamics.Open { if !d.runConfig.Open {
return nil return nil
} }
url := d.buildUrl("/api/end/") url := d.buildUrl("/api/end/")
@ -149,7 +130,7 @@ func (d *dynamics) RequestStopSimulation() error {
} }
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error { func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
if !config.Config.Dynamics.Open { if !d.runConfig.Open {
return nil return nil
} }
url := d.buildUrl("/api/aerodynamics/init/train/") url := d.buildUrl("/api/aerodynamics/init/train/")
@ -168,7 +149,7 @@ func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
} }
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error { func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
if !config.Config.Dynamics.Open { if !d.runConfig.Open {
return nil return nil
} }
url := d.buildUrl("/api/aerodynamics/remove/train/") url := d.buildUrl("/api/aerodynamics/remove/train/")
@ -186,20 +167,66 @@ func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
return nil return nil
} }
func (d *dynamics) Start(manager DynamicsMessageManager) { func (d *dynamics) Start(manager DynamicsMessageManager) error {
if manager == nil { if manager == nil {
panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil") panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
} }
if d.manager != nil { if d.manager != nil {
panic("启动动力学消息服务错误: 存在正在运行的任务") panic("启动动力学消息服务错误: 存在正在运行的任务")
} }
d.runConfig = manager.GetDynamicsRunConfig()
if d.runConfig == nil || d.runConfig.Ip == "" || !d.runConfig.Open {
return nil
}
d.manager = manager d.manager = manager
// 初始化客户端信息
d.initDynamics()
// 初始化运行资源
err := d.initDynamicsRunRepository()
if err != nil {
panic("启动动力学消息服务错误: 存在正在运行的任务")
}
ctx, cancle := context.WithCancel(context.Background()) ctx, cancle := context.WithCancel(context.Background())
go d.sendTurnoutStateTask(ctx) go d.sendTurnoutStateTask(ctx)
d.turnoutTaskCancel = cancle d.turnoutTaskCancel = cancle
return nil
}
// 初始化客户端、服务等信息
func (d *dynamics) initDynamics() {
d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort))
d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort))
d.baseUrl = getUrlBase(d.runConfig)
d.httpClient = &http.Client{Timeout: time.Second * 5}
d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo)
d.trainInfoUdpServer.Listen()
}
// 动力学运行所需数据
func (d *dynamics) initDynamicsRunRepository() error {
// 动力学接口调用
lineBaseInfo := d.manager.GetDynamicsRunRepository()
err := d.requestStartSimulation(lineBaseInfo)
if err != nil {
return err
}
return nil
} }
func (d *dynamics) Stop() { func (d *dynamics) Stop() {
if d.httpClient != nil {
d.requestStopSimulation()
d.httpClient = nil
}
if d.turnoutStateUdpClient != nil {
d.turnoutStateUdpClient.Close()
}
if d.trainControlUdpClient != nil {
d.trainControlUdpClient.Close()
}
if d.trainInfoUdpServer != nil {
d.trainInfoUdpServer.Close()
}
if d.turnoutTaskCancel != nil { if d.turnoutTaskCancel != nil {
d.turnoutTaskCancel() d.turnoutTaskCancel()
d.manager = nil d.manager = nil

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"runtime/debug" "runtime/debug"
"sync"
"time" "time"
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
@ -16,6 +17,7 @@ import (
type InterlockMessageManager interface { type InterlockMessageManager interface {
CollectRelayInfo() *message.InterlockSendMsgPkg CollectRelayInfo() *message.InterlockSendMsgPkg
HandleDriverInfo(b []byte) HandleDriverInfo(b []byte)
GetInterlockRunConfig() *config.InterlockConfig
} }
// 联锁接口 // 联锁接口
@ -29,10 +31,13 @@ type InterlockProxy interface {
} }
var _default InterlockProxy var _default InterlockProxy
var initMutex sync.Mutex
func Default() InterlockProxy { func Default() InterlockProxy {
if !config.Config.Interlock.Open { // TODO initMutex.Lock()
panic("联锁接口模块未开启") defer initMutex.Unlock()
if _default == nil { // TODO
_default = &interlockProxy{}
} }
return _default return _default
} }
@ -43,27 +48,34 @@ type interlockProxy struct {
manager InterlockMessageManager manager InterlockMessageManager
collectInfoTaskCancel context.CancelFunc collectInfoTaskCancel context.CancelFunc
runConfig *config.InterlockConfig
} }
// 驱动信息进行转发 // 驱动信息进行转发
func (d *interlockProxy) handleDriverInfo(b []byte) { func (i *interlockProxy) handleDriverInfo(b []byte) {
handler := d.manager handler := i.manager
if handler != nil { if handler != nil {
handler.HandleDriverInfo(b) handler.HandleDriverInfo(b)
} }
} }
func (d *interlockProxy) Start(manager InterlockMessageManager) { func (i *interlockProxy) Start(manager InterlockMessageManager) {
if manager == nil { if manager == nil {
panic("启动联锁消息服务错误: InterlockMessageManager不能为nil") panic("启动联锁消息服务错误: InterlockMessageManager不能为nil")
} }
if d.manager != nil { if i.manager != nil {
panic("启动联锁消息服务错误: 存在正在运行的任务") panic("启动联锁消息服务错误: 存在正在运行的任务")
} }
d.manager = manager i.runConfig = manager.GetInterlockRunConfig()
if i.runConfig == nil || i.runConfig.Ip == "" || !i.runConfig.Open {
return
}
i.manager = manager
// 初始化客户端、服务端
i.initInterlockProxy()
ctx, cancle := context.WithCancel(context.Background()) ctx, cancle := context.WithCancel(context.Background())
go d.collectInfoStateTask(ctx) go i.collectInfoStateTask(ctx)
d.collectInfoTaskCancel = cancle i.collectInfoTaskCancel = cancle
} }
const ( const (
@ -72,7 +84,7 @@ const (
) )
// 定时发送采集电路状态任务 // 定时发送采集电路状态任务
func (d *interlockProxy) collectInfoStateTask(ctx context.Context) { func (i *interlockProxy) collectInfoStateTask(ctx context.Context) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack())) slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
@ -85,36 +97,31 @@ func (d *interlockProxy) collectInfoStateTask(ctx context.Context) {
return return
default: default:
} }
collectInfoStates := d.manager.CollectRelayInfo() collectInfoStates := i.manager.CollectRelayInfo()
d.sendCollectUdpClient.SendMsg(collectInfoStates) i.sendCollectUdpClient.SendMsg(collectInfoStates)
time.Sleep(time.Millisecond * InterlockMessageSendInterval) time.Sleep(time.Millisecond * InterlockMessageSendInterval)
} }
} }
func (d *interlockProxy) Stop() { func (i *interlockProxy) Stop() {
if d.collectInfoTaskCancel != nil { if i.sendCollectUdpClient != nil {
d.collectInfoTaskCancel() i.sendCollectUdpClient.Close()
d.manager = nil }
if i.driveInfoUdpServer != nil {
i.driveInfoUdpServer.Close()
}
if i.collectInfoTaskCancel != nil {
i.collectInfoTaskCancel()
i.manager = nil
} }
} }
func (d *interlockProxy) SendCollectMessage(b []byte) { func (i *interlockProxy) SendCollectMessage(b []byte) {
d.sendCollectUdpClient.Send(b) i.sendCollectUdpClient.Send(b)
} }
func newInterlockProxy() *interlockProxy { func (i *interlockProxy) initInterlockProxy() {
d := &interlockProxy{ i.sendCollectUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", i.runConfig.Ip, i.runConfig.RemotePort))
sendCollectUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Interlock.Ip, config.Config.Interlock.RemotePort)), i.driveInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", i.runConfig.LocalPort), i.handleDriverInfo)
} i.driveInfoUdpServer.Listen()
d.driveInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Interlock.LocalPort), d.handleDriverInfo)
d.driveInfoUdpServer.Listen()
return d
}
func Init() {
if !config.Config.Interlock.Open { // TODO
return
}
slog.Info("初始化联锁接口模块")
_default = newInterlockProxy()
} }

View File

@ -2,6 +2,7 @@ package semi_physical_train
import ( import (
"fmt" "fmt"
"sync"
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message" "joylink.club/bj-rtsts-server/third_party/message"
@ -21,13 +22,16 @@ type SemiPhysicalTrain interface {
type SemiPhysicalMessageManager interface { type SemiPhysicalMessageManager interface {
// 处理半实物仿真列车控制消息 // 处理半实物仿真列车控制消息
HandleSemiPhysicalTrainControlMsg(b []byte) HandleSemiPhysicalTrainControlMsg(b []byte)
// 获取半实物启动参数
GetSemiPhysicalRunConfig() *config.VobcConfig
} }
type semiPhysicalTrainImpl struct { type semiPhysicalTrainImpl struct {
trainControlUdpServer udp.UdpServer trainControlUdpServer udp.UdpServer
trainSpeedInfoUdpClient udp.UdpClient trainSpeedInfoUdpClient udp.UdpClient
manager SemiPhysicalMessageManager manager SemiPhysicalMessageManager
runConfig *config.VobcConfig
} }
func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) {
@ -38,10 +42,28 @@ func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) {
} }
func (s *semiPhysicalTrainImpl) Start(manager SemiPhysicalMessageManager) { func (s *semiPhysicalTrainImpl) Start(manager SemiPhysicalMessageManager) {
if manager == nil {
panic("启动半实物消息服务错误: SemiPhysicalMessageManager不能为nil")
}
if s.manager != nil {
panic("启动半实物消息服务错误: 存在正在运行的任务")
}
s.runConfig = manager.GetSemiPhysicalRunConfig()
if s.runConfig == nil || s.runConfig.Ip == "" || !s.runConfig.Open {
return
}
// 初始化客户端、服务端
s.initSemiPhysical()
s.manager = manager s.manager = manager
} }
func (s *semiPhysicalTrainImpl) Stop() { func (s *semiPhysicalTrainImpl) Stop() {
if s.trainControlUdpServer != nil {
s.trainControlUdpServer.Close()
}
if s.trainSpeedInfoUdpClient != nil {
s.trainSpeedInfoUdpClient.Close()
}
s.manager = nil s.manager = nil
} }
@ -51,27 +73,20 @@ func (s *semiPhysicalTrainImpl) SendTrainControlMessage(info *message.DynamicsTr
s.trainSpeedInfoUdpClient.Send(sendMsg.Encode()) s.trainSpeedInfoUdpClient.Send(sendMsg.Encode())
} }
func newSemiPhysicalTrain() SemiPhysicalTrain { func (s *semiPhysicalTrainImpl) initSemiPhysical() {
s := &semiPhysicalTrainImpl{ s.trainSpeedInfoUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", s.runConfig.Ip, s.runConfig.RemotePort))
trainSpeedInfoUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Vobc.Ip, config.Config.Vobc.RemotePort)), s.trainControlUdpServer = udp.NewServer(fmt.Sprintf(":%d", s.runConfig.LocalPort), s.handleTrainControlMsg)
}
s.trainControlUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Vobc.LocalPort), s.handleTrainControlMsg)
s.trainControlUdpServer.Listen() s.trainControlUdpServer.Listen()
return s
} }
var _default SemiPhysicalTrain var _default SemiPhysicalTrain
var initMutex sync.Mutex
func Default() SemiPhysicalTrain { func Default() SemiPhysicalTrain {
if !config.Config.Vobc.Open { initMutex.Lock()
panic("半实物仿真接口模块未开启") defer initMutex.Unlock()
if _default == nil {
_default = &semiPhysicalTrainImpl{}
} }
return _default return _default
} }
func Init() {
if !config.Config.Vobc.Open {
return
}
_default = newSemiPhysicalTrain()
}

View File

@ -1,11 +1,5 @@
package third_party package third_party
import (
"joylink.club/bj-rtsts-server/third_party/dynamics"
"joylink.club/bj-rtsts-server/third_party/semi_physical_train"
)
func Init() { func Init() {
dynamics.Init()
semi_physical_train.Init()
} }

View File

@ -8,6 +8,7 @@ import (
type UdpClient interface { type UdpClient interface {
SendMsg(msg UdpMessageEncoder) SendMsg(msg UdpMessageEncoder)
Send(b []byte) Send(b []byte)
Close()
} }
type client struct { type client struct {
@ -77,3 +78,10 @@ func (c *client) Send(b []byte) {
} }
// slog.Debug("udp client send", "size", n) // slog.Debug("udp client send", "size", n)
} }
func (c *client) Close() {
err := c.conn.Close()
if err != nil {
slog.Error("udp client close error", "error", err)
}
}

View File

@ -8,6 +8,7 @@ import (
type UdpServer interface { type UdpServer interface {
Listen() Listen()
Close()
} }
type UdpMsgHandler func(b []byte) type UdpMsgHandler func(b []byte)
@ -37,6 +38,13 @@ func (s *server) Listen() {
go s.listenAndHandle() go s.listenAndHandle()
} }
func (s *server) Close() {
err := s.conn.Close()
if err != nil {
slog.Error("udp server close error", "error", err)
}
}
func (s *server) listenAndHandle() { func (s *server) listenAndHandle() {
defer s.conn.Close() defer s.conn.Close()
for { for {