From 8bc2bfda910178db512fa9e22b77a521244dd53b Mon Sep 17 00:00:00 2001 From: weizhihong Date: Wed, 26 Jul 2023 17:51:32 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E3=80=90websocket=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 10 +++++ grpcproto/client.go | 31 +++++++++++++- grpcproto/message.go | 90 ++++++++++++++++++++++++++++++++++++++++ grpcproto/test_server.go | 24 +++++++++++ xiannccda.setting.yml | 1 + 5 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 grpcproto/message.go create mode 100644 grpcproto/test_server.go diff --git a/config/config.go b/config/config.go index 5e47af0..df89e83 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ type AppConfig struct { Server server Datasource datasource Logging log + Messaging messaging } type server struct { Port int @@ -30,6 +31,15 @@ type log struct { Compress bool // 是否压缩日志 Stdout bool // 是否输出到控制台 } +type messaging struct { + Centrifugo centrifugo +} +type centrifugo struct { + TokenSecret string + ApiKey string + ApiEndpoint string + Address string +} var Config AppConfig diff --git a/grpcproto/client.go b/grpcproto/client.go index 57f3d30..f4d24ab 100644 --- a/grpcproto/client.go +++ b/grpcproto/client.go @@ -1,17 +1,44 @@ package apiproto -import grpc "google.golang.org/grpc" +import ( + context "context" + "log" + grpc "google.golang.org/grpc" + "joylink.club/bj-rtsts-server/config" +) + +// Centrifugo 实时消息传递客户端 var client CentrifugoApiClient +// Centrifugo 客户端初始化 func InitClient() { - conn, err := grpc.Dial("192.168.3.158:10000", grpc.WithInsecure()) + conn, err := grpc.Dial(config.Config.Messaging.Centrifugo.Address, grpc.WithInsecure()) if err != nil { panic(err) } client = NewCentrifugoApiClient(conn) } +// 返回 Centrifugo 客户端 func Cli() CentrifugoApiClient { return client } + +// 发布消息 +func PublishMsg(channalName string, data []byte) { + resp, err := client.Publish(context.Background(), &PublishRequest{ + Channel: channalName, + Data: data, + }) + if err != nil { + log.Fatalf("Transport level error: %v \n", err) + } else { + if resp.GetError() != nil { + respError := resp.GetError() + log.Fatalf("Publish msg[%s] error %d(%s)\n", channalName, respError.Code, respError.Message) + } else { + log.Println("Successfully published") + } + } +} diff --git a/grpcproto/message.go b/grpcproto/message.go new file mode 100644 index 0000000..21f2a21 --- /dev/null +++ b/grpcproto/message.go @@ -0,0 +1,90 @@ +package apiproto + +import ( + "log" + "time" +) + +// 消息服务 +type IMsgServer interface { + // 获取通道名 + getChannelName() string + + // 发送消息间隔时间,单位ms + getInterval() time.Duration + + // 全量信息 + allMsgData() []byte + + // 定时发送的消息 + onTick() []TopicMsg +} + +// 消息实体 +type TopicMsg struct { + // 通道名称 + channalName string + + // 消息信息 + data []byte +} + +// 消息类型服务集合 +var serverMap = make(map[string]*IMsgServer) + +// 消息服务退出通道 +var serverExitChannelMap = make(map[string]chan bool) + +// 注册服务 +func RegisterMsgServer(server IMsgServer) { + serverMap[server.getChannelName()] = &server + if server.getInterval() > 0 { + exitChannel := make(chan bool) + ticker := time.NewTicker(server.getInterval()) + serverExitChannelMap[server.getChannelName()] = exitChannel + go func() { + defer func() { + if r := recover(); r != nil { + log.Fatalf("定时器发生错误,%v\n", r) + } + ticker.Stop() // 意外退出时关闭定时器 + }() + // 循环推送信息 + for _ = range ticker.C { + select { + case <-ticker.C: + topicMsgs := server.onTick() + if topicMsgs != nil && len(topicMsgs) != 0 { + for _, msg := range topicMsgs { + PublishMsg(msg.channalName, msg.data) + } + } + case <-exitChannel: + return + } + } + }() + } +} + +// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通) +func Subscription() { + for key, server := range serverMap { + data := (*server).allMsgData() + if data != nil && len(data) > 0 { + PublishMsg(key, data) + } + } +} + +// 注销消息服务 +func UnRegisterMsgServer(key string) { + channel := serverExitChannelMap[key] + if channel != nil { + // 定时任务取消 + channel <- false + delete(serverExitChannelMap, key) + // 删除集合信息 + delete(serverMap, key) + } +} diff --git a/grpcproto/test_server.go b/grpcproto/test_server.go new file mode 100644 index 0000000..49fe4f2 --- /dev/null +++ b/grpcproto/test_server.go @@ -0,0 +1,24 @@ +package apiproto + +import "time" + +type TestServer struct { + data map[string]string +} + +func (t *TestServer) getChannelName() string { + return "test" +} + +func (t *TestServer) getInterval() time.Duration { + return time.Second +} + +func (t *TestServer) allMsgData() []byte { + return []byte(`{"input": "hello world"}`) +} + +func (t *TestServer) onTick() []TopicMsg { + msg := TopicMsg{t.getChannelName(), []byte(`{"input": "hello from GRPC"}`)} + return []TopicMsg{msg} +} diff --git a/xiannccda.setting.yml b/xiannccda.setting.yml index 9b1f2ae..7b2ba9e 100644 --- a/xiannccda.setting.yml +++ b/xiannccda.setting.yml @@ -32,3 +32,4 @@ messaging: tokenSecret: aaa apiKey: bbb apiEndpoint: /api + address: 192.168.3.233:10000 From d4f5915a7a4478679987bdc52514bbbefe4d8c2a Mon Sep 17 00:00:00 2001 From: weizhihong Date: Wed, 26 Jul 2023 18:00:00 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E3=80=90=E8=A7=A3=E5=86=B3=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index bdef266..1c86472 100644 --- a/config/config.go +++ b/config/config.go @@ -32,7 +32,7 @@ type log struct { Compress bool // 是否压缩日志 Stdout bool // 是否输出到控制台 } -<<<<<<< HEAD + type messaging struct { Centrifugo centrifugo } @@ -41,11 +41,10 @@ type centrifugo struct { ApiKey string ApiEndpoint string Address string -======= +} type dynamics struct { UdpLocalPort int UdpRemoteAddr string ->>>>>>> a435ec7520b6fab654c7e0b9ece87920ee534ba9 } var Config AppConfig From 93c41175ee9be9f4e25dfb9d7068374b92140976 Mon Sep 17 00:00:00 2001 From: weizhihong Date: Thu, 27 Jul 2023 16:59:55 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=8D=89?= =?UTF-8?q?=E7=A8=BF=E8=8E=B7=E5=8F=96=E7=94=A8=E6=88=B7ID=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=91=20=E3=80=90=E5=A2=9E=E5=8A=A0=E7=94=9F?= =?UTF-8?q?=E6=88=90=E6=B6=88=E6=81=AF=E4=B8=AD=E5=BF=83token=E5=85=B3?= =?UTF-8?q?=E9=94=AE=E5=AD=97=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/drafting.go | 9 ++++++--- middleware/jwt.go | 10 ++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/api/drafting.go b/api/drafting.go index 1f21f08..1d313bc 100644 --- a/api/drafting.go +++ b/api/drafting.go @@ -7,6 +7,7 @@ import ( jwt "github.com/appleboy/gin-jwt/v2" "github.com/gin-gonic/gin" "go.uber.org/zap" + "joylink.club/bj-rtsts-server/db/model" "joylink.club/bj-rtsts-server/dto" "joylink.club/bj-rtsts-server/middleware" "joylink.club/bj-rtsts-server/service" @@ -68,7 +69,8 @@ func pageQueryDrafting(c *gin.Context) { // @Failure 500 {object} dto.ErrorDto // @Router /api/v1/drafting [post] func createDrafting(c *gin.Context) { - createId := middleware.ParseContextUserId(c) + user, _ := c.Get(middleware.IdentityKey) + createId := user.(model.User).ID req := dto.DraftingDto{} if err := c.ShouldBind(&req); err != nil { panic("传入参数为空") @@ -101,13 +103,14 @@ func saveAsDrafting(c *gin.Context) { panic("必要参数id不存在") } zap.S().Debug("传入参数id为" + id) - createId := middleware.ParseContextUserId(c) + user, _ := c.Get(middleware.IdentityKey) + createrId := user.(model.User).ID req := dto.DraftingDto{} if err := c.ShouldBind(&req); err != nil { zap.S().Warn("保存数据出错", err) } int64Id, _ := strconv.ParseInt(id, 10, 64) - service.SaveAsDrafting(createId, int32(int64Id), &req) + service.SaveAsDrafting(createrId, int32(int64Id), &req) c.Status(http.StatusOK) } diff --git a/middleware/jwt.go b/middleware/jwt.go index 4157f7f..d607080 100644 --- a/middleware/jwt.go +++ b/middleware/jwt.go @@ -2,6 +2,7 @@ package middleware import ( "log" + "strconv" "time" jwt "github.com/appleboy/gin-jwt/v2" @@ -12,6 +13,7 @@ import ( ) const IdentityKey = "id" +const CentrifugoKey = "sub" // centrifugo 消息传递服务器token验证需要的主键 func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) { // the jwt middleware @@ -24,7 +26,8 @@ func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) { PayloadFunc: func(data interface{}) jwt.MapClaims { if v, ok := data.(*model.User); ok { return jwt.MapClaims{ - IdentityKey: v.ID, + IdentityKey: v.ID, + CentrifugoKey: strconv.Itoa(int(v.ID)), } } return jwt.MapClaims{} @@ -79,8 +82,3 @@ func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) { return } - -func ParseContextUserId(c *gin.Context) int32 { - claims := jwt.ExtractClaims(c) - return int32(claims[IdentityKey].(float64)) -} From 6e8c17412aa732d8b35fb47f4690642266449031 Mon Sep 17 00:00:00 2001 From: weizhihong Date: Thu, 27 Jul 2023 17:03:41 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E3=80=90range=E8=A1=A8=E8=BE=BE=E5=BC=8F?= =?UTF-8?q?=E7=AE=80=E5=8C=96=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- grpcproto/message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpcproto/message.go b/grpcproto/message.go index 21f2a21..c788932 100644 --- a/grpcproto/message.go +++ b/grpcproto/message.go @@ -50,7 +50,7 @@ func RegisterMsgServer(server IMsgServer) { ticker.Stop() // 意外退出时关闭定时器 }() // 循环推送信息 - for _ = range ticker.C { + for range ticker.C { select { case <-ticker.C: topicMsgs := server.onTick()