diff --git a/dto/mqtt.pb.go b/dto/mqtt.pb.go index db01345..125ab5f 100644 --- a/dto/mqtt.pb.go +++ b/dto/mqtt.pb.go @@ -23,22 +23,19 @@ const ( type ServiceState int32 const ( - ServiceState_Normal ServiceState = 0 // 正常 - ServiceState_Offline ServiceState = 1 // 离线 - ServiceState_Error ServiceState = 2 // 异常 + ServiceState_Normal ServiceState = 0 // 正常 + ServiceState_Error ServiceState = 2 // 错误 ) // Enum value maps for ServiceState. var ( ServiceState_name = map[int32]string{ 0: "Normal", - 1: "Offline", 2: "Error", } ServiceState_value = map[string]int32{ - "Normal": 0, - "Offline": 1, - "Error": 2, + "Normal": 0, + "Error": 2, } ) @@ -76,7 +73,8 @@ type IotServiceState struct { unknownFields protoimpl.UnknownFields // string cliId = 1; // IOT服务mqtt客户端id - State ServiceState `protobuf:"varint,2,opt,name=state,proto3,enum=iot_dto.ServiceState" json:"state,omitempty"` // 服务状态 + State ServiceState `protobuf:"varint,2,opt,name=state,proto3,enum=iot_dto.ServiceState" json:"state,omitempty"` // 服务状态 + ErrMsg string `protobuf:"bytes,3,opt,name=errMsg,proto3" json:"errMsg,omitempty"` // 当state为error时,错误描述 } func (x *IotServiceState) Reset() { @@ -118,6 +116,13 @@ func (x *IotServiceState) GetState() ServiceState { return ServiceState_Normal } +func (x *IotServiceState) GetErrMsg() string { + if x != nil { + return x.ErrMsg + } + return "" +} + type IotQcServiceStartReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -470,35 +475,36 @@ var file_proto_src_mqtt_proto_rawDesc = []byte{ 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x69, 0x6f, 0x74, 0x5f, 0x64, 0x74, 0x6f, 0x1a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x64, 0x63, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0x3e, 0x0a, 0x0f, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x0f, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x2b, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x69, 0x6f, 0x74, 0x5f, 0x64, 0x74, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x65, 0x22, 0x5b, 0x0a, 0x14, 0x49, 0x6f, 0x74, 0x51, 0x63, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x12, 0x2d, 0x0a, 0x06, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x69, 0x6f, - 0x74, 0x5f, 0x64, 0x74, 0x6f, 0x2e, 0x4d, 0x6f, 0x64, 0x62, 0x75, 0x73, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6f, - 0x72, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x66, 0x6f, 0x72, 0x63, 0x65, - 0x22, 0x15, 0x0a, 0x13, 0x49, 0x6f, 0x74, 0x51, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x22, 0x3e, 0x0a, 0x16, 0x49, 0x6f, 0x74, 0x51, 0x63, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x52, 0x65, 0x73, - 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x28, 0x0a, 0x10, 0x49, 0x6f, 0x74, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x63, - 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, - 0x74, 0x22, 0x27, 0x0a, 0x11, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, - 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x04, - 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x22, 0x1b, 0x0a, 0x05, 0x49, 0x6f, - 0x74, 0x51, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x1b, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x43, 0x6a, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x64, 0x61, 0x74, 0x61, 0x2a, 0x32, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x6f, 0x72, 0x6d, 0x61, 0x6c, 0x10, 0x00, - 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, 0x65, 0x10, 0x01, 0x12, 0x09, 0x0a, - 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x02, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2f, 0x64, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x74, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x22, 0x5b, 0x0a, 0x14, 0x49, + 0x6f, 0x74, 0x51, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, + 0x52, 0x65, 0x71, 0x12, 0x2d, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x69, 0x6f, 0x74, 0x5f, 0x64, 0x74, 0x6f, 0x2e, 0x4d, 0x6f, + 0x64, 0x62, 0x75, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6f, 0x72, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x05, 0x66, 0x6f, 0x72, 0x63, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x49, 0x6f, 0x74, 0x51, + 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x22, + 0x3e, 0x0a, 0x16, 0x49, 0x6f, 0x74, 0x51, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, + 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x6d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, + 0x28, 0x0a, 0x10, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x6f, 0x67, + 0x52, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x27, 0x0a, 0x11, 0x49, 0x6f, 0x74, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, + 0x0a, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x6f, + 0x67, 0x73, 0x22, 0x1b, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x51, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, + 0x1b, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x43, 0x6a, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x25, 0x0a, 0x0c, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, 0x06, + 0x4e, 0x6f, 0x72, 0x6d, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x10, 0x02, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2f, 0x64, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/mqtt/client.go b/mqtt/client.go index 44b02ba..cebbbd1 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" @@ -42,6 +43,15 @@ func Start(cmc *IotMqttConfig) error { return nil } +// 断开MQTT客户端 +func Stop() error { + slog.Info("停止MQTT客户端") + ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second) + defer cancle() + err := iotcli.cm.Disconnect(ctx) + return err +} + func checkConfig(cmc *IotMqttConfig) error { if cmc.AppId == "" { return fmt.Errorf("应用编号不能为空") diff --git a/mqtt/client_request_test.go b/mqtt/client_request_test.go index 69a7df9..59277a4 100644 --- a/mqtt/client_request_test.go +++ b/mqtt/client_request_test.go @@ -61,7 +61,7 @@ func TestRequest(t *testing.T) { fmt.Printf("请求结果: %v\n", resp) lr := &dto.IotServiceLogResp{} - err = proto.Unmarshal(resp.Payload, lr) + _ = proto.Unmarshal(resp.Payload, lr) log.Printf("Received response: %s\n", lr) time.Sleep(3 * time.Second) @@ -111,7 +111,7 @@ func listen() { v.Wait() } -func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { +func getCmConfig(clientId, subTopic string) autopaho.ClientConfig { addr, _ := url.Parse("tcp://192.168.3.233:1883") cc := autopaho.ClientConfig{ BrokerUrls: []*url.URL{addr}, @@ -122,7 +122,7 @@ func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { defer cancel() if _, err := cm.Subscribe(ctx, &paho.Subscribe{ Subscriptions: []paho.SubscribeOptions{ - {Topic: logReqTopic, QoS: 0}, + {Topic: subTopic, QoS: 0}, }, }); err != nil { fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) diff --git a/mqtt/client_will_test.go b/mqtt/client_will_test.go deleted file mode 100644 index 13dccfb..0000000 --- a/mqtt/client_will_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package mqtt - -import "testing" - -func TestWillMessage(t *testing.T) { - t.Log("test will message") -} diff --git a/mqtt/config.go b/mqtt/config.go index 7800943..b86830a 100644 --- a/mqtt/config.go +++ b/mqtt/config.go @@ -59,6 +59,5 @@ func (c *IotMqttConfig) tryInto() (*autopaho.ClientConfig, error) { }, } cc.SetUsernamePassword(c.Username, []byte(c.Password)) - cc.SetWillMessage(GetIotServiceStateTopic(), []byte("离线"), 1, true) return cc, nil } diff --git a/proto/src/mqtt.proto b/proto/src/mqtt.proto index 25e11cb..0ada4ad 100644 --- a/proto/src/mqtt.proto +++ b/proto/src/mqtt.proto @@ -7,14 +7,14 @@ import "proto/src/dc.proto"; enum ServiceState { Normal = 0; // 正常 - Offline = 1; // 离线 - Error = 2; // 异常 + Error = 2; // 出现异常或发生错误 } // IOT服务状态 message IotServiceState { // string cliId = 1; // IOT服务mqtt客户端id ServiceState state = 2; // 服务状态 + string errMsg = 3; // 当state为error时,异常/错误描述 } message IotQcServiceStartReq { @@ -24,7 +24,7 @@ message IotQcServiceStartReq { } message IotQcServiceStopReq { - + } message IotQcServiceCommonResp { diff --git a/server/server.go b/server/server.go index 679047f..6ad5780 100644 --- a/server/server.go +++ b/server/server.go @@ -16,6 +16,7 @@ var iqcs *IotQcServer type IotQcServer struct { qcMappingService service.IotQcMappingService tasks []service.IScheduledTask + state *dto.IotServiceState } func (s *IotQcServer) start() error { @@ -27,6 +28,21 @@ func (s *IotQcServer) start() error { return nil } +func (s *IotQcServer) stateMonitor() *dto.IotServiceState { + if s.qcMappingService != nil { + if err := s.qcMappingService.ReportError(); err != nil { + // slog.Error("Modbus驱采映射服务报错", "err", err) + return &dto.IotServiceState{ + State: dto.ServiceState_Error, + ErrMsg: err.Error(), + } + } + } + return &dto.IotServiceState{ + State: dto.ServiceState_Normal, + } +} + func (s *IotQcServer) registerReqHandlers() { mqtt.RegIotQcServiceStartReqHandler(s.startIotQcMappingService) mqtt.RegIotQcServiceStopReqHandler(s.stopIotQcMappingService) @@ -54,15 +70,16 @@ func (s *IotQcServer) stopIotQcMappingService(req *dto.IotQcServiceStopReq) *dto func StartIotQcServer() { iqcs = &IotQcServer{ tasks: []service.IScheduledTask{}, + state: &dto.IotServiceState{ + State: dto.ServiceState_Normal, + }, } iqcs.start() } func pubServerState() { - state := dto.ServiceState_Normal - mqtt.PubIotServiceState(&dto.IotServiceState{ - State: state, - }) + state := iqcs.stateMonitor() + mqtt.PubIotServiceState(state) } func startMqttClient() { diff --git a/service/api.go b/service/api.go index 18647ec..495b559 100644 --- a/service/api.go +++ b/service/api.go @@ -22,6 +22,8 @@ type IotQcMappingService interface { WriteQdBytes(bytes []byte) error // 写采集位列表 WriteCjBytes(bytes []byte) error + // 报错 + ReportError() error } // 定时任务 diff --git a/service/modbus_qc_mapping.go b/service/modbus_qc_mapping.go index d87d986..595dd2e 100644 --- a/service/modbus_qc_mapping.go +++ b/service/modbus_qc_mapping.go @@ -19,8 +19,14 @@ type modbusQcService struct { qc model.QC tasks []IScheduledTask stopped bool - // cancel context.CancelFunc - // done chan struct{} // 服务协程退出信号 +} + +// ReportError implements IotQcMappingService. +func (s *modbusQcService) ReportError() error { + if !s.cli.IsConnected() { + return fmt.Errorf("modbus连接断开") + } + return nil } // RegisterQcDataHandleScheduleTask implements IotQcMappingService. @@ -119,13 +125,8 @@ func NewModbusQcService(config *dto.ModbusConfig) (IotQcMappingService, error) { config: config, cli: cli, qc: model.NewDC(qd, cj), - // done: make(chan struct{}), } s.RegisterQcDataHandleScheduleTask(s.readTaskExecute, time.Duration(config.Interval)*time.Millisecond) - // s.initOnUpdateTask() - // ctx, cancel := context.WithCancel(context.Background()) - // go s.run(ctx) - // s.cancel = cancel return s, nil }