From 0b20dcdf7acb58935500ff09f05bc0650a0b22fc Mon Sep 17 00:00:00 2001 From: walker Date: Mon, 18 Dec 2023 15:34:10 +0800 Subject: [PATCH] =?UTF-8?q?modbus=E9=A9=B1=E9=87=87=E6=98=A0=E5=B0=84?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=8A=9F=E8=83=BD=E8=B0=83=E6=95=B4=E9=87=8D?= =?UTF-8?q?=E6=9E=84=201,=E6=B7=BB=E5=8A=A0=E9=A9=B1=E5=8A=A8=E9=87=87?= =?UTF-8?q?=E9=9B=86=E8=8E=B7=E5=8F=96=E5=92=8C=E5=86=99=E5=85=A5=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=202,=E5=88=9B=E5=BB=BA=E6=9C=8D=E5=8A=A1=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E8=B0=83=E6=95=B4=E6=8E=A5=E6=94=B6=E9=A9=B1=E9=87=87?= =?UTF-8?q?=E5=AD=97=E8=8A=82=E6=95=B0=E7=BB=84=20mqtt=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=8A=9F=E8=83=BD=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= =?UTF-8?q?,=E6=9C=AA=E5=AE=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 136 ++++----- modbus/function_code.go | 23 -- mqtt/app_protocol.go | 28 -- mqtt/client.go | 196 +++++++++---- mqtt/config.go | 11 +- mqtt/proto/mqtt.pb.go | 171 +++++------- mqtt/topic.go | 6 +- proto/src/mqtt/mqtt.proto | 24 +- {modbus => protocol/modbus}/api.go | 0 {modbus => protocol/modbus}/client.go | 0 {modbus => protocol/modbus}/manage.go | 0 service/api.go | 18 +- service/modbus_dc_mapping.go | 287 ------------------- service/modbus_qc_mapping.go | 380 ++++++++++++++++++++++++++ service/model/dc.go | 240 ++++++++-------- service/model/dc_test.go | 4 +- service/model/ss.go | 6 + 17 files changed, 819 insertions(+), 711 deletions(-) delete mode 100644 modbus/function_code.go delete mode 100644 mqtt/app_protocol.go rename {modbus => protocol/modbus}/api.go (100%) rename {modbus => protocol/modbus}/client.go (100%) rename {modbus => protocol/modbus}/manage.go (100%) delete mode 100644 service/modbus_dc_mapping.go create mode 100644 service/modbus_qc_mapping.go create mode 100644 service/model/ss.go diff --git a/main.go b/main.go index b93d7c8..4855fd2 100644 --- a/main.go +++ b/main.go @@ -6,23 +6,22 @@ import ( "os" "time" - "github.com/eclipse/paho.golang/autopaho" - "github.com/eclipse/paho.golang/paho" - "google.golang.org/protobuf/proto" "joylink.club/iot/config" "joylink.club/iot/mqtt" mproto "joylink.club/iot/mqtt/proto" + "joylink.club/iot/service" + "joylink.club/iot/service/proto" ) func main() { - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ - Level: slog.LevelDebug, - AddSource: false, - }))) + // slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + // Level: slog.LevelDebug, + // AddSource: false, + // }))) config.LoadConfig() mqttcfg := config.Cfg.Mqtt - mqtt.BuildTopics(mqttcfg.Topic.App, mqttcfg.ClientId) - cmc := &mqtt.ClientManageConfig{ + cmc := &mqtt.IotMqttConfig{ + AppId: mqttcfg.Topic.App, BrokerUrl: mqttcfg.Address, ClientId: mqttcfg.ClientId, Username: mqttcfg.Username, @@ -30,14 +29,6 @@ func main() { KeepAlive: mqttcfg.KeepAlive, ConnectRetryDelay: mqttcfg.ConnectRetryDelay, ConnectTimeout: mqttcfg.ConnectTimeout, - OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { - slog.Info("MQTT连接成功") - // err := mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic()) - // if err != nil { - // slog.Error("订阅IotServiceState失败", "error", err) - // os.Exit(1) - // } - }, } err := mqtt.Start(cmc) if err != nil { @@ -46,24 +37,6 @@ func main() { } time.Sleep(time.Second * 3) - err = mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic()) - if err != nil { - slog.Error("订阅IotServiceState失败", "error", err) - os.Exit(1) - } - - i := 0 - mqtt.RegisterHandler(mqtt.GetIotServiceStateTopic(), func(m *paho.Publish) { - iss := &mproto.IotServiceState{} - err := proto.Unmarshal(m.Payload, iss) - if err != nil { - slog.Error("SubIotServiceState proto.Unmarshal异常", "error", err) - return - } - slog.Debug("收到IotServiceState发布消息", "state", iss) - i++ - fmt.Printf("%v次处理IotServiceState: %v\n", i, iss) - }) go func() { for { @@ -75,51 +48,54 @@ func main() { } }() + mds, err := service.NewModbusQcService(&proto.ModbusConfig{ + Url: "tcp://127.0.0.1:502", + UnitId: 1, + Timeout: 500, + Interval: 1000, + Mapping: []*proto.ModbusDcMapping{ + { + // Function: proto.Modbus_ReadHoldingRegister, + Function: proto.Modbus_ReadCoil, + Addr: 0, + Quantity: 16, + Type: proto.DataType_CollectTable, + Start: 0, + }, + { + Function: proto.Modbus_WriteCoils, + Addr: 16, + Quantity: 16, + Type: proto.DataType_DriveTable, + WriteStrategy: proto.Modbus_OnUpdate, + Start: 0, + }, + }, + }, make([]byte, 2), make([]byte, 2)) + if err != nil { + panic(err) + } + + go func() { + i := 0 + for { + c := mds.GetCjBytes() + fmt.Printf("采集数据: %v\n", c) + i++ + if i%3 == 0 { + idx := i % 8 + err := mds.WriteQdBytes([]byte{byte(1 << idx), byte(3 << idx)}) + if err != nil { + slog.Error("设置驱动数据失败", "error", err) + } else { + fmt.Printf("设置驱动数据成功: %v\n", mds.GetQdBytes()) + } + } + time.Sleep(time.Second) + } + }() + time.Sleep(time.Minute) - // dc := model.NewDC(make([]byte, 2), make([]byte, 2)) - // mds, err := service.NewModbusQcService(&proto.ModbusConfig{ - // Url: "tcp://127.0.0.1:502", - // UnitId: 2, - // Timeout: 500, - // Interval: 1000, - // Mapping: []*proto.ModbusDcMapping{ - // { - // // Function: proto.Modbus_ReadHoldingRegister, - // Function: proto.Modbus_ReadCoil, - // Addr: 0, - // Quantity: 16, - // Type: proto.DataType_CollectTable, - // Start: 0, - // }, - // { - // Function: proto.Modbus_WriteCoils, - // Addr: 16, - // Quantity: 16, - // Type: proto.DataType_DriveTable, - // WriteStrategy: proto.Modbus_OnUpdate, - // Start: 0, - // }, - // }, - // }, dc) - // if err != nil { - // panic(err) - // } - // go func() { - // i := 0 - // for { - // c := dc.GetCollect() - // fmt.Printf("采集数据: %v\n", c) - // i++ - // if i%3 == 0 { - // idx := i % 8 - // dc.UpdateDriveByBytes(0, []byte{byte(1 << idx)}) - // fmt.Printf("设置驱动数据: %v\n", dc.GetDrive()) - // } - // time.Sleep(time.Second) - // } - // }() - - // time.Sleep(time.Minute * 2) - // mds.Stop() + mds.Stop() } diff --git a/modbus/function_code.go b/modbus/function_code.go deleted file mode 100644 index 9fe7cb3..0000000 --- a/modbus/function_code.go +++ /dev/null @@ -1,23 +0,0 @@ -package modbus - -// 功能码 -type FunctionCode int - -const ( - // 读线圈 - FCReadCoil FunctionCode = 0x01 - // 读离散输入 - FCReadDiscreteInput FunctionCode = 0x02 - // 读多个寄存器 - FCReadHoldingRegister FunctionCode = 0x03 - // 读输入寄存器 - FCReadInputRegister FunctionCode = 0x04 - // 写单个线圈 - FCWriteSingleCoil FunctionCode = 0x05 - // 写单个寄存器 - FCWriteSingleRegister FunctionCode = 0x06 - // 写多个线圈 - FCWriteMultipleCoil FunctionCode = 0x0F - // 写多个寄存器 - FCWriteMultipleRegister FunctionCode = 0x10 -) diff --git a/mqtt/app_protocol.go b/mqtt/app_protocol.go deleted file mode 100644 index 2ac0ca9..0000000 --- a/mqtt/app_protocol.go +++ /dev/null @@ -1,28 +0,0 @@ -package mqtt - -import "joylink.club/iot/mqtt/proto" - -type IotServiceStateHandler func(state *proto.IotServiceState) - -type IotService interface { - PubIotServiceState(state *proto.IotServiceState) - PubIotQdData(qd *proto.IotQd) - SubIotQd() - RegIotQd(h func(qd *proto.IotQd)) - PubIotCjData(cj *proto.IotCj) - SubIotCj() - RegIotCj(h func(cj *proto.IotCj)) - SubIotReq(cmd *proto.IotServiceReq) -} - -type Service interface { - SubIotServiceState() - RegIotServiceState(h func(state *proto.IotServiceState)) - PubIotQdData(qd *proto.IotQd) - SubIotQd() - RegIotQd(h func(qd *proto.IotQd)) - PubIotCjData(cj *proto.IotCj) - SubIotCj() - RegIotCj(h func(cj *proto.IotCj)) - ReqIotService(cmd *proto.IotServiceReq) -} diff --git a/mqtt/client.go b/mqtt/client.go index 345d764..6e69d4c 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -2,22 +2,30 @@ package mqtt import ( "context" + "fmt" "log/slog" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" mproto "joylink.club/iot/mqtt/proto" ) -type Manager struct { - cc *autopaho.ClientConfig - cm *autopaho.ConnectionManager +var iotcli *IotClient + +type IotClient struct { + cmc *IotMqttConfig + cc *autopaho.ClientConfig + cm *autopaho.ConnectionManager } -var manager *Manager - -func Start(cmc *ClientManageConfig) error { +// 初始化并启动MQTT客户端服务 +func Start(cmc *IotMqttConfig) error { + if err := checkConfig(cmc); err != nil { + return err + } + BuildTopics(cmc.AppId, cmc.ClientId) cc, err := cmc.tryInto() if err != nil { return err @@ -26,62 +34,146 @@ func Start(cmc *ClientManageConfig) error { if err != nil { return err } - manager = &Manager{ - cc: cc, - cm: cm, + iotcli = &IotClient{ + cmc: cmc, + cc: cc, + cm: cm, } return nil } -func Publish(ctx context.Context, publish *paho.Publish) (*paho.PublishResponse, error) { - return manager.cm.Publish(ctx, publish) -} - -func PubIotServiceState(s *mproto.IotServiceState) error { - return manager.PubIotServiceState(s) -} - -func SubIotServiceState(topic string) error { - return manager.SubIotServiceState(topic) -} - -func (m *Manager) PubIotServiceState(s *mproto.IotServiceState) error { - if s == nil { - return nil +func checkConfig(cmc *IotMqttConfig) error { + if cmc.AppId == "" { + return fmt.Errorf("应用编号不能为空") } - slog.Debug("PubIotServiceState", "topic", GetIotServiceStateTopic(), "state", s) - b, err := proto.Marshal(s) + if cmc.ClientId == "" { + return fmt.Errorf("客户端编号不能为空") + } + if cmc.BrokerUrl == "" { + return fmt.Errorf("MQTT代理服务地址不能为空") + } + if cmc.Username == "" { + return fmt.Errorf("MQTT用户名不能为空") + } + if cmc.Password == "" { + return fmt.Errorf("MQTT密码不能为空") + } + return nil +} + +// 发布IOT服务状态 +func PubIotServiceState(s *mproto.IotServiceState) error { + return pub(GetIotServiceStateTopic(), s) +} + +// 发布IOT采集数据 +func PubIotCjData(cj *mproto.IotCj) error { + return pub(GetCjTopic(), cj) +} + +// 发布IOT驱动数据 +func PubIotQdData(qd *mproto.IotQd) error { + return pub(GetCjTopic(), qd) +} + +// 注册IOT采集数据处理 +func RegIotCjHandler(h func(cj *mproto.IotCj)) { + iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { + cmd := &mproto.IotCj{} + err := proto.Unmarshal(p.Payload, cmd) + if err != nil { + slog.Error("采集数据proto.Unmarshal异常", "error", err) + return + } + h(cmd) + }) +} + +// 注册IOT驱动数据处理 +func RegIotQdHandler(h func(qd *mproto.IotQd)) { + iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { + cmd := &mproto.IotQd{} + err := proto.Unmarshal(p.Payload, cmd) + if err != nil { + slog.Error("驱动数据proto.Unmarshal异常", "error", err) + return + } + h(cmd) + }) +} + +// 注册IOT日志查询请求处理 +func RegIotLogReqHandler(h func(cmd *mproto.IotServiceLogReq)) { + iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { + cmd := &mproto.IotServiceLogReq{} + err := proto.Unmarshal(p.Payload, cmd) + if err != nil { + slog.Error("RegIotReqHandler proto.Unmarshal异常", "error", err) + return + } + h(cmd) + }) +} + +// 注销IOT处理 +func UnregHandler(topic string) { + iotcli.cc.Router.UnregisterHandler(topic) +} + +// 注销所有IOT处理 +func UnregAllHandler() { + iotcli.cc.Router = paho.NewStandardRouter() +} + +func subIotQc() { + slog.Info("订阅Iot驱采") + sub(GetCjTopic()) // 订阅采集 + sub(GetQdTopic()) // 订阅驱动 +} + +// 发起订阅 +func sub(topic string) { + slog.Info("发起订阅", "topic", topic) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err := iotcli.cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + { + Topic: topic, + QoS: 0, + NoLocal: true, + }, + }, + }) + if err != nil { + slog.Error("订阅失败", "topic", topic, "error", err) + } +} + +// 发布数据 +func pub(topic string, data protoreflect.ProtoMessage) error { + if data == nil { + return fmt.Errorf("发布数据引用为nil") + } + b, err := proto.Marshal(data) if err != nil { return err } - _, err = m.cm.Publish(context.Background(), &paho.Publish{ - Topic: GetIotServiceStateTopic(), + switch topic { + case GetIotServiceStateTopic(): + slog.Debug("发布Iot服务状态", "topic", topic, "data", data) + case GetCjTopic(): + slog.Debug("发布采集数据", "topic", topic, "data", data) + case GetQdTopic(): + slog.Debug("发布驱动数据", "topic", topic, "data", data) + default: + slog.Error("未知发布主题", "topic", topic, "data", data) + return fmt.Errorf("未知发布主题: topic=%s", topic) + } + _, err = iotcli.cm.Publish(context.Background(), &paho.Publish{ + Topic: topic, QoS: 0, Payload: b, }) return err } - -func RegisterHandler(topic string, h func(m *paho.Publish)) { - manager.cc.Router.RegisterHandler(topic, h) -} - -func (m *Manager) RegisterHandler(topic string, h func(m *paho.Publish)) { - m.cc.Router.RegisterHandler(topic, h) -} - -func (m *Manager) SubIotServiceState(topic string) error { - slog.Debug("订阅IotServiceState", "topic", topic) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - _, err := m.cm.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - { - Topic: topic, - QoS: 0, - // NoLocal: true, - }, - }, - }) - return err -} diff --git a/mqtt/config.go b/mqtt/config.go index 3752318..7800943 100644 --- a/mqtt/config.go +++ b/mqtt/config.go @@ -10,7 +10,8 @@ import ( "github.com/eclipse/paho.golang/paho" ) -type ClientManageConfig struct { +type IotMqttConfig struct { + AppId string // 所属应用编号 BrokerUrl string // Broker地址 ClientId string // 客户端ID Username string // 用户名 @@ -18,10 +19,9 @@ type ClientManageConfig struct { KeepAlive uint16 // 保活时间间隔,单位s,默认为60 ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3 ConnectTimeout uint16 // 连接操作超时,单位s,默认为3 - OnConnectionUp func(*autopaho.ConnectionManager, *paho.Connack) } -func (c *ClientManageConfig) tryInto() (*autopaho.ClientConfig, error) { +func (c *IotMqttConfig) tryInto() (*autopaho.ClientConfig, error) { addr, err := url.Parse(c.BrokerUrl) if err != nil { return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err) @@ -42,7 +42,10 @@ func (c *ClientManageConfig) tryInto() (*autopaho.ClientConfig, error) { KeepAlive: c.KeepAlive, ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second, ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second, - OnConnectionUp: c.OnConnectionUp, + OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { + slog.Info("MQTT连接成功") + subIotQc() + }, OnConnectError: func(err error) { slog.Error("MQTT连接失败", "error", err) }, diff --git a/mqtt/proto/mqtt.pb.go b/mqtt/proto/mqtt.pb.go index 4c313f7..869c768 100644 --- a/mqtt/proto/mqtt.pb.go +++ b/mqtt/proto/mqtt.pb.go @@ -66,49 +66,6 @@ func (ServiceState) EnumDescriptor() ([]byte, []int) { return file_mqtt_mqtt_proto_rawDescGZIP(), []int{0} } -type ServiceRequest int32 - -const ( - ServiceRequest_Logs ServiceRequest = 0 // 日志 -) - -// Enum value maps for ServiceRequest. -var ( - ServiceRequest_name = map[int32]string{ - 0: "Logs", - } - ServiceRequest_value = map[string]int32{ - "Logs": 0, - } -) - -func (x ServiceRequest) Enum() *ServiceRequest { - p := new(ServiceRequest) - *p = x - return p -} - -func (x ServiceRequest) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (ServiceRequest) Descriptor() protoreflect.EnumDescriptor { - return file_mqtt_mqtt_proto_enumTypes[1].Descriptor() -} - -func (ServiceRequest) Type() protoreflect.EnumType { - return &file_mqtt_mqtt_proto_enumTypes[1] -} - -func (x ServiceRequest) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use ServiceRequest.Descriptor instead. -func (ServiceRequest) EnumDescriptor() ([]byte, []int) { - return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1} -} - // IOT服务状态 type IotServiceState struct { state protoimpl.MessageState @@ -165,18 +122,18 @@ func (x *IotServiceState) GetState() ServiceState { return ServiceState_Normal } -// IOT服务请求 -type IotServiceReq struct { +// IOT服务获取日志请求 +type IotServiceLogReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 - Req ServiceRequest `protobuf:"varint,2,opt,name=req,proto3,enum=mqtt_api.ServiceRequest" json:"req,omitempty"` // 服务请求 + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Count int32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` // 日志条数 } -func (x *IotServiceReq) Reset() { - *x = IotServiceReq{} +func (x *IotServiceLogReq) Reset() { + *x = IotServiceLogReq{} if protoimpl.UnsafeEnabled { mi := &file_mqtt_mqtt_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -184,13 +141,13 @@ func (x *IotServiceReq) Reset() { } } -func (x *IotServiceReq) String() string { +func (x *IotServiceLogReq) String() string { return protoimpl.X.MessageStringOf(x) } -func (*IotServiceReq) ProtoMessage() {} +func (*IotServiceLogReq) ProtoMessage() {} -func (x *IotServiceReq) ProtoReflect() protoreflect.Message { +func (x *IotServiceLogReq) ProtoReflect() protoreflect.Message { mi := &file_mqtt_mqtt_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -202,36 +159,37 @@ func (x *IotServiceReq) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use IotServiceReq.ProtoReflect.Descriptor instead. -func (*IotServiceReq) Descriptor() ([]byte, []int) { +// Deprecated: Use IotServiceLogReq.ProtoReflect.Descriptor instead. +func (*IotServiceLogReq) Descriptor() ([]byte, []int) { return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1} } -func (x *IotServiceReq) GetCode() string { +func (x *IotServiceLogReq) GetCode() string { if x != nil { return x.Code } return "" } -func (x *IotServiceReq) GetReq() ServiceRequest { +func (x *IotServiceLogReq) GetCount() int32 { if x != nil { - return x.Req + return x.Count } - return ServiceRequest_Logs + return 0 } -// IOT服务响应 -type IotServiceResp struct { +// IOT服务日志响应 +type IotServiceLogResp struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Logs []string `protobuf:"bytes,4,rep,name=logs,proto3" json:"logs,omitempty"` // 日志 } -func (x *IotServiceResp) Reset() { - *x = IotServiceResp{} +func (x *IotServiceLogResp) Reset() { + *x = IotServiceLogResp{} if protoimpl.UnsafeEnabled { mi := &file_mqtt_mqtt_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -239,13 +197,13 @@ func (x *IotServiceResp) Reset() { } } -func (x *IotServiceResp) String() string { +func (x *IotServiceLogResp) String() string { return protoimpl.X.MessageStringOf(x) } -func (*IotServiceResp) ProtoMessage() {} +func (*IotServiceLogResp) ProtoMessage() {} -func (x *IotServiceResp) ProtoReflect() protoreflect.Message { +func (x *IotServiceLogResp) ProtoReflect() protoreflect.Message { mi := &file_mqtt_mqtt_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -257,18 +215,25 @@ func (x *IotServiceResp) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use IotServiceResp.ProtoReflect.Descriptor instead. -func (*IotServiceResp) Descriptor() ([]byte, []int) { +// Deprecated: Use IotServiceLogResp.ProtoReflect.Descriptor instead. +func (*IotServiceLogResp) Descriptor() ([]byte, []int) { return file_mqtt_mqtt_proto_rawDescGZIP(), []int{2} } -func (x *IotServiceResp) GetCode() string { +func (x *IotServiceLogResp) GetCode() string { if x != nil { return x.Code } return "" } +func (x *IotServiceLogResp) GetLogs() []string { + if x != nil { + return x.Logs + } + return nil +} + // IOT驱动数据 type IotQd struct { state protoimpl.MessageState @@ -391,26 +356,24 @@ var file_mqtt_mqtt_proto_rawDesc = []byte{ 0x64, 0x65, 0x12, 0x2c, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, - 0x22, 0x4f, 0x0a, 0x0d, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, - 0x71, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x2a, 0x0a, 0x03, 0x72, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x03, 0x72, 0x65, - 0x71, 0x22, 0x24, 0x0a, 0x0e, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, + 0x22, 0x3c, 0x0a, 0x10, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x6f, + 0x67, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x3b, + 0x0a, 0x11, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x51, 0x64, - 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x43, - 0x6a, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x27, 0x0a, 0x0c, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x6f, 0x72, - 0x6d, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, 0x65, - 0x10, 0x01, 0x2a, 0x1a, 0x0a, 0x0e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x10, 0x00, 0x42, 0x0e, - 0x5a, 0x0c, 0x2e, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x22, 0x2f, 0x0a, 0x05, 0x49, + 0x6f, 0x74, 0x51, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x05, + 0x49, 0x6f, 0x74, 0x43, 0x6a, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x27, 0x0a, + 0x0c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, + 0x06, 0x4e, 0x6f, 0x72, 0x6d, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x66, 0x66, + 0x6c, 0x69, 0x6e, 0x65, 0x10, 0x01, 0x42, 0x0e, 0x5a, 0x0c, 0x2e, 0x2f, 0x6d, 0x71, 0x74, 0x74, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -425,25 +388,23 @@ func file_mqtt_mqtt_proto_rawDescGZIP() []byte { return file_mqtt_mqtt_proto_rawDescData } -var file_mqtt_mqtt_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_mqtt_mqtt_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_mqtt_mqtt_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_mqtt_mqtt_proto_goTypes = []interface{}{ - (ServiceState)(0), // 0: mqtt_api.ServiceState - (ServiceRequest)(0), // 1: mqtt_api.ServiceRequest - (*IotServiceState)(nil), // 2: mqtt_api.IotServiceState - (*IotServiceReq)(nil), // 3: mqtt_api.IotServiceReq - (*IotServiceResp)(nil), // 4: mqtt_api.IotServiceResp - (*IotQd)(nil), // 5: mqtt_api.IotQd - (*IotCj)(nil), // 6: mqtt_api.IotCj + (ServiceState)(0), // 0: mqtt_api.ServiceState + (*IotServiceState)(nil), // 1: mqtt_api.IotServiceState + (*IotServiceLogReq)(nil), // 2: mqtt_api.IotServiceLogReq + (*IotServiceLogResp)(nil), // 3: mqtt_api.IotServiceLogResp + (*IotQd)(nil), // 4: mqtt_api.IotQd + (*IotCj)(nil), // 5: mqtt_api.IotCj } var file_mqtt_mqtt_proto_depIdxs = []int32{ 0, // 0: mqtt_api.IotServiceState.state:type_name -> mqtt_api.ServiceState - 1, // 1: mqtt_api.IotServiceReq.req:type_name -> mqtt_api.ServiceRequest - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_mqtt_mqtt_proto_init() } @@ -465,7 +426,7 @@ func file_mqtt_mqtt_proto_init() { } } file_mqtt_mqtt_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*IotServiceReq); i { + switch v := v.(*IotServiceLogReq); i { case 0: return &v.state case 1: @@ -477,7 +438,7 @@ func file_mqtt_mqtt_proto_init() { } } file_mqtt_mqtt_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*IotServiceResp); i { + switch v := v.(*IotServiceLogResp); i { case 0: return &v.state case 1: @@ -518,7 +479,7 @@ func file_mqtt_mqtt_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mqtt_mqtt_proto_rawDesc, - NumEnums: 2, + NumEnums: 1, NumMessages: 5, NumExtensions: 0, NumServices: 0, diff --git a/mqtt/topic.go b/mqtt/topic.go index cc77a26..20f0498 100644 --- a/mqtt/topic.go +++ b/mqtt/topic.go @@ -4,7 +4,7 @@ import "fmt" const ( Topic_IotServiceState string = "/%s/%s/iotss" - Topic_IotCmd string = "/%s/%s/iotcmd" + Topic_IotLog string = "/%s/%s/iotlog" Topic_IotQd string = "/%s/%s/iotqd" Topic_IotCj string = "/%s/%s/iotcj" ) @@ -13,7 +13,7 @@ var topicMap = make(map[string]string, 4) func BuildTopics(sysCode, iotCode string) { topicMap[Topic_IotServiceState] = fmt.Sprintf(Topic_IotServiceState, sysCode, iotCode) - topicMap[Topic_IotCmd] = fmt.Sprintf(Topic_IotCmd, sysCode, iotCode) + topicMap[Topic_IotLog] = fmt.Sprintf(Topic_IotLog, sysCode, iotCode) topicMap[Topic_IotQd] = fmt.Sprintf(Topic_IotQd, sysCode, iotCode) topicMap[Topic_IotCj] = fmt.Sprintf(Topic_IotCj, sysCode, iotCode) } @@ -23,7 +23,7 @@ func GetIotServiceStateTopic() string { } func GetCmdTopic() string { - return topicMap[Topic_IotCmd] + return topicMap[Topic_IotLog] } func GetQdTopic() string { diff --git a/proto/src/mqtt/mqtt.proto b/proto/src/mqtt/mqtt.proto index 4b82dd8..b95d6de 100644 --- a/proto/src/mqtt/mqtt.proto +++ b/proto/src/mqtt/mqtt.proto @@ -9,34 +9,22 @@ enum ServiceState { Offline = 1; // 离线 } -enum ServiceRequest { - Logs = 0; // 日志 -} - // IOT服务状态 message IotServiceState { string code = 1; // 服务编号 ServiceState state = 2; // 服务状态 } -// IOT服务请求 -message IotServiceReq { +// IOT服务获取日志请求 +message IotServiceLogReq { string code = 1; // 服务编号 - ServiceRequest req = 2; // 服务请求 + int32 count = 3; // 日志条数 } -// IOT服务响应 -message IotServiceResp { +// IOT服务日志响应 +message IotServiceLogResp { string code = 1; // 服务编号 - int32 err = 2; // 错误码 - string errmsg = 3; // 错误信息 - oneof data { - IotServiceLog logs = 4; // 日志 - } -} - -message IotServiceLog { - repeated string logs = 1; // 日志 + repeated string logs = 4; // 日志 } // IOT驱动数据 diff --git a/modbus/api.go b/protocol/modbus/api.go similarity index 100% rename from modbus/api.go rename to protocol/modbus/api.go diff --git a/modbus/client.go b/protocol/modbus/client.go similarity index 100% rename from modbus/client.go rename to protocol/modbus/client.go diff --git a/modbus/manage.go b/protocol/modbus/manage.go similarity index 100% rename from modbus/manage.go rename to protocol/modbus/manage.go diff --git a/service/api.go b/service/api.go index 9e5bb81..7de4c42 100644 --- a/service/api.go +++ b/service/api.go @@ -1,6 +1,20 @@ package service -// IOT物联网应用 -type IotService interface { +// IOT驱采映射服务 +type IotQcMappingService interface { + // 停止 Stop() error + + // 获取驱动字节列表 + GetQdBytes() []byte + // 获取驱动位列表 + GetQdBits() []bool + // 获取采集字节列表 + GetCjBytes() []byte + // 获取采集位列表 + GetCjBits() []bool + // 写驱动字节列表 + WriteQdBytes(bytes []byte) error + // 写采集位列表 + WriteCjBytes(bytes []byte) error } diff --git a/service/modbus_dc_mapping.go b/service/modbus_dc_mapping.go deleted file mode 100644 index 3931eca..0000000 --- a/service/modbus_dc_mapping.go +++ /dev/null @@ -1,287 +0,0 @@ -package service - -import ( - "context" - "errors" - "fmt" - "log/slog" - "strings" - "time" - - "joylink.club/iot/modbus" - "joylink.club/iot/service/model" - sproto "joylink.club/iot/service/proto" -) - -// Modbus驱采服务 -type modbusQcService struct { - config *sproto.ModbusConfig - cli modbus.MasterClient - qc model.QC - cancel context.CancelFunc - done chan struct{} // 服务协程退出信号 -} - -func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, error) { - if err := checkConfig(config); err != nil { - return nil, err - } - _, ok := modbus.GetClient(config.Url) - if ok { - return nil, fmt.Errorf("modbus客户端已存在,url=%s", config.Url) - } - cli, err := modbus.NewClient(&modbus.ClientConfig{ - Url: config.Url, - Timeout: config.Timeout, - }) - if err != nil { - return nil, errors.Join(err, fmt.Errorf("modbus客户端创建失败,url=%s", config.Url)) - } - cli.SetUnitId(uint8(config.UnitId)) - cli.SetEndianness(convertEndianness(config.Endianness)) - cli.Start() - s := &modbusQcService{ - config: config, - cli: cli, - qc: dc, - done: make(chan struct{}), - } - s.initOnUpdateTask() - ctx, cancel := context.WithCancel(context.Background()) - go s.run(ctx) - s.cancel = cancel - return s, nil -} - -func (m *modbusQcService) initOnUpdateTask() { - mapping := m.config.Mapping - for _, mdm := range mapping { - if mdm.WriteStrategy == sproto.Modbus_OnUpdate && isWriteFunction(mdm.Function) { - et := model.DCE_Drive_Update - if mdm.Type == sproto.DataType_CollectTable { - et = model.DCE_Collect_Update - } - m.qc.On(et, func(d model.QC) { - if !m.cli.IsConnected() { - slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "url", m.config.Url, "Function", mdm.Function) - return - } - switch mdm.Function { - case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: - err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm)) - if err != nil { - slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "error", err, "Function", mdm.Function) - } else { - slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "Function", mdm.Function) - } - case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: - err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm)) - if err != nil { - slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "error", err, "Function", mdm.Function) - } else { - slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "Function", mdm.Function) - } - } - }) - } - } -} - -func isWriteFunction(modbus_Function sproto.Modbus_Function) bool { - return modbus_Function == sproto.Modbus_WriteCoil || - modbus_Function == sproto.Modbus_WriteCoils || - modbus_Function == sproto.Modbus_WriteRegister || - modbus_Function == sproto.Modbus_WriteRegisters || - modbus_Function == sproto.Modbus_RWCoils || - modbus_Function == sproto.Modbus_RWRegisters -} - -func (m *modbusQcService) run(ctx context.Context) { - defer close(m.done) -mainLoop: - for { - select { - case <-ctx.Done(): - slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) - modbus.DeleteClient(m.config.Url) - break mainLoop - default: - } - m.mappingTaskExecute() - time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) - } -} - -func (m *modbusQcService) mappingTaskExecute() { - if m.cli.IsConnected() { - for _, mdm := range m.config.Mapping { - switch mdm.Function { - case sproto.Modbus_ReadCoil, sproto.Modbus_RWCoils: - data, err := m.cli.ReadCoil(uint16(mdm.Addr), uint16(mdm.Quantity)) - if err != nil { - slog.Error("Modbus驱动采集服务读取线圈失败", "url", m.config.Url, "error", err) - continue - } - err = m.updateDcByBits(mdm, data) - if err != nil { - slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "error", err) - continue - } - case sproto.Modbus_ReadDiscreteInput: - data, err := m.cli.ReadDiscreteInput(uint16(mdm.Addr), uint16(mdm.Quantity)) - if err != nil { - slog.Error("Modbus驱动采集服务读取离散输入失败", "url", m.config.Url, "error", err) - continue - } - err = m.updateDcByBits(mdm, data) - if err != nil { - slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "error", err) - continue - } - case sproto.Modbus_ReadHoldingRegister, sproto.Modbus_RWRegisters: - data, err := m.cli.ReadHoldingRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) - if err != nil { - slog.Error("Modbus驱动采集服务读取保持寄存器失败", "url", m.config.Url, "error", err) - continue - } - err = m.updateDcByBytes(mdm, data) - if err != nil { - slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "error", err) - continue - } - case sproto.Modbus_ReadInputRegister: - data, err := m.cli.ReadInputRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) - if err != nil { - slog.Error("Modbus驱动采集服务读取输入寄存器失败", "url", m.config.Url, "error", err) - continue - } - err = m.updateDcByBytes(mdm, data) - if err != nil { - slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "error", err) - continue - } - case sproto.Modbus_WriteCoil: - if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 - bits := m.GetDcBits(mdm) - err := m.cli.WriteCoil(uint16(mdm.Addr), bits[0]) - if err != nil { - slog.Error("Modbus驱动采集服务写单线圈失败", "url", m.config.Url, "error", err) - continue - } - } - case sproto.Modbus_WriteCoils: - if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 - bits := m.GetDcBits(mdm) - err := m.cli.WriteCoils(uint16(mdm.Addr), bits) - if err != nil { - slog.Error("Modbus驱动采集服务写多线圈失败", "url", m.config.Url, "error", err) - continue - } - } - case sproto.Modbus_WriteRegister: - if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 - data := m.GetDcBytes(mdm) - err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) - if err != nil { - slog.Error("Modbus驱动采集服务写单寄存器失败", "url", m.config.Url, "error", err) - continue - } - } - case sproto.Modbus_WriteRegisters: - if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 - data := m.GetDcBytes(mdm) - err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) - if err != nil { - slog.Error("Modbus驱动采集服务写多寄存器失败", "url", m.config.Url, "error", err) - continue - } - } - } - } - } else { - slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "url", m.config.Url) - } -} - -func (m *modbusQcService) GetDcBits(mdm *sproto.ModbusDcMapping) []bool { - switch mdm.Type { - case sproto.DataType_CollectTable: // 采集数据 - return m.qc.GetCollectBits(mdm.Start, mdm.Quantity) - case sproto.DataType_DriveTable: // 驱动数据 - return m.qc.GetDriveBits(mdm.Start, mdm.Quantity) - default: - panic("未知数据类型") - } -} - -func (m *modbusQcService) GetDcBytes(mdm *sproto.ModbusDcMapping) []byte { - switch mdm.Type { - case sproto.DataType_CollectTable: // 采集数据 - return m.qc.GetCollectBytes(mdm.Start, mdm.Quantity*2) - case sproto.DataType_DriveTable: // 驱动数据 - return m.qc.GetDriveBytes(mdm.Start, mdm.Quantity*2) - default: - panic("未知数据类型") - } -} - -func (m *modbusQcService) updateDcByBits(mdm *sproto.ModbusDcMapping, bits []bool) error { - switch mdm.Type { - case sproto.DataType_CollectTable: // 采集数据 - return m.qc.UpdateCollectByBits(mdm.Start, bits) - case sproto.DataType_DriveTable: // 驱动数据 - return m.qc.UpdateDriveByBits(mdm.Start, bits) - } - return nil -} - -func (m *modbusQcService) updateDcByBytes(mdm *sproto.ModbusDcMapping, bytes []byte) error { - switch mdm.Type { - case sproto.DataType_CollectTable: // 采集数据 - return m.qc.UpdateCollectByBytes(mdm.Start, bytes) - case sproto.DataType_DriveTable: // 驱动数据 - return m.qc.UpdateDriveByBytes(mdm.Start, bytes) - } - return nil -} - -func checkConfig(config *sproto.ModbusConfig) error { - if config.Url == "" { - return fmt.Errorf("Modbus配置未设置url") - } - if !strings.HasPrefix(config.Url, "tcp") { - return fmt.Errorf("Modbus配置url必须以tcp开头") - } - if config.UnitId == 0 { - return fmt.Errorf("Modbus配置未设置unitId") - } - if config.Interval == 0 { - return fmt.Errorf("Modbus配置未设置interval") - } - if len(config.Mapping) == 0 { - return fmt.Errorf("Modbus配置无映射配置") - } - return nil -} - -func (m *modbusQcService) Start() error { - - return nil -} - -func (m *modbusQcService) Stop() error { - m.cancel() - <-m.done - slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) - return nil -} - -func convertEndianness(endianness sproto.Modbus_Endianness) modbus.Endianness { - switch endianness { - case sproto.Modbus_BigEndian: - return modbus.BigEndian - case sproto.Modbus_LittleEndian: - return modbus.LittleEndian - } - return modbus.BigEndian -} diff --git a/service/modbus_qc_mapping.go b/service/modbus_qc_mapping.go new file mode 100644 index 0000000..ee58620 --- /dev/null +++ b/service/modbus_qc_mapping.go @@ -0,0 +1,380 @@ +package service + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strings" + "time" + + "joylink.club/iot/protocol/modbus" + "joylink.club/iot/service/model" + sproto "joylink.club/iot/service/proto" +) + +// Modbus驱采服务 +type modbusQcService struct { + config *sproto.ModbusConfig + cli modbus.MasterClient + qc model.QC + cancel context.CancelFunc + done chan struct{} // 服务协程退出信号 +} + +func (m *modbusQcService) Stop() error { + m.cancel() + <-m.done + slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) + return nil +} + +// GetCjBits implements IotQcMappingService. +func (s *modbusQcService) GetCjBits() []bool { + return s.qc.GetCjBits() +} + +// GetCjBytes implements IotQcMappingService. +func (s *modbusQcService) GetCjBytes() []byte { + return s.qc.GetCjBytes() +} + +// GetQdBits implements IotQcMappingService. +func (s *modbusQcService) GetQdBits() []bool { + return s.qc.GetQdBits() +} + +// GetQdBytes implements IotQcMappingService. +func (s *modbusQcService) GetQdBytes() []byte { + return s.qc.GetQdBytes() +} + +// WriteCjBytes implements IotQcMappingService. +func (s *modbusQcService) WriteCjBytes(bytes []byte) error { + if len(bytes) != len(s.qc.GetCjBytes()) { + return fmt.Errorf("写入采集字节长度有误,应为%d,实为%d", len(s.qc.GetCjBytes()), len(bytes)) + } + err := s.onWrite(sproto.DataType_CollectTable, bytes) + if err == nil { + s.qc.SetCjBytes(bytes) + } + return err +} + +// WriteQdBytes implements IotQcMappingService. +func (s *modbusQcService) WriteQdBytes(bytes []byte) error { + if len(bytes) != len(s.qc.GetQdBytes()) { + return fmt.Errorf("写入驱动字节长度有误,应为%d,实为%d", len(s.qc.GetQdBytes()), len(bytes)) + } + err := s.onWrite(sproto.DataType_DriveTable, bytes) + if err == nil { + s.qc.SetQdBytes(bytes) + } + return err +} + +func NewModbusQcService(config *sproto.ModbusConfig, qd []byte, cj []byte) (IotQcMappingService, error) { + if err := checkConfig(config); err != nil { + return nil, err + } + _, ok := modbus.GetClient(config.Url) + if ok { + return nil, fmt.Errorf("modbus客户端已存在,url=%s", config.Url) + } + cli, err := modbus.NewClient(&modbus.ClientConfig{ + Url: config.Url, + Timeout: config.Timeout, + }) + if err != nil { + return nil, errors.Join(err, fmt.Errorf("modbus客户端创建失败,url=%s", config.Url)) + } + cli.SetUnitId(uint8(config.UnitId)) + cli.SetEndianness(convertEndianness(config.Endianness)) + cli.Start() + s := &modbusQcService{ + config: config, + cli: cli, + qc: model.NewDC(qd, cj), + done: make(chan struct{}), + } + // s.initOnUpdateTask() + ctx, cancel := context.WithCancel(context.Background()) + go s.run(ctx) + s.cancel = cancel + return s, nil +} + +func (m *modbusQcService) onWrite(dt sproto.DataType, bytes []byte) error { + mapping := m.config.Mapping + for _, mdm := range mapping { + if isWriteFunction(mdm.Function) { + if mdm.Type == dt { + if !m.cli.IsConnected() { + slog.Warn("Modbus驱动采集服务数据写入失败,modbus客户端未连接", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function) + return errors.New("数据写入失败,modbus客户端未连接") + } + switch mdm.Function { + case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: + data := getQcBits(bytes, mdm) + err := m.cli.WriteCoils(uint16(mdm.Addr), data) + if err != nil { + slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) + return err + } else { + slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", data, "mapping", mdm) + } + case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: + data := getQcBytes(bytes, mdm) + err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) + if err != nil { + slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) + return err + } else { + slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", data, "mapping", mdm) + } + } + } + } + } + return nil +} + +// func (m *modbusQcService) initOnUpdateTask() { +// mapping := m.config.Mapping +// for _, mdm := range mapping { +// if mdm.WriteStrategy == sproto.Modbus_OnUpdate && isWriteFunction(mdm.Function) { +// et := model.DCE_Drive_Update +// if mdm.Type == sproto.DataType_CollectTable { +// et = model.DCE_Collect_Update +// } +// m.qc.On(et, func(d model.QC) { +// if !m.cli.IsConnected() { +// slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "url", m.config.Url, "Function", mdm.Function) +// return +// } +// switch mdm.Function { +// case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: +// err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm)) +// if err != nil { +// slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "error", err, "Function", mdm.Function) +// } else { +// slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "Function", mdm.Function) +// } +// case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: +// err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm)) +// if err != nil { +// slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "error", err, "Function", mdm.Function) +// } else { +// slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "Function", mdm.Function) +// } +// } +// }) +// } +// } +// } + +func isWriteFunction(modbus_Function sproto.Modbus_Function) bool { + return modbus_Function == sproto.Modbus_WriteCoil || + modbus_Function == sproto.Modbus_WriteCoils || + modbus_Function == sproto.Modbus_WriteRegister || + modbus_Function == sproto.Modbus_WriteRegisters || + modbus_Function == sproto.Modbus_RWCoils || + modbus_Function == sproto.Modbus_RWRegisters +} + +func (m *modbusQcService) run(ctx context.Context) { + defer close(m.done) +mainLoop: + for { + select { + case <-ctx.Done(): + slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) + modbus.DeleteClient(m.config.Url) + break mainLoop + default: + } + m.mappingTaskExecute() + time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) + } +} + +func (m *modbusQcService) mappingTaskExecute() { + if m.cli.IsConnected() { + for _, mdm := range m.config.Mapping { + switch mdm.Function { + case sproto.Modbus_ReadCoil, sproto.Modbus_RWCoils: + data, err := m.cli.ReadCoil(uint16(mdm.Addr), uint16(mdm.Quantity)) + if err != nil { + slog.Error("Modbus驱动采集服务读取线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + err = m.updateDcByBits(mdm, data) + if err != nil { + slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + case sproto.Modbus_ReadDiscreteInput: + data, err := m.cli.ReadDiscreteInput(uint16(mdm.Addr), uint16(mdm.Quantity)) + if err != nil { + slog.Error("Modbus驱动采集服务读取离散输入失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + err = m.updateDcByBits(mdm, data) + if err != nil { + slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + case sproto.Modbus_ReadHoldingRegister, sproto.Modbus_RWRegisters: + data, err := m.cli.ReadHoldingRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) + if err != nil { + slog.Error("Modbus驱动采集服务读取保持寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + err = m.updateDcByBytes(mdm, data) + if err != nil { + slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + case sproto.Modbus_ReadInputRegister: + data, err := m.cli.ReadInputRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) + if err != nil { + slog.Error("Modbus驱动采集服务读取输入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + err = m.updateDcByBytes(mdm, data) + if err != nil { + slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + continue + } + // case sproto.Modbus_WriteCoil: + // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 + // bits := m.GetDcBits(mdm) + // err := m.cli.WriteCoil(uint16(mdm.Addr), bits[0]) + // if err != nil { + // slog.Error("Modbus驱动采集服务写单线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + // continue + // } + // } + // case sproto.Modbus_WriteCoils: + // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 + // bits := m.GetDcBits(mdm) + // err := m.cli.WriteCoils(uint16(mdm.Addr), bits) + // if err != nil { + // slog.Error("Modbus驱动采集服务写多线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + // continue + // } + // } + // case sproto.Modbus_WriteRegister: + // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 + // data := m.GetDcBytes(mdm) + // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) + // if err != nil { + // slog.Error("Modbus驱动采集服务写单寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + // continue + // } + // } + // case sproto.Modbus_WriteRegisters: + // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 + // data := m.GetDcBytes(mdm) + // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) + // if err != nil { + // slog.Error("Modbus驱动采集服务写多寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) + // continue + // } + // } + } + } + } else { + slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "url", m.config.Url, "unitid", m.config.UnitId) + } +} + +func getQcBits(bytes []byte, mdm *sproto.ModbusDcMapping) []bool { + bits := model.DecodeBools(bytes) + start := mdm.Start + quantity := mdm.Quantity + if start+quantity > uint32(len(bits)) { + panic(fmt.Errorf("getQcBits超出范围")) + } + return bits[start : start+quantity] +} + +func getQcBytes(bytes []byte, mdm *sproto.ModbusDcMapping) []byte { + start := mdm.Start + quantity := mdm.Quantity * 2 + if start+quantity > uint32(len(bytes)) { + panic(fmt.Errorf("getQcBytes超出范围")) + } + return bytes[start : start+quantity] +} + +// func (m *modbusQcService) GetDcBits(mdm *sproto.ModbusDcMapping) []bool { +// switch mdm.Type { +// case sproto.DataType_CollectTable: // 采集数据 +// return m.qc.GetCjBitsOf(mdm.Start, mdm.Quantity) +// case sproto.DataType_DriveTable: // 驱动数据 +// return m.qc.GetQdBitsOf(mdm.Start, mdm.Quantity) +// default: +// panic("未知数据类型") +// } +// } + +// func (m *modbusQcService) GetDcBytes(mdm *sproto.ModbusDcMapping) []byte { +// switch mdm.Type { +// case sproto.DataType_CollectTable: // 采集数据 +// return m.qc.GetCjBytesOf(mdm.Start, mdm.Quantity*2) +// case sproto.DataType_DriveTable: // 驱动数据 +// return m.qc.GetQdBytesOf(mdm.Start, mdm.Quantity*2) +// default: +// panic("未知数据类型") +// } +// } + +func (m *modbusQcService) updateDcByBits(mdm *sproto.ModbusDcMapping, bits []bool) error { + switch mdm.Type { + case sproto.DataType_CollectTable: // 采集数据 + return m.qc.UpdateCjByBits(mdm.Start, bits) + case sproto.DataType_DriveTable: // 驱动数据 + return m.qc.UpdateQdByBits(mdm.Start, bits) + } + return nil +} + +func (m *modbusQcService) updateDcByBytes(mdm *sproto.ModbusDcMapping, bytes []byte) error { + switch mdm.Type { + case sproto.DataType_CollectTable: // 采集数据 + return m.qc.UpdateCjByBytes(mdm.Start, bytes) + case sproto.DataType_DriveTable: // 驱动数据 + return m.qc.UpdateQdByBytes(mdm.Start, bytes) + } + return nil +} + +func checkConfig(config *sproto.ModbusConfig) error { + if config.Url == "" { + return fmt.Errorf("Modbus配置未设置url") + } + if !strings.HasPrefix(config.Url, "tcp") { + return fmt.Errorf("Modbus配置url必须以tcp开头") + } + if config.UnitId == 0 { + return fmt.Errorf("Modbus配置未设置unitId") + } + if config.Interval == 0 { + return fmt.Errorf("Modbus配置未设置interval") + } + if len(config.Mapping) == 0 { + return fmt.Errorf("Modbus配置无映射配置") + } + return nil +} + +func convertEndianness(endianness sproto.Modbus_Endianness) modbus.Endianness { + switch endianness { + case sproto.Modbus_BigEndian: + return modbus.BigEndian + case sproto.Modbus_LittleEndian: + return modbus.LittleEndian + } + return modbus.BigEndian +} diff --git a/service/model/dc.go b/service/model/dc.go index 4e3579e..f4a239d 100644 --- a/service/model/dc.go +++ b/service/model/dc.go @@ -17,121 +17,147 @@ const ( // 驱动采集数据 type QC interface { // 更新驱动数据,位数组 - UpdateDriveByBits(start uint32, bits []bool) error + UpdateQdByBits(start uint32, bits []bool) error // 更新驱动数据,字节数组 - UpdateDriveByBytes(start uint32, values []byte) error + UpdateQdByBytes(start uint32, values []byte) error // 更新采集数据,位数组 - UpdateCollectByBits(start uint32, bits []bool) error + UpdateCjByBits(start uint32, bits []bool) error // 更新采集数据,字节数组 - UpdateCollectByBytes(start uint32, values []byte) error + UpdateCjByBytes(start uint32, values []byte) error + // 设置驱动数据 + SetQdBytes(bytes []byte) // 获取驱动数据,字节数组 - GetDrive() []byte + GetQdBytes() []byte + // 获取驱动数据,位数组 + GetQdBits() []bool // 获取指定驱动数据,位 - GetDriveBit(start uint32) bool + GetQdBitOf(start uint32) bool // 获取指定驱动数据,位数组 - GetDriveBits(start uint32, quantity uint32) []bool + GetQdBitsOf(start uint32, quantity uint32) []bool // 获取指定驱动数据,字节 - GetDriveByte(start uint32) byte + GetQdByteOf(start uint32) byte // 获取指定驱动数据,字节数组 - GetDriveBytes(start uint32, quantity uint32) []byte + GetQdBytesOf(start uint32, quantity uint32) []byte + // 设置采集数据 + SetCjBytes(bytes []byte) // 获取采集数据,字节数组 - GetCollect() []byte + GetCjBytes() []byte + // 获取采集数据,位数组 + GetCjBits() []bool // 获取指定采集数据,位 - GetCollectBit(start uint32) bool + GetCjBitOf(start uint32) bool // 获取指定采集数据,位数组 - GetCollectBits(start uint32, quantity uint32) []bool + GetCjBitsOf(start uint32, quantity uint32) []bool // 获取指定采集数据,字节 - GetCollectByte(start uint32) byte + GetCjByteOf(start uint32) byte // 获取指定采集数据,字节数组 - GetCollectBytes(start uint32, quantity uint32) []byte - // 发布事件 - Emit(event DCEvent) - // 订阅事件 - On(event DCEvent, callback func(dc QC)) - // 取消订阅 - Off(event DCEvent, callback func(dc QC)) + GetCjBytesOf(start uint32, quantity uint32) []byte + // // 发布事件 + // Emit(event DCEvent) + // // 订阅事件 + // On(event DCEvent, callback func(dc QC)) + // // 取消订阅 + // Off(event DCEvent, callback func(dc QC)) } type qc struct { // 驱动数据 - drive []byte - driveBits []bool + qd []byte + qdBits []bool // 采集数据 - collect []byte - collectBits []bool - // 事件 - subscribes map[DCEvent][]func(dc QC) + cj []byte + cjBits []bool + // // 事件 + // subscribes map[DCEvent][]func(dc QC) } -// GetCollect implements DC. -func (d *qc) GetCollect() []byte { - return d.collect +func (d *qc) SetCjBytes(bytes []byte) { + d.cj = bytes + d.cjBits = DecodeBools(bytes) } -func (d *qc) GetCollectBit(start uint32) bool { - if start >= uint32(len(d.collectBits)) { +// GetCjBytes implements DC. +func (d *qc) GetCjBytes() []byte { + return d.cj +} + +func (d *qc) GetCjBits() []bool { + return d.cjBits +} + +func (d *qc) GetCjBitOf(start uint32) bool { + if start >= uint32(len(d.cjBits)) { panic(fmt.Errorf("GetCollectBit超出范围")) } - return d.collectBits[start] + return d.cjBits[start] } -func (d *qc) GetCollectBits(start uint32, quantity uint32) []bool { - if start+quantity > uint32(len(d.collectBits)) { +func (d *qc) GetCjBitsOf(start uint32, quantity uint32) []bool { + if start+quantity > uint32(len(d.cjBits)) { panic(fmt.Errorf("GetCollectBits超出范围")) } - return d.collectBits[start : start+quantity] + return d.cjBits[start : start+quantity] } -func (d *qc) GetCollectByte(start uint32) byte { - if start >= uint32(len(d.collect)) { +func (d *qc) GetCjByteOf(start uint32) byte { + if start >= uint32(len(d.cj)) { panic(fmt.Errorf("GetCollectByte超出范围")) } - return d.collect[start] + return d.cj[start] } -func (d *qc) GetCollectBytes(start uint32, quantity uint32) []byte { - if start+quantity > uint32(len(d.collect)) { +func (d *qc) GetCjBytesOf(start uint32, quantity uint32) []byte { + if start+quantity > uint32(len(d.cj)) { panic(fmt.Errorf("GetCollectBytes超出范围")) } - return d.collect[start : start+quantity] + return d.cj[start : start+quantity] } -// GetDrive implements DC. -func (d *qc) GetDrive() []byte { - return d.drive +func (d *qc) SetQdBytes(bytes []byte) { + d.qd = bytes + d.qdBits = DecodeBools(bytes) } -func (d *qc) GetDriveBit(start uint32) bool { - if start >= uint32(len(d.driveBits)) { +// GetQdBytes implements DC. +func (d *qc) GetQdBytes() []byte { + return d.qd +} + +func (d *qc) GetQdBits() []bool { + return d.qdBits +} + +func (d *qc) GetQdBitOf(start uint32) bool { + if start >= uint32(len(d.qdBits)) { panic(fmt.Errorf("GetDriveBit超出范围")) } - return d.driveBits[start] + return d.qdBits[start] } -func (d *qc) GetDriveBits(start uint32, quantity uint32) []bool { - if start+quantity > uint32(len(d.driveBits)) { +func (d *qc) GetQdBitsOf(start uint32, quantity uint32) []bool { + if start+quantity > uint32(len(d.qdBits)) { panic(fmt.Errorf("GetDriveBits超出范围")) } - return d.driveBits[start : start+quantity] + return d.qdBits[start : start+quantity] } -func (d *qc) GetDriveByte(start uint32) byte { - if start >= uint32(len(d.drive)) { +func (d *qc) GetQdByteOf(start uint32) byte { + if start >= uint32(len(d.qd)) { panic(fmt.Errorf("GetDriveByte超出范围")) } - return d.drive[start] + return d.qd[start] } -func (d *qc) GetDriveBytes(start uint32, quantity uint32) []byte { - if start+quantity > uint32(len(d.drive)) { +func (d *qc) GetQdBytesOf(start uint32, quantity uint32) []byte { + if start+quantity > uint32(len(d.qd)) { panic(fmt.Errorf("GetDriveBytes超出范围")) } - return d.drive[start : start+quantity] + return d.qd[start : start+quantity] } -// UpdateCollectByBits implements DC. -func (d *qc) UpdateCollectByBits(start uint32, bits []bool) error { - total := len(d.collectBits) +// UpdateCjByBits implements DC. +func (d *qc) UpdateCjByBits(start uint32, bits []bool) error { + total := len(d.cjBits) if start >= uint32(total) { return fmt.Errorf("UpdateCollectByBits参数start超出范围") } @@ -140,17 +166,17 @@ func (d *qc) UpdateCollectByBits(start uint32, bits []bool) error { return fmt.Errorf("UpdateCollectByBits参数start+len(bits)超出范围") } for i := start; i < end; i++ { - d.collectBits[i] = bits[i-start] + d.cjBits[i] = bits[i-start] } - d.collect = encodeBytes(d.collectBits) - slog.Debug("UpdateCollectByBits成功", "collect", fmt.Sprintf("%v", d.collect), "collectBits", d.collectBits) - d.Emit(DCE_Collect_Update) + d.cj = EncodeBytes(d.cjBits) + slog.Debug("UpdateCollectByBits成功", "collect", fmt.Sprintf("%v", d.cj), "collectBits", d.cjBits) + // d.Emit(DCE_Collect_Update) return nil } -// UpdateCollectByBytes implements DC. -func (d *qc) UpdateCollectByBytes(start uint32, values []byte) error { - total := len(d.collect) +// UpdateCjByBytes implements DC. +func (d *qc) UpdateCjByBytes(start uint32, values []byte) error { + total := len(d.cj) if start >= uint32(total) { return fmt.Errorf("UpdateCollectByBytes参数start超出范围") } @@ -158,16 +184,16 @@ func (d *qc) UpdateCollectByBytes(start uint32, values []byte) error { if end > uint32(total) { return fmt.Errorf("UpdateCollectByBytes参数start+len(values)超出范围") } - copy(d.collect[start:end], values) - d.collectBits = decodeBools(d.collect) - slog.Debug("UpdateCollectByBytes成功", "collect", fmt.Sprintf("%v", d.collect), "collectBits", d.collectBits) - d.Emit(DCE_Collect_Update) + copy(d.cj[start:end], values) + d.cjBits = DecodeBools(d.cj) + slog.Debug("UpdateCollectByBytes成功", "collect", fmt.Sprintf("%v", d.cj), "collectBits", d.cjBits) + // d.Emit(DCE_Collect_Update) return nil } -// UpdateDriveByBits implements DC. -func (d *qc) UpdateDriveByBits(start uint32, bits []bool) error { - total := len(d.driveBits) +// UpdateQdByBits implements DC. +func (d *qc) UpdateQdByBits(start uint32, bits []bool) error { + total := len(d.qdBits) if start >= uint32(total) { return fmt.Errorf("UpdateDriveByBits参数start超出范围") } @@ -176,17 +202,17 @@ func (d *qc) UpdateDriveByBits(start uint32, bits []bool) error { return fmt.Errorf("UpdateDriveByBits参数start+len(bits)超出范围") } for i := start; i < end; i++ { - d.driveBits[i] = bits[i-start] + d.qdBits[i] = bits[i-start] } - d.drive = encodeBytes(d.driveBits) - slog.Debug("UpdateDriveByBits成功", "drive", fmt.Sprintf("%v", d.drive), "driveBits", d.driveBits) - d.Emit(DCE_Drive_Update) + d.qd = EncodeBytes(d.qdBits) + slog.Debug("UpdateDriveByBits成功", "drive", fmt.Sprintf("%v", d.qd), "driveBits", d.qdBits) + // d.Emit(DCE_Drive_Update) return nil } -// UpdateDriveByBytes implements DC. -func (d *qc) UpdateDriveByBytes(start uint32, values []byte) error { - total := len(d.drive) +// UpdateQdByBytes implements DC. +func (d *qc) UpdateQdByBytes(start uint32, values []byte) error { + total := len(d.qd) if start >= uint32(total) { return fmt.Errorf("UpdateDriveByBytes参数start超出范围") } @@ -194,42 +220,42 @@ func (d *qc) UpdateDriveByBytes(start uint32, values []byte) error { if end > uint32(total) { return fmt.Errorf("UpdateDriveByBytes参数start+len(values)超出范围") } - copy(d.drive[start:end], values) - d.driveBits = decodeBools(d.drive) - slog.Debug("UpdateDriveByBytes成功", "drive", fmt.Sprintf("%v", d.drive), "driveBits", d.driveBits) - d.Emit(DCE_Drive_Update) + copy(d.qd[start:end], values) + d.qdBits = DecodeBools(d.qd) + slog.Debug("UpdateDriveByBytes成功", "drive", fmt.Sprintf("%v", d.qd), "driveBits", d.qdBits) + // d.Emit(DCE_Drive_Update) return nil } -// Emit implements DC. -func (d *qc) Emit(event DCEvent) { - listeners := d.subscribes[event] - for _, v := range listeners { - v(d) - } -} +// // Emit implements DC. +// func (d *qc) Emit(event DCEvent) { +// listeners := d.subscribes[event] +// for _, v := range listeners { +// v(d) +// } +// } -// On implements DC. -func (d *qc) On(event DCEvent, callback func(d QC)) { - d.subscribes[event] = append(d.subscribes[event], callback) -} +// // On implements DC. +// func (d *qc) On(event DCEvent, callback func(d QC)) { +// d.subscribes[event] = append(d.subscribes[event], callback) +// } -// Off implements DC. -func (d *qc) Off(event DCEvent, callback func(d QC)) { - panic("unimplemented") -} +// // Off implements DC. +// func (d *qc) Off(event DCEvent, callback func(d QC)) { +// panic("unimplemented") +// } -func NewDC(d []byte, c []byte) QC { +func NewDC(qd []byte, cj []byte) QC { return &qc{ - drive: d, - driveBits: decodeBools(d), - collect: c, - collectBits: decodeBools(c), - subscribes: make(map[DCEvent][]func(d QC)), + qd: qd, + qdBits: DecodeBools(qd), + cj: cj, + cjBits: DecodeBools(cj), + // subscribes: make(map[DCEvent][]func(d QC)), } } -func decodeBools(bytes []byte) (out []bool) { +func DecodeBools(bytes []byte) (out []bool) { len := len(bytes) * 8 var i int for i = 0; i < len; i++ { @@ -238,7 +264,7 @@ func decodeBools(bytes []byte) (out []bool) { return out } -func encodeBytes(bits []bool) (out []byte) { +func EncodeBytes(bits []bool) (out []byte) { len := len(bits) if len%8 != 0 { panic("encodeBytes参数bits长度错误") diff --git a/service/model/dc_test.go b/service/model/dc_test.go index 0480281..20eb0ce 100644 --- a/service/model/dc_test.go +++ b/service/model/dc_test.go @@ -7,7 +7,7 @@ import ( func TestEncodeBytes(t *testing.T) { want := byte(0b11011000) bits := []bool{false, false, false, true, true, false, true, true} - bs := encodeBytes(bits) + bs := EncodeBytes(bits) if bs[0] != want { t.Errorf("encodeBytes(%v) = %v, want %v", bits, bs, want) } @@ -16,7 +16,7 @@ func TestEncodeBytes(t *testing.T) { func TestDecodeBools(t *testing.T) { want := []bool{false, false, false, true, true, false, true, true} bs := []byte{0xD8} - bits := decodeBools(bs) + bits := DecodeBools(bs) if !sliceEquals(bits, want) { t.Errorf("decodeBits(%v) = %v, want %v", bs, bits, want) } diff --git a/service/model/ss.go b/service/model/ss.go new file mode 100644 index 0000000..2793be5 --- /dev/null +++ b/service/model/ss.go @@ -0,0 +1,6 @@ +package model + +// 服务状态 +type ServiceState struct { + State int32 +}