jl-iot/mqtt/client.go

228 lines
5.7 KiB
Go

package mqtt
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"joylink.club/iot/dto"
)
var iotcli *IotClient
type IotClient struct {
cmc *IotMqttConfig
cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager
}
// 初始化并启动MQTT客户端服务
func Start(cmc *IotMqttConfig) error {
if err := checkConfig(cmc); err != nil {
return err
}
BuildTopics(cmc.AppId, cmc.ClientId)
cc, err := cmc.tryInto()
if err != nil {
return err
}
cm, err := autopaho.NewConnection(context.Background(), *cc)
if err != nil {
return err
}
iotcli = &IotClient{
cmc: cmc,
cc: cc,
cm: cm,
}
return nil
}
// 断开MQTT客户端
func Stop() error {
slog.Info("停止MQTT客户端")
ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second)
defer cancle()
err := iotcli.cm.Disconnect(ctx)
return err
}
func checkConfig(cmc *IotMqttConfig) error {
if cmc.AppId == "" {
return fmt.Errorf("应用编号不能为空")
}
if cmc.ClientId == "" {
return fmt.Errorf("客户端编号不能为空")
}
if cmc.BrokerUrl == "" {
return fmt.Errorf("MQTT代理服务地址不能为空")
}
if cmc.Username == "" {
return fmt.Errorf("MQTT用户名不能为空")
}
if cmc.Password == "" {
return fmt.Errorf("MQTT密码不能为空")
}
return nil
}
// 发布IOT服务状态
func PubIotServiceState(s *dto.IotServiceState) error {
return pub(GetIotServiceStateTopic(), s)
}
// 发布IOT采集数据
func PubIotCjData(cj *dto.IotCj) error {
return pub(GetCjTopic(), cj)
}
// 发布IOT驱动数据
func PubIotQdData(qd *dto.IotQd) error {
return pub(GetQdTopic(), qd)
}
// 注册IOT采集数据处理
func RegIotCjHandler(h func(cj *dto.IotCj)) {
iotcli.cc.Router.RegisterHandler(GetCjTopic(), func(p *paho.Publish) {
cmd := &dto.IotCj{}
err := proto.Unmarshal(p.Payload, cmd)
if err != nil {
slog.Error("采集数据proto.Unmarshal异常", "error", err)
return
}
h(cmd)
})
slog.Info("注册IOT采集数据处理success")
}
// 注册IOT驱动数据处理
func RegIotQdHandler(h func(qd *dto.IotQd)) {
iotcli.cc.Router.RegisterHandler(GetQdTopic(), func(p *paho.Publish) {
cmd := &dto.IotQd{}
err := proto.Unmarshal(p.Payload, cmd)
if err != nil {
slog.Error("驱动数据proto.Unmarshal异常", "error", err)
return
}
h(cmd)
})
slog.Info("注册IOT驱动数据处理success")
}
// 注册IOT服务启动请求处理
func RegIotQcServiceStartReqHandler(h func(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp) {
iotcli.cc.Router.RegisterHandler(GetIotServiceStartTopic(), func(p *paho.Publish) {
reqHandle(p, h, &dto.IotQcServiceStartReq{})
})
}
// 注册IOT服务停止请求处理
func RegIotQcServiceStopReqHandler(h func(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp) {
iotcli.cc.Router.RegisterHandler(GetIotServiceStopTopic(), func(p *paho.Publish) {
reqHandle(p, h, &dto.IotQcServiceStopReq{})
})
}
// 注册IOT日志查询请求处理
func RegIotLogReqHandler(h func(req *dto.IotServiceLogReq) *dto.IotServiceLogResp) {
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
reqHandle(p, h, &dto.IotServiceLogReq{})
})
}
// 注销IOT处理
func UnregHandler(topic string) {
iotcli.cc.Router.UnregisterHandler(topic)
}
// 注销所有IOT处理
func UnregAllHandler() {
iotcli.cc.Router = paho.NewStandardRouter()
}
func subIotQc() {
slog.Info("订阅Iot驱采")
sub(GetCjTopic()) // 订阅采集
sub(GetQdTopic()) // 订阅驱动
sub(GetIotServiceStartTopic()) // 订阅启动请求
sub(GetIotServiceStopTopic()) // 订阅停止请求
sub(GetLogReqTopic()) // 订阅日志查询请求
}
// 发起订阅
func sub(topic string) {
slog.Info("发起订阅", "topic", topic)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := iotcli.cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
Topic: topic,
QoS: 0,
NoLocal: true,
},
},
})
if err != nil {
slog.Error("订阅失败", "topic", topic, "error", err)
}
}
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
slog.Info("IOT-MQTT服务收到请求", "publish", p)
if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" {
err := proto.Unmarshal(p.Payload, r)
if err != nil {
slog.Error("Iot请求响应数据处理proto.Unmarshal异常", "error", err)
return
}
resp := h(r)
b, err := proto.Marshal(resp)
if err != nil {
slog.Error("Iot请求响应数据处理proto.Marshal异常", "error", err)
}
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
Topic: p.Properties.ResponseTopic,
Properties: &paho.PublishProperties{
CorrelationData: p.Properties.CorrelationData,
},
Payload: b,
})
if err != nil {
slog.Error("Iot请求处理回复异常", "error", err)
}
}
}
// 发布数据
func pub(topic string, data protoreflect.ProtoMessage) error {
if data == nil {
return fmt.Errorf("发布数据引用为nil")
}
b, err := proto.Marshal(data)
if err != nil {
return err
}
switch topic {
case GetIotServiceStateTopic():
slog.Debug("IOT-MQTT发布Iot服务状态", "topic", topic, "data", data)
case GetCjTopic():
slog.Debug("IOT-MQTT发布采集数据", "topic", topic, "data", data)
case GetQdTopic():
slog.Debug("IOT-MQTT发布驱动数据", "topic", topic, "data", data)
default:
return fmt.Errorf("IOT-MQTT未知发布主题: topic=%s", topic)
}
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
Topic: topic,
QoS: 0,
Payload: b,
})
return err
}