[重写]北京12号线计轴通信交互逻辑(未完,主要是安全校验域计算逻辑)

This commit is contained in:
thesai 2024-07-04 13:59:39 +08:00
parent 1166d7ffe5
commit 731aea8c8d
18 changed files with 824 additions and 753 deletions

View File

@ -190,7 +190,7 @@ func deleteProjectRunConfig(c *gin.Context) {
// @Failure 500 {object} dto.ErrorDto
// @Router /api/v1/runconfig/description [get]
func getRunCofigDescription(c *gin.Context) {
c.JSON(http.StatusOK, parseRunCofigStruct(&config.ThridPartyConfig{}))
c.JSON(http.StatusOK, parseRunCofigStruct(&config.ThirdPartyConfig{}))
}
// 解析环境配置结构

View File

@ -59,12 +59,12 @@ type mqtt struct {
// }
// 第三方配置结构
type ThridPartyConfig struct {
Id int32 `json:"id"`
Dynamics DynamicsConfig `json:"dynamics" description:"动力学配置"`
Vobc VobcConfig `json:"vobc" description:"半实物配置"`
Interlocks []InterlockConfig `json:"interlock" description:"联锁配置"`
RsspAxleCfgs []RsspAxleConfig `json:"rsspAxleCfgs" description:"所有联锁集中站计轴RSSP-I配置"`
type ThirdPartyConfig struct {
Id int32 `json:"id"`
Dynamics DynamicsConfig `json:"dynamics" description:"动力学配置"`
Vobc VobcConfig `json:"vobc" description:"半实物配置"`
Interlocks []InterlockConfig `json:"interlock" description:"联锁配置"`
RsspAxleConfig RsspAxleConfig `json:"rsspAxleCfgs" description:"计轴通信配置"`
//ElectricMachinery ElectricMachineryConfig `json:"electricMachinery" description:"电机配置"`
ElectricMachinerys []ElectricMachineryConfig `json:"electricMachinerys" description:"电机配置"`
BtmCanet BtmCanetConfig `json:"btmCanet" description:"BTM关联的网关设备CANET配置"`
@ -151,36 +151,6 @@ type BtmCanetConfig struct {
Open bool `json:"open" description:"是否开启"`
}
// RsspAxleConfig 计轴区段与联锁安全通信配置
type RsspAxleConfig struct {
Open bool `json:"open" description:"是否开启"`
City string `json:"city" description:"所属城市"`
LineId string `json:"lineId" description:"所属线路"`
CentralizedStation string `json:"centralizedStation" description:"所属集中站"`
RsspCfg RsspConfig `json:"rsspCfg" description:"安全通道配置"`
}
// RsspConfig CI系统与计轴设备的安全通信协议配置参数
// 计轴设备(管理一个集中站的所有计轴器)配置
type RsspConfig struct {
SrcAddr uint16 `json:"srcAddr" description:"16位源地址,本地地址"` //16位源地址,本地地址
DstAddr uint16 `json:"dstAddr" description:"16位目的地址,远程地址"` //16位目的地址,远程地址
DataVer1 uint32 `json:"dataVer1" description:"通道1数据版本"` //通道1数据版本
DataVer2 uint32 `json:"dataVer2" description:"通道2数据版本"` //通道2数据版本
SID1 uint32 `json:"sID1" description:"通道1源标识"` //通道1源标识
SID2 uint32 `json:"sID2" description:"通道2源标识"` //通道2源标识
SINIT1 uint32 `json:"sINIT1" description:"通道1序列初始"` //通道1序列初始
SINIT2 uint32 `json:"sINIT2" description:"通道2序列初始"` //通道2序列初始
SendingPeriod uint32 `json:"sendingPeriod" description:"发送周期值"` //接收方每个安全通信会话对应的发送周期值,单位ms
SsrRsspTimeout uint32 `json:"ssrRsspTimeout" description:"等待SSR回应的定时器超时值"` //等待SSR回应的定时器超时值,为RsspTimer时间,1=SendingPeriod
Mtv uint32 `json:"mtv" description:"最大时序偏差"` //每个安全通信会话可容忍的最大时序偏差,即当前接收的RSD的序列号与上一次RSD的序列号最大允许差值
Udl uint32 `json:"udl" description:"RSD应用数据长度配置值"` //每个安全通信会话RSD应用数据长度发送和接收的配置值支持固定长度和可变长度;0-可变长度大于0即固定长度
DeviceA bool `json:"deviceA" description:"true-A机false-B机"` //true-A机false-B机
PicType byte `json:"picType" description:"协议交互类别"` //协议交互类别message.PicType
RemoteIp string `json:"remoteIp" description:"远程服务器ip"` //远程服务器ip
RemoteUdpPort int `json:"remoteUdpPort" description:"远程服务器端口"` //远程服务器端口
LocalUdpPort int `json:"localUdpPort" description:"本地服务器端口"` //本地服务器端口
}
type VehiclePCSimConfig2 struct {
TrainEnds bool `json:"trainEnds" description:"列车端点A"`
Open bool `json:"open" description:"是否开启"`
@ -197,9 +167,29 @@ type VehiclePCSimConfig struct {
//LocalTestingPort uint32 `json:"localTestingPort" description:"本地测试端口"`
}
// CheckAddress 检测目标源地址目的地址是否在配置中
func (c *RsspConfig) CheckAddress(srcAddr uint16, dstAddr uint16) bool {
return true
// RsspAxleConfig Rssp计轴通信配置
type RsspAxleConfig struct {
Open bool `json:"open" description:"开启"`
StationCode string `json:"stationCode" description:"集中站编号"`
NetAConfig RsspNetConfig `json:"netAConfig" description:"A网配置"`
}
// RsspNetConfig 计轴通信配置
type RsspNetConfig struct {
RemoteIp string `json:"remoteIp" description:"远端IP"`
RemotePort int `json:"remotePort" description:"远端端口"`
LocalPort int `json:"localPort" description:"本地端口"`
SourceAddr string `json:"sourceAddr" description:"源地址16进制2字节"`
TargetAddr string `json:"targetAddr" description:"目的地址16进制2字节"`
Sid1 string `json:"sid1" description:"SID_116进制4字节"`
Sid2 string `json:"sid2" description:"SID_216进制4字节"`
Sinit1 string `json:"sinit1" description:"SINIT_116进制4字节"`
Sinit2 string `json:"sinit2" description:"SINIT_216进制4字节"`
DataVer1 string `json:"dataVer1" description:"DATAVER_116进制4字节"`
DataVer2 string `json:"dataVer2" description:"DATAVER_216进制4字节"`
MaxDeviation uint32 `json:"maxDeviation" description:"可容忍的最大时序偏差"`
WaitSSRTimeout int `json:"waitSSRTimeout" description:"等待SSR回应的定时器超时值ms"`
Period int `json:"period" description:"RSD发送周期ms"`
}
///////////////////////////////////////////////////////////////////////////////////////

@ -1 +1 @@
Subproject commit 181dd9951b16a0c1665779567d5d040568382615
Subproject commit ef7e469175d27182823461972998049692fb425f

@ -1 +1 @@
Subproject commit 547b0b1baf218f46e667e98852147a078be884e3
Subproject commit c0aa3ab5b7e5e4819d4ce0147ef3ae6947b72daf

View File

@ -0,0 +1,61 @@
package msg
import (
"bytes"
"encoding/binary"
)
const (
ProtocolType_Sync = 0x01
ProtocolType_NoSync = 0x02
)
const (
MessageType_A = 0x80
MessageType_B = 0x81
MessageType_SSE = 0x90
MessageType_SSR = 0x91
)
const (
CRC_POLY_1 = 0x100D4E63 //通道1的CRC多项式
CRC_POLY_2 = 0x8CE56011 //通道2的CRC多项式
)
const (
SCW_1 = 0xAE390B5A //通道1的SCW
SCW_2 = 0xC103589C //通道2的SCW
)
const (
T_POLY_1 = 0x0FC22F87 //通道1的时间戳生成多项式
T_POLY_2 = 0xC3E887E1 //通道2的时间戳生成多项式
)
const Twait_sse = 3 //默认sse等待回应的周期数
func GetMessageType(data []byte) byte {
return data[1]
}
type MsgHeader struct {
ProtocolType byte //协议交互类别
MessageType byte //报文类型
SourceAddr uint16 //源地址
TargetAddr uint16 //目的地址
}
func (m *MsgHeader) encode() []byte {
var data []byte
data = append(data, m.ProtocolType)
data = append(data, m.MessageType)
data = binary.LittleEndian.AppendUint16(data, m.SourceAddr)
data = binary.LittleEndian.AppendUint16(data, m.TargetAddr)
return data
}
func (m *MsgHeader) decode(data []byte) error {
buf := bytes.NewBuffer(data)
err := binary.Read(buf, binary.LittleEndian, m)
return err
}

View File

@ -0,0 +1,142 @@
package msg
import (
"bytes"
"encoding/binary"
"joylink.club/bj-rtsts-server/third_party/message"
)
// 实时安全数据消息
type RsdMsg struct {
MsgHeader
SeqNum uint32 //序列号
UserDataLen uint16 //用户数据包字节总数+8
Svc1 uint32 //CRC1^SID1^T1(N)^SCW1
Svc2 uint32 //CRC2^SID2^T2(N)^SCW2
UserData []byte //用户数据包
Tail uint16 //报文尾 CRC16
}
func (r *RsdMsg) Encode() []byte {
data := r.MsgHeader.encode()
data = binary.BigEndian.AppendUint32(data, r.SeqNum)
data = binary.BigEndian.AppendUint16(data, r.UserDataLen)
data = binary.BigEndian.AppendUint32(data, r.Svc1)
data = binary.BigEndian.AppendUint32(data, r.Svc2)
data = append(data, r.UserData...)
r.Tail = message.Rssp_I_Crc16(r.UserData)
data = binary.BigEndian.AppendUint16(data, r.Tail)
return data
}
func (r *RsdMsg) Decode(data []byte) error {
err := r.MsgHeader.decode(data)
if err != nil {
return err
}
buf := bytes.NewBuffer(data[6:]) //去掉报文头的6个字节
fields := []any{&r.SeqNum, &r.UserDataLen, &r.Svc1, &r.Svc2}
for _, field := range fields {
err := binary.Read(buf, binary.LittleEndian, field)
if err != nil {
return err
}
}
r.UserData = data[len(data)-buf.Len() : len(data)-2]
r.Tail = binary.LittleEndian.Uint16(data[len(data)-2:])
return nil
}
// RsdMsgBuilder 用来构建RSD将无需用户赋值的字段去掉了
type RsdMsgBuilder struct {
MsgHeader
SeqNum uint32
Svc1 uint32
Svc2 uint32
UserData []byte
}
func (b *RsdMsgBuilder) Build() *RsdMsg {
return &RsdMsg{
MsgHeader: b.MsgHeader,
SeqNum: b.SeqNum,
UserDataLen: uint16(len(b.UserData) + 8),
Svc1: b.Svc1,
Svc2: b.Svc2,
UserData: b.UserData,
Tail: message.Rssp_I_Crc16(b.UserData),
}
}
func (b *RsdMsgBuilder) Encode() []byte {
return b.Build().Encode()
}
// CmdInfos 来自联锁的数据帧
type CmdInfos []*cmdInfo
func (f *CmdInfos) Decode(data []byte) error {
for _, b := range data {
cmdInfo := &cmdInfo{}
cmdInfo.decode(b)
*f = append(*f, cmdInfo)
}
return nil
}
// StateInfos 发给联锁的数据帧
type StateInfos []*StateInfo
func (t StateInfos) Encode() []byte {
var data []byte
for _, info := range t {
data = append(data, info.encode()...)
}
return data
}
type cmdInfo struct {
PRST bool //不使用
RST bool //不使用
DRST bool //直接复位 位索引5
RRST bool //不使用
RSTR bool //不使用
PDRST bool //预复位 位索引2
PRRST bool //不使用
}
func (c *cmdInfo) decode(data byte) {
c.DRST = (data>>5)&1 == 1
c.PDRST = (data>>2)&1 == 1
}
type StateInfo struct {
CLR bool //计轴出清 位索引0-70字节的7位
OCC bool //计轴占用 0-6
RAC bool //计轴复位反馈 1-6
RJO bool //运营原因拒绝计轴复位 1-5
RJT bool //技术原因拒绝计轴复位 1-4
}
func (s *StateInfo) encode() []byte {
var b0 byte
var b1 byte
if s.CLR {
b0 = b0 | 1<<7
}
if s.OCC {
b0 = b0 | 1<<6
}
if s.RAC {
b1 = b1 | 1<<6
}
if s.RJO {
b1 = b1 | 1<<5
}
if s.RJT {
b1 = b1 | 1<<4
}
return []byte{b0, b1}
}

View File

@ -0,0 +1,30 @@
package msg
import (
"bytes"
"encoding/binary"
"joylink.club/bj-rtsts-server/third_party/message"
)
type SseMsg struct {
MsgHeader
SeqNum uint32 // 序列号
SeqEnq1 uint32 //时序校正请求通道1 SID_1^T_1(NE)
SeqEnq2 uint32 // 时序校正请求通道2 SID_2^T_2(NE)
Tail uint16 //报文位 CRC16
}
func (s *SseMsg) Encode() []byte {
data := s.MsgHeader.encode()
data = binary.LittleEndian.AppendUint32(data, s.SeqNum)
data = binary.LittleEndian.AppendUint32(data, s.SeqEnq1)
data = binary.LittleEndian.AppendUint32(data, s.SeqEnq2)
s.Tail = message.Rssp_I_Crc16(data)
data = binary.LittleEndian.AppendUint16(data, s.Tail)
return data
}
func (s *SseMsg) Decode(data []byte) error {
buf := bytes.NewBuffer(data)
return binary.Read(buf, binary.LittleEndian, s)
}

View File

@ -0,0 +1,34 @@
package msg
import (
"bytes"
"encoding/binary"
"joylink.club/bj-rtsts-server/third_party/message"
)
type SsrMsg struct {
MsgHeader
SeqNumSsr uint32 // 应答方的序列号
SeqNumSse uint32 // 请求方的序列号
SeqInit1 uint32 // 时序初始化通道1 SEQENQ_1^SID_1^T_1(NR)^DATAVER_1
SeqInit2 uint32 // 时序初始化通道2 SEQENQ_2^SID_2^T_2(NR)^DATAVER_2
DataVer byte // 数据版本号 预留固定值0x01
Tail uint16 // 报文位 CRC16
}
func (s *SsrMsg) Encode() []byte {
data := s.MsgHeader.encode()
data = binary.LittleEndian.AppendUint32(data, s.SeqNumSsr)
data = binary.LittleEndian.AppendUint32(data, s.SeqNumSse)
data = binary.LittleEndian.AppendUint32(data, s.SeqInit1)
data = binary.LittleEndian.AppendUint32(data, s.SeqInit2)
data = append(data, s.DataVer)
s.Tail = message.Rssp_I_Crc16(data)
data = binary.LittleEndian.AppendUint16(data, s.Tail)
return data
}
func (s *SsrMsg) Decode(data []byte) error {
buf := bytes.NewBuffer(data)
return binary.Read(buf, binary.LittleEndian, s)
}

View File

@ -0,0 +1,416 @@
package beijing12
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/axle_device/beijing12/msg"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
"joylink.club/rtsssimulation/fi"
"joylink.club/rtsssimulation/repository/model/proto"
"log/slog"
"runtime/debug"
"strconv"
"sync"
"time"
)
var ( //日志
logTag = "[北京12号线计轴通信]"
privateLogger *slog.Logger
loggerInit sync.Once
)
// key-集中站编号
var contextMap = make(map[string]*serviceContext)
var mu = sync.Mutex{}
type serviceContext struct {
sim *memory.VerifySimulation
config config.RsspAxleConfig
server udp.UdpServer
client udp.UdpClient
cancelFunc context.CancelFunc
ciSectionIndexConfigs []*proto.CiSectionCodePoint
sourceAddr uint16 //源地址 从配置中的16进制字符串转来的
targetAddr uint16 //目的地址 从配置中的16进制字符串转来的
sid1 uint32 //SID1 从配置中的16进制字符串转来的
sid2 uint32 //SID2 从配置中的16进制字符串转来的
msgChan <-chan []byte //消息队列
seqNum uint32 //当前的序列号
lastSeqNum uint32 //最近一次收到的序列号
lastTimeSeqParam uint32 //最近一次的有效时序参数
sseMsg *msg.SseMsg //发送出去的时序校验请求
sseWaitTimer <-chan time.Time //sse超时定时器
}
func Start(simulation *memory.VerifySimulation) {
mu.Lock()
defer mu.Unlock()
//检查服务启动条件
rsspConfig := simulation.GetRunConfig().RsspAxleConfig
if !rsspConfig.Open {
return
}
if contextMap[rsspConfig.StationCode] != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务已启动", logTag, rsspConfig.StationCode)))
}
station := simulation.Repo.FindStationByStationName(rsspConfig.StationCode)
if station == nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]不存在", logTag, rsspConfig.StationCode)))
}
ref := simulation.Repo.GetCentralizedStationRef(station.Id())
if ref == nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]关联数据不存在", logTag, rsspConfig.StationCode)))
}
if len(ref.SectionCodePoints) == 0 {
logger().Warn(fmt.Sprintf("集中站[%s]无区段编码数据,服务不启动", rsspConfig.StationCode))
return
}
sourceAddr, err := strconv.ParseUint(rsspConfig.NetAConfig.SourceAddr, 16, 16)
if err != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析源地址[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.SourceAddr)))
}
targetAddr, err := strconv.ParseUint(rsspConfig.NetAConfig.TargetAddr, 16, 16)
if err != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析目的地址[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.TargetAddr)))
}
sid1, err := strconv.ParseUint(rsspConfig.NetAConfig.Sid1, 16, 32)
if err != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析SID1[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.Sid1)))
}
sid2, err := strconv.ParseUint(rsspConfig.NetAConfig.Sid2, 16, 32)
if err != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析SID2[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.Sid2)))
}
//服务初始化及启动
msgChan := make(chan []byte, 100)
serviceCtx := &serviceContext{
sim: simulation,
config: rsspConfig,
ciSectionIndexConfigs: ref.SectionCodePoints,
sourceAddr: uint16(sourceAddr),
targetAddr: uint16(targetAddr),
sid1: uint32(sid1),
sid2: uint32(sid2),
msgChan: msgChan,
}
netAConfig := rsspConfig.NetAConfig
server := udp.NewServer(fmt.Sprintf(":%d", netAConfig.LocalPort), func(b []byte) {
msgChan <- b
})
client := udp.NewClient(fmt.Sprintf("%s:%d", netAConfig.RemoteIp, netAConfig.RemotePort))
err = server.Listen()
if err != nil {
panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务启动失败", logTag, rsspConfig.StationCode)))
} else {
logger().Info(fmt.Sprintf("监听[:%d]", netAConfig.LocalPort))
}
serviceCtx.server = server
serviceCtx.client = client
cancelCtx, cancelFunc := context.WithCancel(context.Background())
serviceCtx.cancelFunc = cancelFunc
serviceCtx.runCollectTask(cancelCtx)
serviceCtx.runHandleMsgTask(cancelCtx)
contextMap[rsspConfig.StationCode] = serviceCtx
}
func Stop(simulation *memory.VerifySimulation) {
mu.Lock()
defer mu.Unlock()
rsspConfig := simulation.GetRunConfig().RsspAxleConfig
serviceCtx := contextMap[rsspConfig.StationCode]
if serviceCtx == nil {
return
}
serviceCtx.stop()
delete(contextMap, rsspConfig.StationCode)
}
func (s *serviceContext) stop() {
if s.server != nil {
s.server.Close()
}
if s.client != nil {
s.client.Close()
}
s.cancelFunc()
}
func (s *serviceContext) runCollectTask(ctx context.Context) {
go func() {
defer func() {
if err := recover(); err != nil {
logger().Error("状态收集任务出错,记录后重启", "error", err, "stack", string(debug.Stack()))
s.runCollectTask(ctx)
}
}()
for range time.Tick(time.Millisecond * time.Duration(s.config.NetAConfig.Period)) {
select {
case <-ctx.Done():
return
default:
frame := s.collect()
err := s.client.Send(frame.Encode())
if err != nil {
logger().Error("发送状态数据失败", "error", err)
}
}
}
}()
}
func (s *serviceContext) collect() *msg.RsdMsgBuilder {
worldData := entity.GetWorldData(s.sim.World)
amdEntry := entity.FindAxleManageDevice(worldData, s.config.StationCode)
amd := component.AxleManageDeviceType.Get(amdEntry)
stateInfos := msg.StateInfos{}
for _, cfg := range s.ciSectionIndexConfigs {
sectionRuntime := amd.Adrs[cfg.SectionId]
sectionEntry, ok := entity.GetEntityByUid(s.sim.World, cfg.SectionId)
sectionState := component.PhysicalSectionStateType.Get(sectionEntry)
if !ok {
continue
}
stateInfos = append(stateInfos, &msg.StateInfo{
CLR: !sectionState.Occ,
OCC: sectionState.Occ,
RAC: sectionRuntime.Rac,
RJO: sectionRuntime.Rjo,
RJT: sectionRuntime.Rjt,
})
}
userData := stateInfos.Encode()
builder := &msg.RsdMsgBuilder{
MsgHeader: msg.MsgHeader{
ProtocolType: msg.ProtocolType_Sync,
MessageType: msg.MessageType_A,
SourceAddr: s.sourceAddr,
TargetAddr: s.targetAddr,
},
SeqNum: s.seqNum,
Svc1: s.calculateSvc1(userData, s.seqNum),
Svc2: s.calculateSvc2(userData, s.seqNum),
UserData: userData,
}
s.seqNum++
return builder
}
func (s *serviceContext) calculateSvc1(userData []byte, seqNum uint32) uint32 {
return 0
}
func (s *serviceContext) calculateSvc2(userData []byte, seqNum uint32) uint32 {
return 0
}
func (s *serviceContext) runHandleMsgTask(ctx context.Context) {
go func() {
defer func() {
if err := recover(); err != nil {
logger().Error("消息处理任务出错,记录后重启", "error", err, "stack", string(debug.Stack()))
s.runHandleMsgTask(ctx)
}
}()
for {
select {
case <-ctx.Done():
return
case <-s.sseWaitTimer: //SSE消息等待超时
s.sseMsg = nil
case data := <-s.msgChan:
messageType := msg.GetMessageType(data)
switch messageType {
case msg.MessageType_A, msg.MessageType_B:
s.handleRsdMsg(data)
case msg.MessageType_SSE:
s.handleSseMsg(data)
case msg.MessageType_SSR:
s.handleSsrMsg(data)
default:
logger().Warn(fmt.Sprintf("未知的消息类型[%x]", messageType))
}
}
}
}()
}
func (s *serviceContext) handleRsdMsg(data []byte) {
if s.sseMsg == nil { //正在时序校正过程中
return
}
rsdMsg := &msg.RsdMsg{}
err := rsdMsg.Decode(data)
if err != nil {
logger().Error("解析RSD数据出错", "error", err)
return
}
//校验
if !s.validateRsdMsg(rsdMsg) {
return
}
//流程处理
seqDeviation := rsdMsg.SeqNum - s.lastSeqNum
if s.lastSeqNum == 0 {
seqDeviation = 0
} else if seqDeviation < 0 || seqDeviation > s.config.NetAConfig.MaxDeviation { //序列号减小或时序差超出容忍限度
s.startSeeProgress()
return
}
s.lastSeqNum = rsdMsg.SeqNum
cmdInfos := msg.CmdInfos{}
err = cmdInfos.Decode(rsdMsg.UserData)
if err != nil {
logger().Error("解析命令信息出错", "error", err)
return
}
//驱动
for i, cmdInfo := range cmdInfos {
sectionIndexConfig := s.ciSectionIndexConfigs[i]
err := fi.AxleSectionDrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.DRST)
if err != nil {
logger().Error("驱动计轴直接复位出错", "error", err)
}
err = fi.AxleSectionPdrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.PDRST)
if err != nil {
logger().Error("驱动计轴预复位出错", "error", err)
}
}
}
func (s *serviceContext) handleSseMsg(data []byte) {
sseMsg := &msg.SseMsg{}
err := sseMsg.Decode(data)
if err != nil {
logger().Error("解析SSE数据出错", "error", err)
return
}
//校验
if !s.validateSseMsg(sseMsg) {
return
}
//回复
s.lastSeqNum = sseMsg.SeqNum
ssrMsg := msg.SsrMsg{
MsgHeader: msg.MsgHeader{
ProtocolType: msg.ProtocolType_Sync,
MessageType: msg.MessageType_SSR,
SourceAddr: s.sourceAddr,
TargetAddr: s.targetAddr,
},
SeqNumSsr: s.seqNum,
SeqNumSse: sseMsg.SeqNum,
SeqInit1: s.calculateSeqInit1(sseMsg.SeqEnq1),
SeqInit2: s.calculateSeqInit1(sseMsg.SeqEnq1),
DataVer: 0x01,
}
err = s.client.Send(ssrMsg.Encode())
if err != nil {
logger().Error("发送SSR数据失败", "error", err)
}
}
func (s *serviceContext) handleSsrMsg(data []byte) {
if s.sseMsg == nil { //不在时序校正过程中
return
}
ssrMsg := &msg.SsrMsg{}
err := ssrMsg.Decode(data)
if err != nil {
logger().Error("解析SSR数据出错", "error", err)
return
}
//校验
if !s.validateSsrMsg(ssrMsg) {
return
}
//完成校正时序
s.sseMsg = nil
s.lastSeqNum = ssrMsg.SeqNumSsr
}
// 启动SSE流程
func (s *serviceContext) startSeeProgress() {
sseMsg := &msg.SseMsg{
MsgHeader: msg.MsgHeader{
ProtocolType: msg.ProtocolType_Sync,
MessageType: msg.MessageType_SSE,
SourceAddr: s.sourceAddr,
TargetAddr: s.targetAddr,
},
SeqNum: s.seqNum,
SeqEnq1: s.calculateSeqEnq1(),
SeqEnq2: s.calculateSeqEnq2(),
}
err := s.client.Send(sseMsg.Encode())
if err != nil {
logger().Error("发送SSE数据失败", "error", err)
} else {
s.sseMsg = sseMsg
s.sseWaitTimer = time.After(time.Duration(s.config.NetAConfig.Period*msg.Twait_sse) * time.Millisecond)
}
}
func (s *serviceContext) validateRsdMsg(rsdMsg *msg.RsdMsg) bool {
sourceAddr, _ := strconv.ParseUint(s.config.NetAConfig.SourceAddr, 16, 16)
if rsdMsg.SourceAddr != uint16(sourceAddr) {
logger().Error(fmt.Sprintf("源地址[%x]不正确[%s]", rsdMsg.SourceAddr, s.config.NetAConfig.SourceAddr))
return false
}
targetAddr, _ := strconv.ParseUint(s.config.NetAConfig.TargetAddr, 16, 16)
if rsdMsg.TargetAddr != uint16(targetAddr) {
logger().Error(fmt.Sprintf("目的地址[%x]不正确[%s]", rsdMsg.TargetAddr, s.config.NetAConfig.TargetAddr))
return false
}
if len(rsdMsg.UserData) != len(s.ciSectionIndexConfigs) {
logger().Error(fmt.Sprintf("用户数据长度[%d]与配置长度[%d]不符", len(rsdMsg.UserData), len(s.ciSectionIndexConfigs)))
return false
}
if message.Rssp_I_Crc16(rsdMsg.UserData) != rsdMsg.Tail {
logger().Error(fmt.Sprintf("报文位验证失败"))
return false
}
return true
}
func (s *serviceContext) validateSseMsg(sseMsg *msg.SseMsg) bool {
return true
}
func (s *serviceContext) validateSsrMsg(ssrMsg *msg.SsrMsg) bool {
if s.sseMsg.SeqNum != ssrMsg.SeqNumSse {
logger().Error(fmt.Sprintf("SSR的Ne[%d]与请求方不符[%d]", ssrMsg.SeqNumSse, s.sseMsg.SeqNum))
return false
}
return true
}
func (s *serviceContext) calculateSeqEnq1() uint32 {
return 0
}
func (s *serviceContext) calculateSeqEnq2() uint32 {
return 0
}
func (s *serviceContext) calculateSeqInit1(seqEnq1 uint32) uint32 {
return 0
}
func (s *serviceContext) calculateSeqInit2(seqEnq2 uint32) uint32 {
return 0
}
func logger() *slog.Logger {
loggerInit.Do(func() {
privateLogger = slog.Default().With("tag", logTag)
})
return privateLogger
}

View File

@ -1,129 +0,0 @@
package axle_device
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message"
"log/slog"
"runtime/debug"
"time"
)
//计轴设备与联锁系统安全通信应用层实现
type RsspAxle interface {
//Start 启动计轴设备与联锁系统安全通信服务
Start(amm AxleMessageManager) error
//Stop 停止计轴设备与联锁系统安全通信服务
Stop()
}
type rsspAxle struct {
//所属城市
city string
//所属线路
lineId string
//所属集中站
centralizedStation string
//接收方每个安全通信会话对应的发送周期值,单位ms
sendingPeriod uint32
//主安全通道
rsspChannel *RsspChannel
//收到应用层消息回调
messageManager AxleMessageManager
//发送区段状态任务
cancelSendStatus context.CancelFunc
}
func InitRsspAxle(cfg *config.RsspAxleConfig) RsspAxle {
ra := &rsspAxle{}
//
ra.city = cfg.City
ra.lineId = cfg.LineId
ra.centralizedStation = cfg.CentralizedStation
ra.sendingPeriod = cfg.RsspCfg.SendingPeriod
//
mrc := &RsspChannel{}
ra.rsspChannel = mrc.Init(&cfg.RsspCfg)
//
return ra
}
// rssp 安全层执行
func (s *rsspAxle) rcvCmdMsg(data []byte) {
msg := &message.SectionCmdMsgPack{}
msg.Decode(data)
s.messageManager.HandleSectionCmdMsg(s.city, s.lineId, s.centralizedStation, msg)
}
func (s *rsspAxle) Start(amm AxleMessageManager) error {
s.messageManager = amm
//设置安全通道层
if s.rsspChannel != nil {
s.rsspChannel.handleUserData = s.rcvCmdMsg
s.rsspChannel.Start()
}
//
sendContext, sendCancel := context.WithCancel(context.Background())
go s.periodRun(sendContext)
s.cancelSendStatus = sendCancel
//
return nil
}
func (s *rsspAxle) Stop() {
if s.rsspChannel != nil {
s.rsspChannel.Stop()
}
//
if s.cancelSendStatus != nil {
s.cancelSendStatus()
}
s.messageManager = nil
}
func (s *rsspAxle) periodRun(runContext context.Context) {
time.Sleep(2 * time.Second)
defer func() {
if e := recover(); e != nil {
slog.Error(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务异常", s.city, s.lineId, s.centralizedStation), "error", e, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-runContext.Done():
return
default:
}
//slog.Debug("计轴设备periodRun")
if s.messageManager == nil {
slog.Warn(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务因messageManager不存在退出", s.city, s.lineId, s.centralizedStation))
return
}
//收集区段状态
sectionStatusMsg, e := s.messageManager.CollectSectionStatus(s.city, s.lineId, s.centralizedStation)
if e == nil {
if sectionStatusMsg != nil {
msgPack := &message.SectionStatusMsgPack{}
msgPack.Ck = 0 //暂时无用
msgPack.Sms = sectionStatusMsg
s.sendStatusMsg(msgPack)
}
} else {
slog.Warn(e.Error())
}
//
time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond)
//更新周期性参数
s.rsspChannel.NextPeriod()
}
}
// 发送计轴区段状态给联锁
func (s *rsspAxle) sendStatusMsg(msg *message.SectionStatusMsgPack) {
data := msg.Encode()
//向主通道发送
if s.rsspChannel != nil {
//slog.Debug("计轴设备发送SectionStatusMsgPack", "区段状态个数", len(msg.Sms), "packLen", len(data))
s.rsspChannel.SendUserData(data)
}
}

View File

@ -1,65 +0,0 @@
package axle_device
import (
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message"
"log/slog"
)
//联锁集中站计轴与联锁通信管理
type AxleMessageManager interface {
GetLineAllRsspAxleCfgs() []config.RsspAxleConfig
//HandleSectionCmdMsg 计轴设备接收到联锁发送来的控制命令
HandleSectionCmdMsg(city string, lineId string, centralizedStation string, msg *message.SectionCmdMsgPack)
//CollectSectionStatus 收集仿真中计轴区段状态
CollectSectionStatus(city string, lineId string, centralizedStation string) ([]*message.SectionStatusMsg, error)
}
var allRsspAxleServices []RsspAxle
func StartLineAllRsspAxleServices(ram AxleMessageManager) {
allRsspAxleServices = nil
cfgs := ram.GetLineAllRsspAxleCfgs()
//cfgs = getTestConfig() //测试用
for _, cfg := range cfgs {
if cfg.Open {
as := InitRsspAxle(&cfg)
allRsspAxleServices = append(allRsspAxleServices, as)
as.Start(ram)
slog.Debug("启动计轴设备", "city", cfg.City, "lineId", cfg.LineId, "CentralizedStation", cfg.CentralizedStation)
}
}
}
func StopLineAllRsspAxleServices() {
for _, as := range allRsspAxleServices {
as.Stop()
}
}
// 测试用
func getTestConfig() []config.RsspAxleConfig {
cfg := &config.RsspAxleConfig{}
cfg.Open = true
cfg.City = "北京"
cfg.LineId = "12"
cfg.CentralizedStation = "酒仙桥"
cfg.RsspCfg.SrcAddr = 0x02
cfg.RsspCfg.DstAddr = 0x01
cfg.RsspCfg.DataVer1 = 0x0011
cfg.RsspCfg.DataVer2 = 0x0012
cfg.RsspCfg.SID1 = 0x10000000
cfg.RsspCfg.SID2 = 0x00000001
cfg.RsspCfg.SINIT1 = 0x01
cfg.RsspCfg.SINIT2 = 0x02
cfg.RsspCfg.SendingPeriod = 500
cfg.RsspCfg.SsrRsspTimeout = 3
cfg.RsspCfg.Mtv = 3
cfg.RsspCfg.DeviceA = false
cfg.RsspCfg.PicType = message.PIC_MASTER
cfg.RsspCfg.RemoteIp = "192.168.3.5"
cfg.RsspCfg.RemoteUdpPort = 7777
cfg.RsspCfg.LocalUdpPort = 6666
//
return []config.RsspAxleConfig{*cfg}
}

View File

@ -1,315 +0,0 @@
package axle_device
import (
"fmt"
"log/slog"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
)
// 铁路信号安全通信协议实现
// HandleUserData 回调应用层
type HandleUserData func([]byte)
// RsspChannel 实现rssp通信
type RsspChannel struct {
//udp
udpServer udp.UdpServer
//udp
udpClient udp.UdpClient
//回调应用层
handleUserData HandleUserData
//rssp安全通信配置
config *config.RsspConfig
//rssp时钟
rsspTimer *RsspTimer
//批次编号,发送序列号
sn *message.RsspSn
//安全通道1时间戳
ch1Ts *message.RsspLFSR
//安全通道2时间戳
ch2Ts *message.RsspLFSR
//最近一次接收到的报文的安全通道1时间戳
rcvCh1Ts uint32
//最近一次接收到的报文的安全通道2时间戳
rcvCh2Ts uint32
//最近一次接收到的报文的序列号
rcvSn uint32
//时序校验请求发送记录
sendSseRecord *SseFireRecord
}
func (s *RsspChannel) SetRcvUserDataCallback(handleUserData HandleUserData) {
s.handleUserData = handleUserData
}
func (s *RsspChannel) Init(config *config.RsspConfig) *RsspChannel {
s.config = config
s.rsspTimer = &RsspTimer{t: 0}
s.sn = message.NewRsspSn(1)
s.ch1Ts = message.NewRsspLFSR(message.RSSP_I_C1_TS, 32, s.config.SID1, false)
s.ch2Ts = message.NewRsspLFSR(message.RSSP_I_C2_TS, 32, s.config.SID2, false)
return s
}
// Start 启动安全通道
func (s *RsspChannel) Start() {
//
s.udpServer = udp.NewServer(fmt.Sprintf(":%d", s.config.LocalUdpPort), s.handleRsspMsg)
s.udpServer.Listen()
//
s.udpClient = udp.NewClient(fmt.Sprintf("%s:%d", s.config.RemoteIp, s.config.RemoteUdpPort))
}
// Stop 关闭安全通道
func (s *RsspChannel) Stop() {
if s.udpServer != nil {
s.udpServer.Close()
}
if s.udpClient != nil {
s.udpClient.Close()
}
}
// RsspTimer rssp时钟每一个tick周期为config.SendingPeriod
type RsspTimer struct {
t uint64
}
func (s *RsspTimer) tick() {
s.t++
}
func (s *RsspTimer) now() uint64 {
return s.t
}
// SseFireRecord 发送时序校验请求的记录
type SseFireRecord struct {
send *message.RsspSse //已经发送的时序校验请求
rsspTime uint64 //发送时序校验请求时的rssp时间
}
func (s *SseFireRecord) record(send *message.RsspSse, rsspTime uint64) {
s.send = send
s.rsspTime = rsspTime
}
func (s *SseFireRecord) clear() {
s.send = nil
s.rsspTime = 0
}
func (s *SseFireRecord) hasRecord() bool {
return s.send != nil
}
// 处理接收到的rssp报文
// 注意本函数由udp socket 协程执行
func (s *RsspChannel) handleRsspMsg(pack []byte) {
slog.Debug("接收到RSSP报文", "len", len(pack))
//报文头校验
head := &message.RsspHead{}
if !head.Parse(pack) { //解析报文头失败
slog.Debug("丢弃接收的RSSP报文解析报文头失败")
return
}
if !message.RsspHeadMcCheck(head) { //报文类别检测未通过
slog.Debug("丢弃接收的RSSP报文报文类别检测未通过")
return
}
if !message.RsspHeadPicCheck(head) { //协议交互类别检测未通过
slog.Debug("丢弃接收的RSSP报文协议交互类别检测未通过")
return
}
if !s.config.CheckAddress(head.Sa, head.Da) { //校验报文头中源地址和目的地址是否包含在已配置列表中
slog.Debug("丢弃接收的RSSP报文报文头中源地址或目的地址不在在已配置列表中")
return
}
//报文尾校验
if !message.RsspPackCrc16Check(pack) { //整个报文crc16校验未通过
slog.Debug("丢弃接收的RSSP报文报文尾CRC16校验未通过")
return
}
//解析得到RSD、SSE或SRE
rssp := message.ParseRsspPack(head, pack)
if rssp == nil { //解析具体rssp包失败
slog.Debug("丢弃接收的RSSP报文解析具体类别包失败")
return
}
//处理接收到的具体类别RSSP包
switch rssp.Type() {
case message.RSD_A:
fallthrough
case message.RSD_B:
s.handleRsspRsd(rssp.(*message.RsspRsd))
case message.SSE:
s.handleRsspSse(rssp.(*message.RsspSse))
case message.SSR:
s.handleRsspSsr(rssp.(*message.RsspSsr))
}
}
// 处理接收到的实时安全数据 RSD
func (s *RsspChannel) handleRsspRsd(rsd *message.RsspRsd) {
//slog.Debug("接收到的实时安全数据 RSD")
//如果为备机发送来的安全数据
if s.config.PicType == message.PIC_SLAVE { //备安全通道
slog.Debug("丢弃接收的RSSP-RSD报文舍弃在备安全通道中接收到的安全数据")
//备安全通道收到安全数据,表示该物理通道连接正常
return
}
//如果为主机发送来的安全数据
if s.config.PicType == message.PIC_MASTER { //主安全通道
if !rsd.IsMaster() {
slog.Debug("丢弃接收的RSSP-RSD报文舍弃在主安全通道中收到的非主机发送来的安全数据")
return
}
}
//序列号校验
//接收的序列号小于最近一次有效序列号则触发SSE时序校验
if rsd.Sn < s.rcvSn {
slog.Debug("丢弃接收的RSSP-RSD报文当前接收RSD的序列号小于最近一次接收的RSD的序列号触发SSE")
s.fireSse(rsd)
return
}
dSn := rsd.Sn - s.rcvSn
if dSn > s.config.Mtv {
slog.Debug("丢弃接收的RSSP-RSD报文当前接收RSD的序列号与最近一次接收的RSD的序列号差值过大触发SSE")
s.fireSse(rsd)
return
}
//SVC校验
c1Crc32 := message.Rssp_I_Crc32C1(rsd.Sad)
c1SidTs := c1Crc32 ^ rsd.Svc1 ^ message.RSSP_I_C1_SCW //T(n)
//
c2Crc32 := message.Rssp_I_Crc32C2(rsd.Sad)
c2SidTs := c2Crc32 ^ rsd.Svc2 ^ message.RSSP_I_C2_SCW //T(n)
//todo ... SVC校验待完善
_ = c1SidTs
_ = c2SidTs
//校验通过
//记录本次接收RSD的序列号和安全校验通道时间戳
s.rcvSn = rsd.Sn
s.rcvCh1Ts = c1SidTs ^ s.config.SID1
s.rcvCh2Ts = c2SidTs ^ s.config.SID2
//通知应用层接收应用数据
s.handleUserData(rsd.Sad)
}
// 触发时序校正请求
func (s *RsspChannel) fireSse(rsd *message.RsspRsd) {
s.sendSseRecord = &SseFireRecord{send: s.sendSse(), rsspTime: s.rsspTimer.now()}
}
// 接收到时序校正请求
func (s *RsspChannel) handleRsspSse(sse *message.RsspSse) {
if s.config.PicType != message.PIC_MASTER {
slog.Debug("丢弃接收的RSSP-SSE报文在非主安全通道中收到时序校正请求SSE")
return
}
//发送时序校正响应
s.sendSsr(sse)
}
// 接收到时序校正应答
func (s *RsspChannel) handleRsspSsr(ssr *message.RsspSsr) {
//SSR校验
if !s.sendSseRecord.hasRecord() {
slog.Debug("丢弃接收的RSSP-SSR报文未发起过SSE时序校正请求")
return
}
if s.rsspTimer.t-s.sendSseRecord.rsspTime > uint64(s.config.SsrRsspTimeout) {
slog.Debug("丢弃接收的RSSP-SSR报文等待SSE响应超时")
return
}
if ssr.SeSn != s.sendSseRecord.send.Sn {
slog.Debug("丢弃接收的RSSP-SSR报文SSR与SSE不对应")
return
}
//恢复时序?
s.rcvSn = ssr.SrSn
s.rcvCh1Ts = ssr.Tic1 ^ s.sendSseRecord.send.SeqEnq1 ^ s.config.SID1 ^ s.config.DataVer1
s.rcvCh2Ts = ssr.Tic2 ^ s.sendSseRecord.send.SeqEnq2 ^ s.config.SID2 ^ s.config.DataVer2
}
// NextPeriod 刷新与周期有关的:将序列号和时间戳更新到下一个值
func (s *RsspChannel) NextPeriod() {
s.sn.GetAndAdd()
s.ch1Ts.GetAndMove()
s.ch2Ts.GetAndMove()
s.rsspTimer.tick()
}
// 发送时序校正应答
func (s *RsspChannel) sendSsr(sse *message.RsspSse) {
ssr := &message.RsspSsr{}
//
ssr.Pic = message.PIC_MASTER
ssr.Mc = message.SSR
ssr.Sa = s.config.SrcAddr
ssr.Da = s.config.DstAddr
ssr.SrSn = s.sn.Get() //当前序列号
ssr.SeSn = sse.Sn
ssr.Tic1 = sse.SeqEnq1 ^ s.config.SID1 ^ s.ch1Ts.Get() ^ s.config.DataVer1
ssr.Tic2 = sse.SeqEnq2 ^ s.config.SID2 ^ s.ch2Ts.Get() ^ s.config.DataVer2
ssr.Dvn = 0x01 //预留固定值
//
rsspPack := ssr.Encode()
s.sendPack(rsspPack)
}
// 发送时序校正请求
func (s *RsspChannel) sendSse() *message.RsspSse {
sse := &message.RsspSse{}
//
sse.Pic = message.PIC_MASTER
sse.Mc = message.SSE
sse.Sa = s.config.SrcAddr
sse.Da = s.config.DstAddr
//时序校正请求,把最近一次接收到的报文中的序列号时间戳发送给发送方
sse.Sn = s.rcvSn
sse.SeqEnq1 = s.createSeqNeq(s.config.SID1, s.rcvCh1Ts)
sse.SeqEnq2 = s.createSeqNeq(s.config.SID2, s.rcvCh2Ts)
//
rsspPack := sse.Encode()
s.sendPack(rsspPack)
//
return sse
}
// SendUserData 发送用户数据即通过rssp安全通道发送应用层数据通过rssp的RSD报文发送
func (s *RsspChannel) SendUserData(userData []byte) {
rsd := &message.RsspRsd{}
rsd.Pic = s.config.PicType
if s.config.DeviceA {
rsd.Mc = message.RSD_A
} else {
rsd.Mc = message.RSD_B
}
rsd.Sa = s.config.SrcAddr
rsd.Da = s.config.DstAddr
//
rsd.Sn = s.sn.Get()
rsd.Sdl = uint16(len(userData) + 8)
//安全校验通道SVC_1
crc_c1 := message.Rssp_I_Crc32C1(userData)
rsd.Svc1 = s.createSvcCode(crc_c1, s.config.SID1, s.ch1Ts.Get(), message.RSSP_I_C1_SCW)
//安全校验通道SVC_2
crc_c2 := message.Rssp_I_Crc32C2(userData)
rsd.Svc1 = s.createSvcCode(crc_c2, s.config.SID2, s.ch2Ts.Get(), message.RSSP_I_C2_SCW)
rsd.Sad = userData
//
rsspPack := rsd.Encode()
s.sendPack(rsspPack)
}
func (s *RsspChannel) createSvcCode(crc32 uint32, sid uint32, ts uint32, scw uint32) uint32 {
return crc32 ^ sid ^ ts ^ scw
}
func (s *RsspChannel) createSeqNeq(sid uint32, ts uint32) uint32 {
return sid ^ ts
}
// 通过网络发送数据
func (s *RsspChannel) sendPack(rsspPack []byte) {
s.udpClient.Send(rsspPack)
}

View File

@ -1,155 +0,0 @@
package ci
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/axle_device"
"joylink.club/bj-rtsts-server/third_party/message"
"sort"
"sync"
"time"
)
type CiServer interface {
Start()
Stop()
//SendSectionReset 向计轴设备发送计轴命令
SendSectionReset(sectionId string, drst bool, pdrst bool)
//HandleSectionStatus 收到来自计轴设备的物理区段状态
HandleSectionStatus(status []*message.SectionStatusMsg)
}
// 北京12号线酒仙桥集中站物理区段码表
var codePointMap = map[int]string{0: "北京_12_酒仙桥_9G", 1: "北京_12_酒仙桥_1DG", 2: "北京_12_酒仙桥_11G", 3: "北京_12_酒仙桥_13G", 4: "北京_12_酒仙桥_15G", 5: "北京_12_酒仙桥_6G", 6: "北京_12_酒仙桥_8G", 7: "北京_12_酒仙桥_2DG", 8: "北京_12_酒仙桥_10G", 9: "北京_12_酒仙桥_12G", 10: "北京_12_酒仙桥_14G"}
var idRowMap = make(map[string]int)
var sectionCmds []*message.SectionCmdMsg
var cmdsLock = sync.Mutex{}
// rssp 测试
type ciServer struct {
//所属城市
city string
//所属线路
lineId string
//所属集中站
centralizedStation string
//接收方每个安全通信会话对应的发送周期值,单位ms
sendingPeriod uint32
//主安全通道
rsspChannel *axle_device.RsspChannel
//
cancel context.CancelFunc
}
func (s *ciServer) HandleSectionStatus(status []*message.SectionStatusMsg) {
if len(codePointMap) != len(status) {
fmt.Println("==>>接收到的区段状态与码表不对应:", "codePointsLen=", len(codePointMap), ",statusLen=", len(status))
return
}
fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------i")
for row, state := range status {
sectionId := codePointMap[row]
fmt.Printf("==>>[%d]区段[%s]状态: Clr=%t Occ=%t Rac=%t Rjo=%t Rjt=%t\n", row, sectionId, state.Clr, state.Occ, state.Rac, state.Rjo, state.Rjt)
}
fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------o")
}
func (s *ciServer) periodRun(runContext context.Context) {
defer func() {
fmt.Println("==>>CI Server 周期运行退出 ...")
}()
for {
select {
case <-runContext.Done():
return
default:
}
s.doSendSectionCmdMsg()
time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond)
//更新周期性参数
s.rsspChannel.NextPeriod()
}
}
func (s *ciServer) doSendSectionCmdMsg() {
if s.rsspChannel != nil {
data := s.collectSectionCmdsData()
fmt.Println("==>>CI发送区段指令数据", ",len=", len(data))
s.rsspChannel.SendUserData(data)
}
}
func (s *ciServer) collectSectionCmdsData() []byte {
defer func() {
cmdsLock.Unlock()
}()
cmdsLock.Lock()
//
data := (&message.SectionCmdMsgPack{Ck: 0x80, Scs: sectionCmds}).Encode()
return data
}
// SendSectionReset 发送区段复位指令
func (s *ciServer) SendSectionReset(sectionId string, drst bool, pdrst bool) {
defer func() {
cmdsLock.Unlock()
}()
cmdsLock.Lock()
//
sec := sectionCmds[idRowMap[sectionId]]
sec.Drst = drst
sec.Pdrst = pdrst
}
type rowId struct {
row int
id string
}
func (s *ciServer) Start() {
s.rsspChannel.SetRcvUserDataCallback(s.rcvUserData)
s.rsspChannel.Start()
ctx, cancel := context.WithCancel(context.Background())
go s.periodRun(ctx)
s.cancel = cancel
}
func (s *ciServer) Stop() {
if s.rsspChannel != nil {
s.rsspChannel.Stop()
s.rsspChannel = nil
}
if s.cancel != nil {
s.cancel()
}
}
func (s *ciServer) rcvUserData(data []byte) {
fmt.Println("==>>CI接收到区段状态数据", ",len=", len(data))
msg := &message.SectionStatusMsgPack{}
msg.Decode(data)
s.HandleSectionStatus(msg.Sms)
}
func NewRsspCiServer(cfg *config.RsspAxleConfig) CiServer {
ra := &ciServer{}
//
ra.city = cfg.City
ra.lineId = cfg.LineId
ra.centralizedStation = cfg.CentralizedStation
ra.sendingPeriod = cfg.RsspCfg.SendingPeriod
//
mrc := &axle_device.RsspChannel{}
ra.rsspChannel = mrc.Init(&cfg.RsspCfg)
//
return ra
}
func InitCiServerCodePoints() {
var ris []*rowId
for row, id := range codePointMap {
idRowMap[id] = row
ris = append(ris, &rowId{row: row, id: id})
}
sort.SliceStable(ris, func(i, j int) bool {
return ris[i].row < ris[j].row
})
for range ris {
sectionCmds = append(sectionCmds, &message.SectionCmdMsg{Drst: false, Pdrst: false})
}
}

View File

@ -2,45 +2,81 @@ package main
import (
"fmt"
"time"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/example/rssp/ci"
"joylink.club/bj-rtsts-server/third_party/message"
)
//func main() {
// var scw1 uint32 = 0xAE390B5A
// var t_p uint32 = 0x0FC22F87
// var sid1 uint32 = 0xa2bcfc8c
// var sinit uint32 = 0xb763ec88
//
// var svcn_1 uint32 = 0x08b12b3b
// //var svcn_1 uint32 = 0x3b2bb108
// var svcn uint32 = 0x547fd6ca
// //var svcn uint32 = 0xcad67f54
// var userData []byte = []byte{0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}
// crc1 := crc32Encode(userData)
//
// m := func(svc1 uint32) uint32 {
// return (svc1 ^ crc1) ^ scw1
// }
//
// rn_1 := m(svcn_1)
// rn := m(svcn)
// l1 := &Lfsr{
// value: sinit,
// p: t_p,
// }
// //l1.add(sid1)
// l1.add(rn_1)
// l1.add(rn)
// fmt.Printf("%x\n", l1.value)
//
// //sinit~<sid1~<sid1
// l2 := &Lfsr{
// value: sinit,
// p: t_p,
// }
// //l2.add(sid1)
// l2.add(sid1)
// l2.add(sid1)
// fmt.Printf("%x", l2.value)
//}
func main() {
// message.InitRsspCrcTable()
ci.InitCiServerCodePoints()
//
cfg := &config.RsspAxleConfig{}
cfg.City = "北京"
cfg.LineId = "12"
cfg.CentralizedStation = "酒仙桥"
cfg.RsspCfg.SrcAddr = 0x01
cfg.RsspCfg.DstAddr = 0x02
cfg.RsspCfg.DataVer1 = 0x0011
cfg.RsspCfg.DataVer2 = 0x0012
cfg.RsspCfg.SID1 = 0x10000000
cfg.RsspCfg.SID2 = 0x00000001
cfg.RsspCfg.SINIT1 = 0x01
cfg.RsspCfg.SINIT2 = 0x02
cfg.RsspCfg.SendingPeriod = 500
cfg.RsspCfg.SsrRsspTimeout = 3
cfg.RsspCfg.Mtv = 3
cfg.RsspCfg.DeviceA = true
cfg.RsspCfg.PicType = message.PIC_MASTER
cfg.RsspCfg.RemoteIp = "192.168.3.5"
cfg.RsspCfg.RemoteUdpPort = 6666
cfg.RsspCfg.LocalUdpPort = 7777
//
ciServer := ci.NewRsspCiServer(cfg)
ciServer.Start()
fmt.Println("==>>ci server ...")
for {
time.Sleep(2 * time.Second)
ciServer.SendSectionReset("北京_12_酒仙桥_9G", true, false)
var scw uint32 = 0xAE390B5A
var t_p uint32 = 0x0FC22F87
var sid uint32 = 0xa2bcfc8c
//var sinit uint32 = 0xb763ec88
lfsr := Lfsr{
value: sid,
p: t_p,
}
//
time.Sleep(3600 * time.Second)
lfsr.add(1)
var userData []byte = []byte{0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}
crc1 := crc32Encode(userData)
svc := crc1 ^ sid ^ lfsr.value ^ scw
fmt.Printf("%x\n", svc)
}
func crc32Encode(data []byte) uint32 {
return message.Rssp_I_Crc32C1(data)
}
type Lfsr struct {
value uint32
p uint32
}
func (l *Lfsr) add(x uint32) uint32 {
l.value = l.value ^ x
for i := 0; i < 1; i++ {
l.value = l.value << 1
if l.value>>31 == 1 {
l.value ^= l.p
}
}
return l.value
}

View File

@ -20,11 +20,11 @@ const (
var (
// crc16多项式为G(x)=X16+X11+X4+1
RSSP_I_CRC16 = &crc.Parameters{Width: 16, Polynomial: 0x0811, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0}
RSSP_I_CRC16 = &crc.Parameters{Width: 16, Polynomial: 0x0811, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0}
// 通道1 crc32多项式为0x100d4e63
RSSP_I_C1_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x100d4e63, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0}
RSSP_I_C1_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x100d4e63, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0}
// 通道2 crc32多项式为0x8ce56011
RSSP_I_C2_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x8ce56011, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0}
RSSP_I_C2_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x8ce56011, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0}
)
// Rssp_I_Crc16计算

20
third_party/message/rssp_code_test.go vendored Normal file
View File

@ -0,0 +1,20 @@
package message
import (
"fmt"
"testing"
)
func TestNewRsspLFSR(t *testing.T) {
lfsr := NewRsspLFSR(0x0FC22F87, 32, 0x7665986c, false)
for i := 0; i < 341; i++ {
lfsr.GetAndMove()
}
fmt.Printf("%x", lfsr.Get())
}
func TestRssp_I_Crc16(t *testing.T) {
bytes := []byte{0x01, 0x80, 0x3a, 0x30, 0x9e, 0x30, 0x24, 0x85, 00, 00, 0x23, 00, 0x3b, 0x2b, 0xb1, 0x08, 0xf8, 0xc0, 0x6c, 0x16, 0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}
crc := Rssp_I_Crc16(bytes)
fmt.Printf("crc16: %x\n", crc)
}

View File

@ -45,7 +45,7 @@ type VerifySimulation struct {
//设备UID映射 key-uid
UidMap map[string]*elementIdStructure
// 运行环境配置
runConfig *config.ThridPartyConfig
runConfig *config.ThirdPartyConfig
}
// 轨旁仿真内存模型
@ -146,6 +146,13 @@ func (s *VerifySimulation) GetComIdByUid(uid string) uint32 {
return es[uid].CommonId
}
func (s *VerifySimulation) GetRunConfig() config.ThirdPartyConfig {
if s.runConfig == nil {
return config.ThirdPartyConfig{}
}
return *s.runConfig
}
// GetBtmCanetConfig 获取CANET配置信息
func (s *VerifySimulation) GetBtmCanetConfig() config.BtmCanetConfig {
return s.runConfig.BtmCanet
@ -175,9 +182,6 @@ func (s *VerifySimulation) GetConnVobcTrain() *state_proto.TrainState {
func (s *VerifySimulation) GetBtmVobcConfig() config.BtmVobcConfig {
return s.runConfig.BtmVobc
}
func (s *VerifySimulation) GetLineAllRsspAxleCfgs() []config.RsspAxleConfig {
return s.runConfig.RsspAxleCfgs
}
// GetSectionCodePoints 获取集中站的区段码表
func (s *VerifySimulation) GetSectionCodePoints(city string, lineId string, centralizedStation string) []*proto.CiSectionCodePoint {
@ -555,7 +559,7 @@ func (s *VerifySimulation) initRunConfig(runConfig *dto.ProjectRunConfigDto) err
if runConfig == nil || runConfig.ConfigContent == "" {
return nil
}
var configMap config.ThridPartyConfig
var configMap config.ThirdPartyConfig
err := json.Unmarshal([]byte(runConfig.ConfigContent), &configMap)
if err != nil {
return sys_error.New("配置信息格式错误", err)

View File

@ -3,6 +3,7 @@ package ts
import (
"fmt"
"joylink.club/bj-rtsts-server/third_party/acc"
axleBeijing12 "joylink.club/bj-rtsts-server/third_party/axle_device/beijing12"
"joylink.club/bj-rtsts-server/third_party/btm_vobc"
"joylink.club/bj-rtsts-server/third_party/interlock/beijing11"
"joylink.club/bj-rtsts-server/third_party/interlock/beijing12"
@ -16,7 +17,6 @@ import (
"joylink.club/bj-rtsts-server/third_party/can_btm"
cidcmodbus "joylink.club/bj-rtsts-server/third_party/cidc_modbus"
"joylink.club/bj-rtsts-server/third_party/axle_device"
"joylink.club/bj-rtsts-server/third_party/electrical_machinery"
"joylink.club/bj-rtsts-server/message_server"
@ -130,7 +130,8 @@ func runThirdParty(s *memory.VerifySimulation) error {
}
}
// 计轴RSSP启动
axle_device.StartLineAllRsspAxleServices(s)
axleBeijing12.Start(s)
//obsolete.StartLineAllRsspAxleServices(s)
// 电机UDP启动
electrical_machinery.Default().Start(s)
// 车载BTM启动
@ -166,7 +167,8 @@ func stopThirdParty(s *memory.VerifySimulation) {
}
}
//计轴RSSP启动销毁
axle_device.StopLineAllRsspAxleServices()
axleBeijing12.Stop(s)
//obsolete.StopLineAllRsspAxleServices()
// 电机UDP停止
electrical_machinery.Default().Stop()
// 车载BTM停止