【websocket消息实现】

This commit is contained in:
weizhihong 2023-07-26 17:51:32 +08:00
parent f58e2589d6
commit 8bc2bfda91
5 changed files with 154 additions and 2 deletions

View File

@ -12,6 +12,7 @@ type AppConfig struct {
Server server
Datasource datasource
Logging log
Messaging messaging
}
type server struct {
Port int
@ -30,6 +31,15 @@ type log struct {
Compress bool // 是否压缩日志
Stdout bool // 是否输出到控制台
}
type messaging struct {
Centrifugo centrifugo
}
type centrifugo struct {
TokenSecret string
ApiKey string
ApiEndpoint string
Address string
}
var Config AppConfig

View File

@ -1,17 +1,44 @@
package apiproto
import grpc "google.golang.org/grpc"
import (
context "context"
"log"
grpc "google.golang.org/grpc"
"joylink.club/bj-rtsts-server/config"
)
// Centrifugo 实时消息传递客户端
var client CentrifugoApiClient
// Centrifugo 客户端初始化
func InitClient() {
conn, err := grpc.Dial("192.168.3.158:10000", grpc.WithInsecure())
conn, err := grpc.Dial(config.Config.Messaging.Centrifugo.Address, grpc.WithInsecure())
if err != nil {
panic(err)
}
client = NewCentrifugoApiClient(conn)
}
// 返回 Centrifugo 客户端
func Cli() CentrifugoApiClient {
return client
}
// 发布消息
func PublishMsg(channalName string, data []byte) {
resp, err := client.Publish(context.Background(), &PublishRequest{
Channel: channalName,
Data: data,
})
if err != nil {
log.Fatalf("Transport level error: %v \n", err)
} else {
if resp.GetError() != nil {
respError := resp.GetError()
log.Fatalf("Publish msg[%s] error %d(%s)\n", channalName, respError.Code, respError.Message)
} else {
log.Println("Successfully published")
}
}
}

90
grpcproto/message.go Normal file
View File

@ -0,0 +1,90 @@
package apiproto
import (
"log"
"time"
)
// 消息服务
type IMsgServer interface {
// 获取通道名
getChannelName() string
// 发送消息间隔时间,单位ms
getInterval() time.Duration
// 全量信息
allMsgData() []byte
// 定时发送的消息
onTick() []TopicMsg
}
// 消息实体
type TopicMsg struct {
// 通道名称
channalName string
// 消息信息
data []byte
}
// 消息类型服务集合
var serverMap = make(map[string]*IMsgServer)
// 消息服务退出通道
var serverExitChannelMap = make(map[string]chan bool)
// 注册服务
func RegisterMsgServer(server IMsgServer) {
serverMap[server.getChannelName()] = &server
if server.getInterval() > 0 {
exitChannel := make(chan bool)
ticker := time.NewTicker(server.getInterval())
serverExitChannelMap[server.getChannelName()] = exitChannel
go func() {
defer func() {
if r := recover(); r != nil {
log.Fatalf("定时器发生错误,%v\n", r)
}
ticker.Stop() // 意外退出时关闭定时器
}()
// 循环推送信息
for _ = range ticker.C {
select {
case <-ticker.C:
topicMsgs := server.onTick()
if topicMsgs != nil && len(topicMsgs) != 0 {
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
}
case <-exitChannel:
return
}
}
}()
}
}
// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通)
func Subscription() {
for key, server := range serverMap {
data := (*server).allMsgData()
if data != nil && len(data) > 0 {
PublishMsg(key, data)
}
}
}
// 注销消息服务
func UnRegisterMsgServer(key string) {
channel := serverExitChannelMap[key]
if channel != nil {
// 定时任务取消
channel <- false
delete(serverExitChannelMap, key)
// 删除集合信息
delete(serverMap, key)
}
}

24
grpcproto/test_server.go Normal file
View File

@ -0,0 +1,24 @@
package apiproto
import "time"
type TestServer struct {
data map[string]string
}
func (t *TestServer) getChannelName() string {
return "test"
}
func (t *TestServer) getInterval() time.Duration {
return time.Second
}
func (t *TestServer) allMsgData() []byte {
return []byte(`{"input": "hello world"}`)
}
func (t *TestServer) onTick() []TopicMsg {
msg := TopicMsg{t.getChannelName(), []byte(`{"input": "hello from GRPC"}`)}
return []TopicMsg{msg}
}

View File

@ -32,3 +32,4 @@ messaging:
tokenSecret: aaa
apiKey: bbb
apiEndpoint: /api
address: 192.168.3.233:10000