作为子系统使用的例子程序实现(example/subsys_use/main.go)
修改IOT-MQTT接口bug 完善IOT启动/停止驱采服务逻辑
This commit is contained in:
parent
f64cccccfc
commit
e9e0ee0772
@ -2,8 +2,9 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log/slog"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -13,49 +14,244 @@ import (
|
|||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"joylink.club/iot/dto"
|
"joylink.club/iot/dto"
|
||||||
"joylink.club/iot/mqtt"
|
"joylink.club/iot/mqtt"
|
||||||
|
"joylink.club/iot/server"
|
||||||
|
"joylink.club/iot/service"
|
||||||
|
"joylink.club/iot/service/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 作为子系统使用方式
|
// 作为子系统使用方式
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
clientId := "iotlogreq_test"
|
go server.StartIotQcServer()
|
||||||
logReqTopic := mqtt.GetLogReqTopic()
|
|
||||||
cliCfg := getCmConfig(clientId, logReqTopic)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
time.Sleep(2 * time.Second) // 等待mqtt主题初始化
|
||||||
defer cancel()
|
ac := initAppClient()
|
||||||
cm, err := autopaho.NewConnection(ctx, cliCfg)
|
time.Sleep(3 * time.Second) // 等待应用mqtt连接
|
||||||
if err != nil {
|
for i := 0; i < 4; i++ {
|
||||||
panic(err)
|
ac.startIotQcService() // 启动IOT驱采服务
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
ac.stopIotQcService() // 停止IOT驱采服务
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
}
|
}
|
||||||
h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{
|
time.Sleep(10 * time.Second)
|
||||||
Conn: cm,
|
ac.disconnect()
|
||||||
Router: cliCfg.Router,
|
}
|
||||||
ResponseTopicFmt: "%s/iotlogresp",
|
|
||||||
ClientID: clientId,
|
type AppClient struct {
|
||||||
|
cfg *autopaho.ClientConfig
|
||||||
|
cm *autopaho.ConnectionManager
|
||||||
|
task service.IScheduledTask
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) stopIotQcService() {
|
||||||
|
resp, err := app.iotStopReq(&dto.IotQcServiceStopReq{})
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("停止服务请求错误, err: %v", err))
|
||||||
|
}
|
||||||
|
if resp.Code != 0 {
|
||||||
|
panic(fmt.Errorf("停止服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
|
||||||
|
}
|
||||||
|
slog.Info("应用停止iot服务成功", "resp", resp)
|
||||||
|
app.cfg.Router.UnregisterHandler(mqtt.GetCjTopic())
|
||||||
|
app.cfg.Router.UnregisterHandler(mqtt.GetQdTopic())
|
||||||
|
t := app.task
|
||||||
|
app.task = nil
|
||||||
|
if t != nil {
|
||||||
|
t.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) startIotQcService() {
|
||||||
|
modbusCfg := &dto.ModbusConfig{
|
||||||
|
Url: "tcp://127.0.0.1:502",
|
||||||
|
UnitId: 2,
|
||||||
|
Timeout: 500,
|
||||||
|
Interval: 1000,
|
||||||
|
Qdl: 2, // 驱动数据字节数
|
||||||
|
Cjl: 2, // 采集数据字节数
|
||||||
|
Mapping: []*dto.ModbusDcMapping{
|
||||||
|
{
|
||||||
|
// Function: proto.Modbus_ReadHoldingRegister,
|
||||||
|
Function: dto.Modbus_ReadCoil,
|
||||||
|
Addr: 0,
|
||||||
|
Quantity: 16,
|
||||||
|
Type: dto.DataType_CJ,
|
||||||
|
Start: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Function: dto.Modbus_RWCoils,
|
||||||
|
Addr: 16,
|
||||||
|
Quantity: 16,
|
||||||
|
Type: dto.DataType_QD,
|
||||||
|
Start: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
resp, err := app.iotServiceStartReq(&dto.IotQcServiceStartReq{
|
||||||
|
Config: modbusCfg,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
panic(fmt.Errorf("启动服务请求错误, err: %v", err))
|
||||||
}
|
}
|
||||||
|
if resp.Code != 0 {
|
||||||
|
panic(fmt.Errorf("启动服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
|
||||||
|
}
|
||||||
|
slog.Info("应用启动iot服务成功", "resp", resp)
|
||||||
|
app.RegIotCjDataHandler(func(cj *dto.IotCj) {
|
||||||
|
slog.Info("应用收到采集数据", "cj", model.BytesDebug(cj.Data))
|
||||||
|
})
|
||||||
|
app.RegIotQdDataHandler(func(qd *dto.IotQd) {
|
||||||
|
slog.Info("应用收到驱动数据", "qd", model.BytesDebug(qd.Data))
|
||||||
|
})
|
||||||
|
|
||||||
req := &dto.IotServiceLogReq{
|
i := 0
|
||||||
Count: 10,
|
writeTask := service.NewScheduledTask(func() {
|
||||||
|
i++
|
||||||
|
idx := i % 8
|
||||||
|
err := app.PubIotQdData(&dto.IotQd{Data: []byte{byte(1 << idx), byte(3 << idx)}})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("发布写入驱动数据错误", "error", err)
|
||||||
}
|
}
|
||||||
|
}, time.Second)
|
||||||
|
app.task = writeTask
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) disconnect() {
|
||||||
|
slog.Info("断开应用MQTT客户端")
|
||||||
|
ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancle()
|
||||||
|
err := app.cm.Disconnect(ctx)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("断开MQTT客户端失败", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (app *AppClient) newRpcHandler(respTopicFmt string) (*rpc.Handler, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
|
||||||
|
defer cancel()
|
||||||
|
h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{
|
||||||
|
Conn: app.cm,
|
||||||
|
Router: app.cfg.Router,
|
||||||
|
ResponseTopicFmt: respTopicFmt,
|
||||||
|
ClientID: app.cfg.ClientID,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("创建RPC处理器失败", "err", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 注册IOT服务启动请求处理
|
||||||
|
func (app *AppClient) iotServiceStartReq(req *dto.IotQcServiceStartReq) (*dto.IotQcServiceCommonResp, error) {
|
||||||
|
h, err := app.newRpcHandler("%s/iotstartresp")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
|
||||||
|
defer cancel()
|
||||||
b, err := proto.Marshal(req)
|
b, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
slog.Error("序列化请求消息失败", "err", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := h.Request(ctx, &paho.Publish{
|
resp, err := h.Request(ctx, &paho.Publish{
|
||||||
Topic: logReqTopic,
|
Topic: mqtt.GetIotServiceStartTopic(),
|
||||||
Payload: b,
|
Payload: b,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
return nil, errors.Join(fmt.Errorf("发送启动服务请求错误"), err)
|
||||||
}
|
}
|
||||||
fmt.Printf("请求结果: %v\n", resp)
|
result := &dto.IotQcServiceCommonResp{}
|
||||||
|
proto.Unmarshal(resp.Payload, result)
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig {
|
func (app *AppClient) iotStopReq(req *dto.IotQcServiceStopReq) (*dto.IotQcServiceCommonResp, error) {
|
||||||
|
h, err := app.newRpcHandler("%s/iotstopresp")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
b, err := proto.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Join(fmt.Errorf("序列化IOT服务停止请求消息失败"), err)
|
||||||
|
}
|
||||||
|
resp, err := h.Request(ctx, &paho.Publish{
|
||||||
|
Topic: mqtt.GetIotServiceStopTopic(),
|
||||||
|
Payload: b,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Join(fmt.Errorf("发送停止IOT服务请求错误"), err)
|
||||||
|
}
|
||||||
|
result := &dto.IotQcServiceCommonResp{}
|
||||||
|
proto.Unmarshal(resp.Payload, result)
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) RegIotCjDataHandler(h func(*dto.IotCj)) {
|
||||||
|
app.cfg.Router.RegisterHandler(mqtt.GetCjTopic(), func(p *paho.Publish) {
|
||||||
|
cj := &dto.IotCj{}
|
||||||
|
err := proto.Unmarshal(p.Payload, cj)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("采集数据proto.Unmarshal异常", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h(cj)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) RegIotQdDataHandler(h func(*dto.IotQd)) {
|
||||||
|
app.cfg.Router.RegisterHandler(mqtt.GetQdTopic(), func(p *paho.Publish) {
|
||||||
|
qd := &dto.IotQd{}
|
||||||
|
err := proto.Unmarshal(p.Payload, qd)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("驱动数据proto.Unmarshal异常", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h(qd)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) PubIotCjData(cj *dto.IotCj) error {
|
||||||
|
return app.pub(mqtt.GetCjTopic(), cj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) PubIotQdData(qd *dto.IotQd) error {
|
||||||
|
slog.Warn("应用发布驱动数据", "topic", mqtt.GetQdTopic(), "data", model.BytesDebug(qd.Data))
|
||||||
|
return app.pub(mqtt.GetQdTopic(), qd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *AppClient) pub(topic string, data proto.Message) error {
|
||||||
|
b, err := proto.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_, err = app.cm.Publish(ctx, &paho.Publish{
|
||||||
|
Topic: topic,
|
||||||
|
Payload: b,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func initAppClient() *AppClient {
|
||||||
|
clientId := "iotstartreq_test"
|
||||||
|
topics := []string{mqtt.GetIotServiceStateTopic(), mqtt.GetCjTopic(), mqtt.GetQdTopic()}
|
||||||
|
cfg := getCmConfig(clientId, topics)
|
||||||
|
|
||||||
|
// ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
// defer cancel()
|
||||||
|
cm, err := autopaho.NewConnection(context.Background(), cfg)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
ac := &AppClient{cfg: &cfg, cm: cm}
|
||||||
|
return ac
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCmConfig(clientId string, subTopics []string) autopaho.ClientConfig {
|
||||||
addr, _ := url.Parse("tcp://192.168.3.233:1883")
|
addr, _ := url.Parse("tcp://192.168.3.233:1883")
|
||||||
cc := autopaho.ClientConfig{
|
cc := autopaho.ClientConfig{
|
||||||
BrokerUrls: []*url.URL{addr},
|
BrokerUrls: []*url.URL{addr},
|
||||||
@ -64,26 +260,28 @@ func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig {
|
|||||||
fmt.Println("mqtt connection up")
|
fmt.Println("mqtt connection up")
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
for _, v := range subTopics {
|
||||||
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
|
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
|
||||||
Subscriptions: []paho.SubscribeOptions{
|
Subscriptions: []paho.SubscribeOptions{
|
||||||
{Topic: logReqTopic, QoS: 0},
|
{Topic: v, QoS: 0, NoLocal: true},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
|
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
fmt.Println("mqtt subscription made")
|
fmt.Println("mqtt subscription made")
|
||||||
},
|
},
|
||||||
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
|
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
|
||||||
ClientConfig: paho.ClientConfig{
|
ClientConfig: paho.ClientConfig{
|
||||||
ClientID: clientId,
|
ClientID: clientId,
|
||||||
Router: paho.NewStandardRouter(),
|
Router: paho.NewStandardRouter(),
|
||||||
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", clientId, err) },
|
OnClientError: func(err error) { fmt.Printf("%s 客户端错误: %s\n", clientId, err) },
|
||||||
OnServerDisconnect: func(d *paho.Disconnect) {
|
OnServerDisconnect: func(d *paho.Disconnect) {
|
||||||
if d.Properties != nil {
|
if d.Properties != nil {
|
||||||
fmt.Printf("%s requested disconnect: %s\n", clientId, d.Properties.ReasonString)
|
fmt.Printf("%s 服务断联: %v\n", clientId, d)
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("%s requested disconnect; reason code: %d\n", clientId, d.ReasonCode)
|
fmt.Printf("%s 服务断联; reason code: %d\n", clientId, d.ReasonCode)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -83,12 +83,12 @@ func PubIotCjData(cj *dto.IotCj) error {
|
|||||||
|
|
||||||
// 发布IOT驱动数据
|
// 发布IOT驱动数据
|
||||||
func PubIotQdData(qd *dto.IotQd) error {
|
func PubIotQdData(qd *dto.IotQd) error {
|
||||||
return pub(GetCjTopic(), qd)
|
return pub(GetQdTopic(), qd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 注册IOT采集数据处理
|
// 注册IOT采集数据处理
|
||||||
func RegIotCjHandler(h func(cj *dto.IotCj)) {
|
func RegIotCjHandler(h func(cj *dto.IotCj)) {
|
||||||
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
|
iotcli.cc.Router.RegisterHandler(GetCjTopic(), func(p *paho.Publish) {
|
||||||
cmd := &dto.IotCj{}
|
cmd := &dto.IotCj{}
|
||||||
err := proto.Unmarshal(p.Payload, cmd)
|
err := proto.Unmarshal(p.Payload, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -97,11 +97,12 @@ func RegIotCjHandler(h func(cj *dto.IotCj)) {
|
|||||||
}
|
}
|
||||||
h(cmd)
|
h(cmd)
|
||||||
})
|
})
|
||||||
|
slog.Info("注册IOT采集数据处理success")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 注册IOT驱动数据处理
|
// 注册IOT驱动数据处理
|
||||||
func RegIotQdHandler(h func(qd *dto.IotQd)) {
|
func RegIotQdHandler(h func(qd *dto.IotQd)) {
|
||||||
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
|
iotcli.cc.Router.RegisterHandler(GetQdTopic(), func(p *paho.Publish) {
|
||||||
cmd := &dto.IotQd{}
|
cmd := &dto.IotQd{}
|
||||||
err := proto.Unmarshal(p.Payload, cmd)
|
err := proto.Unmarshal(p.Payload, cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -110,6 +111,7 @@ func RegIotQdHandler(h func(qd *dto.IotQd)) {
|
|||||||
}
|
}
|
||||||
h(cmd)
|
h(cmd)
|
||||||
})
|
})
|
||||||
|
slog.Info("注册IOT驱动数据处理success")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 注册IOT服务启动请求处理
|
// 注册IOT服务启动请求处理
|
||||||
@ -172,7 +174,7 @@ func sub(topic string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
|
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
|
||||||
fmt.Printf("收到请求: %v\n", p)
|
slog.Info("IOT-MQTT服务收到请求", "publish", p)
|
||||||
if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" {
|
if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" {
|
||||||
err := proto.Unmarshal(p.Payload, r)
|
err := proto.Unmarshal(p.Payload, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -206,17 +208,16 @@ func pub(topic string, data protoreflect.ProtoMessage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// switch topic {
|
switch topic {
|
||||||
// case GetIotServiceStateTopic():
|
case GetIotServiceStateTopic():
|
||||||
// slog.Debug("发布Iot服务状态", "topic", topic, "data", data)
|
slog.Debug("IOT-MQTT发布Iot服务状态", "topic", topic, "data", data)
|
||||||
// case GetCjTopic():
|
case GetCjTopic():
|
||||||
// slog.Debug("发布采集数据", "topic", topic, "data", data)
|
slog.Debug("IOT-MQTT发布采集数据", "topic", topic, "data", data)
|
||||||
// case GetQdTopic():
|
case GetQdTopic():
|
||||||
// slog.Debug("发布驱动数据", "topic", topic, "data", data)
|
slog.Debug("IOT-MQTT发布驱动数据", "topic", topic, "data", data)
|
||||||
// default:
|
default:
|
||||||
// slog.Error("未知发布主题", "topic", topic, "data", data)
|
return fmt.Errorf("IOT-MQTT未知发布主题: topic=%s", topic)
|
||||||
// return fmt.Errorf("未知发布主题: topic=%s", topic)
|
}
|
||||||
// }
|
|
||||||
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
|
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
QoS: 0,
|
QoS: 0,
|
||||||
|
@ -43,18 +43,18 @@ func (c *IotMqttConfig) tryInto() (*autopaho.ClientConfig, error) {
|
|||||||
ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second,
|
ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second,
|
||||||
ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second,
|
ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second,
|
||||||
OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
|
OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
|
||||||
slog.Info("MQTT连接成功")
|
slog.Info("MQTT连接成功", "url", c.BrokerUrl)
|
||||||
subIotQc()
|
subIotQc()
|
||||||
},
|
},
|
||||||
OnConnectError: func(err error) {
|
OnConnectError: func(err error) {
|
||||||
slog.Error("MQTT连接失败", "error", err)
|
slog.Error("MQTT连接失败", "url", c.BrokerUrl, "error", err)
|
||||||
},
|
},
|
||||||
ClientConfig: paho.ClientConfig{
|
ClientConfig: paho.ClientConfig{
|
||||||
ClientID: c.ClientId,
|
ClientID: c.ClientId,
|
||||||
Router: paho.NewStandardRouter(),
|
Router: paho.NewStandardRouter(),
|
||||||
OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) },
|
OnClientError: func(err error) { slog.Error("MQTT客户端发生错误", "clientId", c.ClientId, "err", err) },
|
||||||
OnServerDisconnect: func(d *paho.Disconnect) {
|
OnServerDisconnect: func(d *paho.Disconnect) {
|
||||||
fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties)
|
slog.Error("MQTT连接断开", "clientId", c.ClientId, "reasonCode", d.ReasonCode, "properties", d.Properties)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@ -15,8 +16,10 @@ var iqcs *IotQcServer
|
|||||||
|
|
||||||
type IotQcServer struct {
|
type IotQcServer struct {
|
||||||
qcMappingService service.IotQcMappingService
|
qcMappingService service.IotQcMappingService
|
||||||
|
qcDataPubTask service.IScheduledTask
|
||||||
tasks []service.IScheduledTask
|
tasks []service.IScheduledTask
|
||||||
state *dto.IotServiceState
|
state *dto.IotServiceState
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *IotQcServer) start() error {
|
func (s *IotQcServer) start() error {
|
||||||
@ -25,9 +28,35 @@ func (s *IotQcServer) start() error {
|
|||||||
s.registerReqHandlers()
|
s.registerReqHandlers()
|
||||||
// 启动服务状态发布定时任务
|
// 启动服务状态发布定时任务
|
||||||
iqcs.tasks = append(iqcs.tasks, service.NewScheduledTask(pubServerState, 1*time.Second))
|
iqcs.tasks = append(iqcs.tasks, service.NewScheduledTask(pubServerState, 1*time.Second))
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s.serve(ctx)
|
||||||
|
s.cancel = cancel
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *IotQcServer) serve(ctx context.Context) {
|
||||||
|
defer s.stop()
|
||||||
|
for {
|
||||||
|
<-ctx.Done()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IotQcServer) stop() error {
|
||||||
|
if s.qcDataPubTask != nil {
|
||||||
|
s.qcDataPubTask.Stop()
|
||||||
|
}
|
||||||
|
if s.qcMappingService != nil {
|
||||||
|
s.qcMappingService.Stop()
|
||||||
|
}
|
||||||
|
for _, task := range s.tasks {
|
||||||
|
task.Stop()
|
||||||
|
}
|
||||||
|
mqtt.Stop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 服务状态监测
|
||||||
func (s *IotQcServer) stateMonitor() *dto.IotServiceState {
|
func (s *IotQcServer) stateMonitor() *dto.IotServiceState {
|
||||||
if s.qcMappingService != nil {
|
if s.qcMappingService != nil {
|
||||||
if err := s.qcMappingService.ReportError(); err != nil {
|
if err := s.qcMappingService.ReportError(); err != nil {
|
||||||
@ -43,10 +72,22 @@ func (s *IotQcServer) stateMonitor() *dto.IotServiceState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 注册服务请求处理
|
||||||
func (s *IotQcServer) registerReqHandlers() {
|
func (s *IotQcServer) registerReqHandlers() {
|
||||||
mqtt.RegIotQcServiceStartReqHandler(s.startIotQcMappingService)
|
mqtt.RegIotQcServiceStartReqHandler(s.startIotQcMappingService)
|
||||||
mqtt.RegIotQcServiceStopReqHandler(s.stopIotQcMappingService)
|
mqtt.RegIotQcServiceStopReqHandler(s.stopIotQcMappingService)
|
||||||
mqtt.RegIotLogReqHandler(GetIotLog)
|
mqtt.RegIotLogReqHandler(GetIotLog)
|
||||||
|
// 注册驱采数据写入处理
|
||||||
|
mqtt.RegIotQdHandler(s.handleQdWrite)
|
||||||
|
mqtt.RegIotCjHandler(s.handleCjWrite)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IotQcServer) pubQcData() {
|
||||||
|
service := s.qcMappingService
|
||||||
|
if service != nil {
|
||||||
|
mqtt.PubIotCjData(&dto.IotCj{Data: service.GetCjBytes()})
|
||||||
|
mqtt.PubIotQdData(&dto.IotQd{Data: service.GetQdBytes()})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp {
|
func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp {
|
||||||
@ -56,13 +97,34 @@ func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *d
|
|||||||
return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()}
|
return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()}
|
||||||
}
|
}
|
||||||
s.qcMappingService = mqcs
|
s.qcMappingService = mqcs
|
||||||
|
s.qcDataPubTask = service.NewScheduledTask(s.pubQcData, time.Duration(req.Config.Interval)*time.Millisecond)
|
||||||
return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
|
return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *IotQcServer) handleQdWrite(qd *dto.IotQd) {
|
||||||
|
if s.qcMappingService != nil {
|
||||||
|
slog.Info("IOT收到并执行写入驱动数据", "data", qd.Data)
|
||||||
|
s.qcMappingService.WriteQdBytes(qd.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IotQcServer) handleCjWrite(cj *dto.IotCj) {
|
||||||
|
if s.qcMappingService != nil {
|
||||||
|
slog.Info("IOT收到并执行写入采集数据", "data", cj.Data)
|
||||||
|
s.qcMappingService.WriteCjBytes(cj.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *IotQcServer) stopIotQcMappingService(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp {
|
func (s *IotQcServer) stopIotQcMappingService(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp {
|
||||||
if err := s.qcMappingService.Stop(); err != nil {
|
task := s.qcDataPubTask
|
||||||
slog.Error("停止Modbus驱采映射服务失败", "err", err)
|
s.qcDataPubTask = nil
|
||||||
return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()}
|
if task != nil {
|
||||||
|
task.Stop()
|
||||||
|
}
|
||||||
|
service := s.qcMappingService
|
||||||
|
s.qcMappingService = nil
|
||||||
|
if service != nil {
|
||||||
|
service.Stop()
|
||||||
}
|
}
|
||||||
return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
|
return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
|
||||||
}
|
}
|
||||||
@ -77,8 +139,13 @@ func StartIotQcServer() {
|
|||||||
iqcs.start()
|
iqcs.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func StopIotQcServer() {
|
||||||
|
iqcs.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
func pubServerState() {
|
func pubServerState() {
|
||||||
state := iqcs.stateMonitor()
|
state := iqcs.stateMonitor()
|
||||||
|
slog.Debug("发布服务状态", "state", state.State, "msg", state.ErrMsg)
|
||||||
mqtt.PubIotServiceState(state)
|
mqtt.PubIotServiceState(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,8 @@ type modbusQcService struct {
|
|||||||
// ReportError implements IotQcMappingService.
|
// ReportError implements IotQcMappingService.
|
||||||
func (s *modbusQcService) ReportError() error {
|
func (s *modbusQcService) ReportError() error {
|
||||||
if !s.cli.IsConnected() {
|
if !s.cli.IsConnected() {
|
||||||
return fmt.Errorf("modbus连接断开")
|
slog.Error("Modbus驱采服务映射任务Modbus客户端未连接,", "url", s.config.Url, "unitid", s.config.UnitId)
|
||||||
|
return fmt.Errorf("modbus未连接或连接断开")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -96,6 +97,7 @@ func (s *modbusQcService) WriteQdBytes(bytes []byte) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 新建Modbus驱采映射处理服务
|
||||||
func NewModbusQcService(config *dto.ModbusConfig) (IotQcMappingService, error) {
|
func NewModbusQcService(config *dto.ModbusConfig) (IotQcMappingService, error) {
|
||||||
// 基础配置检查
|
// 基础配置检查
|
||||||
if err := checkConfig(config); err != nil {
|
if err := checkConfig(config); err != nil {
|
||||||
@ -147,7 +149,7 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error {
|
|||||||
slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function)
|
slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function)
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
slog.Debug("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BitsDebug(data), "mapping", mdm)
|
slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BitsDebug(data), "mapping", mdm)
|
||||||
}
|
}
|
||||||
case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters:
|
case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters:
|
||||||
data := getQcBytes(bytes, mdm)
|
data := getQcBytes(bytes, mdm)
|
||||||
@ -156,7 +158,7 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error {
|
|||||||
slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function)
|
slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function)
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
slog.Debug("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BytesDebug(data), "mapping", mdm)
|
slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BytesDebug(data), "mapping", mdm)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -165,56 +167,6 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *modbusQcService) initOnUpdateTask() {
|
|
||||||
// mapping := m.config.Mapping
|
|
||||||
// for _, mdm := range mapping {
|
|
||||||
// if mdm.WriteStrategy == dto.Modbus_OnUpdate && isWriteFunction(mdm.Function) {
|
|
||||||
// et := model.DCE_Drive_Update
|
|
||||||
// if mdm.Type == dto.DataType_CollectTable {
|
|
||||||
// et = model.DCE_Collect_Update
|
|
||||||
// }
|
|
||||||
// m.qc.On(et, func(d model.QC) {
|
|
||||||
// if !m.cli.IsConnected() {
|
|
||||||
// slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "url", m.config.Url, "Function", mdm.Function)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// switch mdm.Function {
|
|
||||||
// case dto.Modbus_WriteCoil, dto.Modbus_WriteCoils, dto.Modbus_RWCoils:
|
|
||||||
// err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm))
|
|
||||||
// if err != nil {
|
|
||||||
// slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "error", err, "Function", mdm.Function)
|
|
||||||
// } else {
|
|
||||||
// slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "Function", mdm.Function)
|
|
||||||
// }
|
|
||||||
// case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters:
|
|
||||||
// err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm))
|
|
||||||
// if err != nil {
|
|
||||||
// slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "error", err, "Function", mdm.Function)
|
|
||||||
// } else {
|
|
||||||
// slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "Function", mdm.Function)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (m *modbusQcService) run(ctx context.Context) {
|
|
||||||
// defer close(m.done)
|
|
||||||
// mainLoop:
|
|
||||||
// for {
|
|
||||||
// select {
|
|
||||||
// case <-ctx.Done():
|
|
||||||
// slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url)
|
|
||||||
// modbus.DeleteClient(m.config.Url)
|
|
||||||
// break mainLoop
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
// m.readTaskExecute()
|
|
||||||
// time.Sleep(time.Millisecond * time.Duration(m.config.Interval))
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (m *modbusQcService) readTaskExecute() {
|
func (m *modbusQcService) readTaskExecute() {
|
||||||
if m.cli.IsConnected() {
|
if m.cli.IsConnected() {
|
||||||
for _, mdm := range m.config.Mapping {
|
for _, mdm := range m.config.Mapping {
|
||||||
@ -301,8 +253,6 @@ func (m *modbusQcService) readTaskExecute() {
|
|||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "url", m.config.Url, "unitid", m.config.UnitId)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,28 +275,6 @@ func getQcBytes(bytes []byte, mdm *dto.ModbusDcMapping) []byte {
|
|||||||
return bytes[start : start+quantity]
|
return bytes[start : start+quantity]
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (m *modbusQcService) GetDcBits(mdm *dto.ModbusDcMapping) []bool {
|
|
||||||
// switch mdm.Type {
|
|
||||||
// case dto.DataType_CollectTable: // 采集数据
|
|
||||||
// return m.qc.GetCjBitsOf(mdm.Start, mdm.Quantity)
|
|
||||||
// case dto.DataType_DriveTable: // 驱动数据
|
|
||||||
// return m.qc.GetQdBitsOf(mdm.Start, mdm.Quantity)
|
|
||||||
// default:
|
|
||||||
// panic("未知数据类型")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (m *modbusQcService) GetDcBytes(mdm *dto.ModbusDcMapping) []byte {
|
|
||||||
// switch mdm.Type {
|
|
||||||
// case dto.DataType_CollectTable: // 采集数据
|
|
||||||
// return m.qc.GetCjBytesOf(mdm.Start, mdm.Quantity*2)
|
|
||||||
// case dto.DataType_DriveTable: // 驱动数据
|
|
||||||
// return m.qc.GetQdBytesOf(mdm.Start, mdm.Quantity*2)
|
|
||||||
// default:
|
|
||||||
// panic("未知数据类型")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (m *modbusQcService) updateDcByBits(mdm *dto.ModbusDcMapping, bits []bool) error {
|
func (m *modbusQcService) updateDcByBits(mdm *dto.ModbusDcMapping, bits []bool) error {
|
||||||
switch mdm.Type {
|
switch mdm.Type {
|
||||||
case dto.DataType_CJ: // 采集数据
|
case dto.DataType_CJ: // 采集数据
|
||||||
|
Loading…
Reference in New Issue
Block a user