Merge remote-tracking branch 'origin/master'

This commit is contained in:
joylink_zhangsai 2023-07-27 17:13:05 +08:00
commit 055a891ac2
7 changed files with 165 additions and 11 deletions

View File

@ -7,6 +7,7 @@ import (
jwt "github.com/appleboy/gin-jwt/v2"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"joylink.club/bj-rtsts-server/db/model"
"joylink.club/bj-rtsts-server/dto"
"joylink.club/bj-rtsts-server/middleware"
"joylink.club/bj-rtsts-server/service"
@ -68,7 +69,8 @@ func pageQueryDrafting(c *gin.Context) {
// @Failure 500 {object} dto.ErrorDto
// @Router /api/v1/drafting [post]
func createDrafting(c *gin.Context) {
createId := middleware.ParseContextUserId(c)
user, _ := c.Get(middleware.IdentityKey)
createId := user.(model.User).ID
req := dto.DraftingDto{}
if err := c.ShouldBind(&req); err != nil {
panic("传入参数为空")
@ -101,13 +103,14 @@ func saveAsDrafting(c *gin.Context) {
panic("必要参数id不存在")
}
zap.S().Debug("传入参数id为" + id)
createId := middleware.ParseContextUserId(c)
user, _ := c.Get(middleware.IdentityKey)
createrId := user.(model.User).ID
req := dto.DraftingDto{}
if err := c.ShouldBind(&req); err != nil {
zap.S().Warn("保存数据出错", err)
}
int64Id, _ := strconv.ParseInt(id, 10, 64)
service.SaveAsDrafting(createId, int32(int64Id), &req)
service.SaveAsDrafting(createrId, int32(int64Id), &req)
c.Status(http.StatusOK)
}

View File

@ -12,6 +12,7 @@ type AppConfig struct {
Server server
Datasource datasource
Logging log
Messaging messaging
Dynamics dynamics
}
type server struct {
@ -31,6 +32,16 @@ type log struct {
Compress bool // 是否压缩日志
Stdout bool // 是否输出到控制台
}
type messaging struct {
Centrifugo centrifugo
}
type centrifugo struct {
TokenSecret string
ApiKey string
ApiEndpoint string
Address string
}
type dynamics struct {
Ip string
UdpLocalPort int

View File

@ -1,17 +1,44 @@
package apiproto
import grpc "google.golang.org/grpc"
import (
context "context"
"log"
grpc "google.golang.org/grpc"
"joylink.club/bj-rtsts-server/config"
)
// Centrifugo 实时消息传递客户端
var client CentrifugoApiClient
// Centrifugo 客户端初始化
func InitClient() {
conn, err := grpc.Dial("192.168.3.158:10000", grpc.WithInsecure())
conn, err := grpc.Dial(config.Config.Messaging.Centrifugo.Address, grpc.WithInsecure())
if err != nil {
panic(err)
}
client = NewCentrifugoApiClient(conn)
}
// 返回 Centrifugo 客户端
func Cli() CentrifugoApiClient {
return client
}
// 发布消息
func PublishMsg(channalName string, data []byte) {
resp, err := client.Publish(context.Background(), &PublishRequest{
Channel: channalName,
Data: data,
})
if err != nil {
log.Fatalf("Transport level error: %v \n", err)
} else {
if resp.GetError() != nil {
respError := resp.GetError()
log.Fatalf("Publish msg[%s] error %d(%s)\n", channalName, respError.Code, respError.Message)
} else {
log.Println("Successfully published")
}
}
}

90
grpcproto/message.go Normal file
View File

@ -0,0 +1,90 @@
package apiproto
import (
"log"
"time"
)
// 消息服务
type IMsgServer interface {
// 获取通道名
getChannelName() string
// 发送消息间隔时间,单位ms
getInterval() time.Duration
// 全量信息
allMsgData() []byte
// 定时发送的消息
onTick() []TopicMsg
}
// 消息实体
type TopicMsg struct {
// 通道名称
channalName string
// 消息信息
data []byte
}
// 消息类型服务集合
var serverMap = make(map[string]*IMsgServer)
// 消息服务退出通道
var serverExitChannelMap = make(map[string]chan bool)
// 注册服务
func RegisterMsgServer(server IMsgServer) {
serverMap[server.getChannelName()] = &server
if server.getInterval() > 0 {
exitChannel := make(chan bool)
ticker := time.NewTicker(server.getInterval())
serverExitChannelMap[server.getChannelName()] = exitChannel
go func() {
defer func() {
if r := recover(); r != nil {
log.Fatalf("定时器发生错误,%v\n", r)
}
ticker.Stop() // 意外退出时关闭定时器
}()
// 循环推送信息
for range ticker.C {
select {
case <-ticker.C:
topicMsgs := server.onTick()
if topicMsgs != nil && len(topicMsgs) != 0 {
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
}
case <-exitChannel:
return
}
}
}()
}
}
// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通)
func Subscription() {
for key, server := range serverMap {
data := (*server).allMsgData()
if data != nil && len(data) > 0 {
PublishMsg(key, data)
}
}
}
// 注销消息服务
func UnRegisterMsgServer(key string) {
channel := serverExitChannelMap[key]
if channel != nil {
// 定时任务取消
channel <- false
delete(serverExitChannelMap, key)
// 删除集合信息
delete(serverMap, key)
}
}

24
grpcproto/test_server.go Normal file
View File

@ -0,0 +1,24 @@
package apiproto
import "time"
type TestServer struct {
data map[string]string
}
func (t *TestServer) getChannelName() string {
return "test"
}
func (t *TestServer) getInterval() time.Duration {
return time.Second
}
func (t *TestServer) allMsgData() []byte {
return []byte(`{"input": "hello world"}`)
}
func (t *TestServer) onTick() []TopicMsg {
msg := TopicMsg{t.getChannelName(), []byte(`{"input": "hello from GRPC"}`)}
return []TopicMsg{msg}
}

View File

@ -2,6 +2,7 @@ package middleware
import (
"log"
"strconv"
"time"
jwt "github.com/appleboy/gin-jwt/v2"
@ -12,6 +13,7 @@ import (
)
const IdentityKey = "id"
const CentrifugoKey = "sub" // centrifugo 消息传递服务器token验证需要的主键
func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) {
// the jwt middleware
@ -25,6 +27,7 @@ func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) {
if v, ok := data.(*model.User); ok {
return jwt.MapClaims{
IdentityKey: v.ID,
CentrifugoKey: strconv.Itoa(int(v.ID)),
}
}
return jwt.MapClaims{}
@ -79,8 +82,3 @@ func InitGinJwtMiddleware() (authMiddleware *jwt.GinJWTMiddleware) {
return
}
func ParseContextUserId(c *gin.Context) int32 {
claims := jwt.ExtractClaims(c)
return int32(claims[IdentityKey].(float64))
}

View File

@ -38,3 +38,4 @@ messaging:
tokenSecret: aaa
apiKey: bbb
apiEndpoint: /api
address: 192.168.3.233:10000