// rts-sim-testing-service/third_party/tpapi/service.go
// (147 lines, 3.0 KiB, Go)

package tpapi
import (
"context"
"fmt"
"sync"
"time"
)
// ThirdPartyApiServiceState describes the current condition of a third-party
// API service.
type ThirdPartyApiServiceState uint32

// Values of ThirdPartyApiServiceState. The zero value is ThirdPartyState_Closed.
const (
	// ThirdPartyState_Closed means the service has not been started.
	ThirdPartyState_Closed ThirdPartyApiServiceState = iota
	// ThirdPartyState_Normal means the service is working normally.
	ThirdPartyState_Normal
	// ThirdPartyState_Broken means the service is in an abnormal state.
	ThirdPartyState_Broken
)
// ThirdPartyApiService is the interface implemented by services that
// integrate with a third-party system.
type ThirdPartyApiService interface {
	// Name returns the name of the service.
	Name() string

	// State returns the current state of the service.
	State() ThirdPartyApiServiceState
}
// func NewThirdPartyApiService() ThirdPartyApiService {
// return &thirdPartyApiService{
// state: ThirdPartyState_Closed,
// }
// }
// type thirdPartyApiService struct {
// state ThirdPartyApiServiceState
// }
// func (s *thirdPartyApiService) UpdateState(state ThirdPartyApiServiceState) {
// s.state = state
// }
// func (s *thirdPartyApiService) State() ThirdPartyApiServiceState {
// return s.state
// }
// UdpDelayRecorder records the arrival times of UDP messages and reports,
// through an optional callback, when messages stop arriving and when they
// resume.
//
// The exported methods may be called from multiple goroutines.
type UdpDelayRecorder struct {
	// mu guards lastReceiedTime, intervals, err, cancelFunc and done.
	mu sync.Mutex
	// interval is the message receive interval in milliseconds. It is set
	// by NewUdpDelayRecorder and never modified afterwards.
	interval int
	// lastReceiedTime is the Unix time in milliseconds of the latest
	// RecordInterval call, or 0 if there has been none since Start/Stop.
	// (The misspelling is kept because the field name is visible to the
	// rest of the package.)
	lastReceiedTime int64
	// intervals holds the most recent gaps between messages in
	// milliseconds, oldest first; at most 10 entries are kept.
	intervals []int64
	// err is non-nil while the recorder considers communication timed out.
	err error
	// handler, if non-nil, is called with a non-nil error when a timeout is
	// detected and with nil when communication recovers. It is set by
	// NewUdpDelayRecorder and never modified afterwards.
	handler func(err error)
	// cancelFunc cancels the most recently started monitor goroutine; it is
	// nil until Start has been called.
	cancelFunc context.CancelFunc
	// done is closed when the most recently started monitor goroutine exits.
	done chan struct{}
}

// NewUdpDelayRecorder returns a recorder for messages expected every interval
// milliseconds. handler may be nil; otherwise it is invoked from the monitor
// goroutine on every timeout/recovery transition. The recorder does nothing
// until Start is called.
func NewUdpDelayRecorder(interval int, handler func(err error)) *UdpDelayRecorder {
	return &UdpDelayRecorder{
		interval:        interval,
		lastReceiedTime: 0,
		handler:         handler,
	}
}

// Start resets the timeout state and launches the monitor goroutine.
//
// If a monitor is already running it is cancelled first, so repeated calls
// never leave more than one active monitor behind.
func (r *UdpDelayRecorder) Start() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.cancelFunc != nil {
		// Retire the previous monitor; cancelling an already cancelled
		// context is a no-op.
		r.cancelFunc()
	}
	r.lastReceiedTime = 0
	r.err = nil
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	r.cancelFunc = cancel
	r.done = done
	go func() {
		// Close the channel captured here rather than r.done, which a later
		// Start may have replaced by the time this goroutine exits.
		defer close(done)
		r.checkTimeout(ctx)
	}()
}

// checkTimeout is the monitor loop. Every 2*interval milliseconds it
// re-evaluates the timeout state and invokes the handler on a transition.
// It returns as soon as ctx is cancelled.
//
// Because lastReceiedTime is 0 until the first message is recorded, a
// recorder that has received nothing is reported as timed out on the first
// check after Start.
func (r *UdpDelayRecorder) checkTimeout(ctx context.Context) {
	period := time.Millisecond * time.Duration(r.interval*2)
	if period <= 0 {
		// time.NewTicker panics on a non-positive period; poll every
		// millisecond instead of spinning.
		period = time.Millisecond
	}
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		changed, err := r.updateTimeoutState(ctx)
		// The handler runs without the lock held so that it may call back
		// into the recorder, and is skipped once the monitor is cancelled.
		if changed && r.handler != nil && ctx.Err() == nil {
			r.handler(err)
		}
	}
}

// updateTimeoutState compares the time since the last message with the
// timeout and recovery thresholds. It reports whether the state changed and
// the new error value (nil on recovery). State is left untouched once ctx is
// cancelled.
func (r *UdpDelayRecorder) updateTimeoutState(ctx context.Context) (changed bool, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ctx.Err() != nil {
		return false, nil
	}
	// Compare as int64 so the elapsed time is not truncated where int is
	// 32 bits wide.
	elapsed := time.Now().UnixMilli() - r.lastReceiedTime
	switch {
	case r.err == nil && elapsed > int64(calUdpTimeout(r.interval)):
		// Timeout: no message for longer than the allowed window.
		r.err = fmt.Errorf("UDP通信超时未收到消息,可能网络异常")
		return true, r.err
	case r.err != nil && elapsed <= int64(r.interval*3):
		// Recovery: a message arrived recently enough.
		r.err = nil
		return true, nil
	}
	return false, nil
}

// Stop cancels the monitor goroutine, resets the timeout state and waits for
// the goroutine to exit, so the handler is not running once Stop returns.
// It is a no-op if Start has never been called.
//
// Stop must not be called from inside the handler: it would wait for the
// goroutine that is executing the handler.
func (r *UdpDelayRecorder) Stop() {
	r.mu.Lock()
	cancel, done := r.cancelFunc, r.done
	if cancel == nil {
		r.mu.Unlock()
		return
	}
	// Cancel while holding the lock: the monitor re-checks the context under
	// the same lock, so it cannot modify the state reset below.
	cancel()
	r.lastReceiedTime = 0
	r.err = nil
	r.mu.Unlock()
	// Wait without the lock so an in-flight handler that calls back into the
	// recorder can finish.
	<-done
}

// RecordInterval records that a message has just been received. Except for
// the first message after Start/Stop, the gap since the previous message is
// appended to the window of recent intervals.
func (r *UdpDelayRecorder) RecordInterval() {
	const window = 10 // number of recent intervals kept

	r.mu.Lock()
	defer r.mu.Unlock()
	ts := time.Now().UnixMilli()
	if r.lastReceiedTime > 0 {
		r.intervals = append(r.intervals, ts-r.lastReceiedTime)
		if len(r.intervals) > window {
			r.intervals = r.intervals[1:]
		}
	}
	r.lastReceiedTime = ts
}

// GetIntervals returns a copy of the recent message intervals in
// milliseconds, oldest first. The result is nil when nothing has been
// recorded; callers may modify it freely.
func (r *UdpDelayRecorder) GetIntervals() []int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int64(nil), r.intervals...)
}
// calUdpTimeout returns the timeout threshold in milliseconds for the given
// message interval (also in milliseconds): ten times the interval when it is
// at most 50, five times the interval otherwise.
func calUdpTimeout(interval int) int {
	factor := 5
	if interval <= 50 {
		factor = 10
	}
	return interval * factor
}