diff --git a/config/config.go b/config/config.go index 5e47af0..df89e83 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ type AppConfig struct { Server server Datasource datasource Logging log + Messaging messaging } type server struct { Port int @@ -30,6 +31,15 @@ type log struct { Compress bool // 是否压缩日志 Stdout bool // 是否输出到控制台 } +type messaging struct { + Centrifugo centrifugo +} +type centrifugo struct { + TokenSecret string + ApiKey string + ApiEndpoint string + Address string +} var Config AppConfig diff --git a/grpcproto/client.go b/grpcproto/client.go index 57f3d30..f4d24ab 100644 --- a/grpcproto/client.go +++ b/grpcproto/client.go @@ -1,17 +1,44 @@ package apiproto -import grpc "google.golang.org/grpc" +import ( + context "context" + "log" + grpc "google.golang.org/grpc" + "joylink.club/bj-rtsts-server/config" +) + +// Centrifugo 实时消息传递客户端 var client CentrifugoApiClient +// Centrifugo 客户端初始化 func InitClient() { - conn, err := grpc.Dial("192.168.3.158:10000", grpc.WithInsecure()) + conn, err := grpc.Dial(config.Config.Messaging.Centrifugo.Address, grpc.WithInsecure()) if err != nil { panic(err) } client = NewCentrifugoApiClient(conn) } +// 返回 Centrifugo 客户端 func Cli() CentrifugoApiClient { return client } + +// 发布消息 +func PublishMsg(channalName string, data []byte) { + resp, err := client.Publish(context.Background(), &PublishRequest{ + Channel: channalName, + Data: data, + }) + if err != nil { + log.Fatalf("Transport level error: %v \n", err) + } else { + if resp.GetError() != nil { + respError := resp.GetError() + log.Fatalf("Publish msg[%s] error %d(%s)\n", channalName, respError.Code, respError.Message) + } else { + log.Println("Successfully published") + } + } +} diff --git a/grpcproto/message.go b/grpcproto/message.go new file mode 100644 index 0000000..21f2a21 --- /dev/null +++ b/grpcproto/message.go @@ -0,0 +1,90 @@ +package apiproto + +import ( + "log" + "time" +) + +// 消息服务 +type IMsgServer interface { + // 获取通道名 + getChannelName() string + + // 发送消息间隔时间,单位ms + getInterval() time.Duration + + // 全量信息 + allMsgData() []byte + + // 定时发送的消息 + onTick() []TopicMsg +} + +// 消息实体 +type TopicMsg struct { + // 通道名称 + channalName string + + // 消息信息 + data []byte +} + +// 消息类型服务集合 +var serverMap = make(map[string]*IMsgServer) + +// 消息服务退出通道 +var serverExitChannelMap = make(map[string]chan bool) + +// 注册服务 +func RegisterMsgServer(server IMsgServer) { + serverMap[server.getChannelName()] = &server + if server.getInterval() > 0 { + exitChannel := make(chan bool) + ticker := time.NewTicker(server.getInterval()) + serverExitChannelMap[server.getChannelName()] = exitChannel + go func() { + defer func() { + if r := recover(); r != nil { + log.Fatalf("定时器发生错误,%v\n", r) + } + ticker.Stop() // 意外退出时关闭定时器 + }() + // 循环推送信息 + for _ = range ticker.C { + select { + case <-ticker.C: + topicMsgs := server.onTick() + if topicMsgs != nil && len(topicMsgs) != 0 { + for _, msg := range topicMsgs { + PublishMsg(msg.channalName, msg.data) + } + } + case <-exitChannel: + return + } + } + }() + } +} + +// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通) +func Subscription() { + for key, server := range serverMap { + data := (*server).allMsgData() + if data != nil && len(data) > 0 { + PublishMsg(key, data) + } + } +} + +// 注销消息服务 +func UnRegisterMsgServer(key string) { + channel := serverExitChannelMap[key] + if channel != nil { + // 定时任务取消 + channel <- false + delete(serverExitChannelMap, key) + // 删除集合信息 + delete(serverMap, key) + } +} diff --git a/grpcproto/test_server.go b/grpcproto/test_server.go new file mode 100644 index 0000000..49fe4f2 --- /dev/null +++ b/grpcproto/test_server.go @@ -0,0 +1,24 @@ +package apiproto + +import "time" + +type TestServer struct { + data map[string]string +} + +func (t *TestServer) getChannelName() string { + return "test" +} + +func (t *TestServer) getInterval() time.Duration { + return time.Second +} + +func (t *TestServer) allMsgData() []byte { + return []byte(`{"input": "hello world"}`) +} + +func (t *TestServer) onTick() []TopicMsg { + msg := TopicMsg{t.getChannelName(), []byte(`{"input": "hello from GRPC"}`)} + return []TopicMsg{msg} +} diff --git a/xiannccda.setting.yml b/xiannccda.setting.yml index 9b1f2ae..7b2ba9e 100644 --- a/xiannccda.setting.yml +++ b/xiannccda.setting.yml @@ -32,3 +32,4 @@ messaging: tokenSecret: aaa apiKey: bbb apiEndpoint: /api + address: 192.168.3.233:10000