rts-sim-testing-service/vobc/udp.go
2023-09-05 15:12:11 +08:00

168 lines
4.2 KiB
Go

package vobc
import (
"container/list"
"encoding/binary"
"fmt"
"math"
"net"
"sync"
"github.com/panjf2000/gnet/v2"
"go.uber.org/zap"
"joylink.club/bj-rtsts-server/config"
)
var (
running bool
mutex sync.Mutex
receiveTrainLifeSignal uint16 //接收列车消息生命信号
sendTrainLifeSignal uint16 //发送的列车消息生命信号
trainLifeSignalInit bool //列车生命信号是否初始化
limit uint16 = 10000 //用于处理生命信号循环使用产生的各种特殊处理
vobcMsgInfoChan chan []byte = make(chan []byte) //列车消息队列
//处理方法列表
handlerList list.List
)
func init() {
go func() {
for {
info := <-vobcMsgInfoChan
for e := handlerList.Front(); e != nil; e = e.Next() {
func() {
defer func() {
r := recover()
if r != nil {
zap.S().Errorf("列车信息处理函数报错")
}
}()
handler := e.Value.(VobcDataHandler)
handler(info)
}()
}
}
}()
}
type VobcDataHandler func([]byte)
type udpServer struct {
gnet.BuiltinEventEngine
eng gnet.Engine
addr string
multicore bool
}
// udp 启动时运行
func (server *udpServer) OnBoot(eng gnet.Engine) gnet.Action {
server.eng = eng
zap.S().Infof("vobc udp server with multi-core=%t is listening on %s", server.multicore, server.addr)
return gnet.None
}
// OnTraffic 接收到数据后的解析
func (server *udpServer) OnTraffic(c gnet.Conn) gnet.Action {
defer func() {
if r := recover(); r != nil {
zap.L().Error("vobc udp服务数据解析异常", zap.Any("panic", r))
}
}()
buf, _ := c.Next(-1)
lifeSignal := binary.BigEndian.Uint16(buf[0:2])
if !trainLifeSignalInit {
trainLifeSignalInit = true
} else if receiveTrainLifeSignal < limit {
if lifeSignal < receiveTrainLifeSignal || lifeSignal > receiveTrainLifeSignal-limit {
zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal)
return gnet.None
}
} else if receiveTrainLifeSignal < math.MaxUint16-10000 {
if lifeSignal < receiveTrainLifeSignal {
zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal)
return gnet.None
}
} else {
if lifeSignal < receiveTrainLifeSignal && lifeSignal > receiveTrainLifeSignal+10000 {
zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal)
return gnet.None
}
}
receiveTrainLifeSignal = lifeSignal
// trainInfo := decoderVobcTrainInfo(buf)
//消息队列
vobcMsgInfoChan <- buf
return gnet.None
}
// 注册处理vobc处理方法
func RegisterTrainInfoHandler(handler VobcDataHandler) {
handlerList.PushBack(handler)
}
// 创建UDP服务
func RunUdpServer() {
server := &udpServer{addr: fmt.Sprintf("udp://:%d", config.Config.Vobc.LocalPort), multicore: false}
err := gnet.Run(server, server.addr, gnet.WithMulticore(server.multicore))
zap.L().Fatal("vobc udp服务启动失败", zap.Error(err))
}
// 发送列车速度到VOBC
func SendTrainSpeedTask(speed float64) error {
if running {
return nil
}
mutex.Lock()
defer mutex.Unlock()
trainInfo := &SendTrainInfo{
LifeSignal: sendTrainLifeSignal,
Speed: uint16(speed),
}
err := sendVobcMsg(encoderVobcTrainInfo(trainInfo))
if err != nil {
zap.S().Error(err)
}
sendTrainLifeSignal++
return err
}
// UDP停止
func Stop() {
mutex.Lock()
defer mutex.Unlock()
running = false
}
// Vobc 消息发送消息
func sendVobcMsg(buf []byte) error {
defer func() {
if r := recover(); r != nil {
zap.S().Error("发送列车速度信息失败", r)
}
}()
addr := fmt.Sprintf("%v:%v", config.Config.Vobc.Ip, config.Config.Vobc.RemotePort)
remoteAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.DialUDP("udp", nil, remoteAddr)
if err != nil {
zap.S().Error("UDP通信失败", err)
return err
}
defer func(conn *net.UDPConn) {
err := conn.Close()
if err != nil {
zap.S().Error(err)
}
}(conn)
_, err = conn.Write(buf)
return err
}
// 将道岔转为动力学的消息
func encoderVobcTrainInfo(info *SendTrainInfo) []byte {
var data []byte
data = binary.BigEndian.AppendUint16(data, info.LifeSignal)
data = binary.BigEndian.AppendUint16(data, info.Speed)
return data
}