From 70c8f9a41de6743400ca4dc6e542227f962bd0a8 Mon Sep 17 00:00:00 2001 From: walker Date: Mon, 18 Dec 2023 17:52:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=A9=B1=E9=87=87=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=AE=9A=E4=B9=89=E5=92=8C=E5=AE=9E=E7=8E=B0=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9modbus=E9=A9=B1=E9=87=87=E6=98=A0=E5=B0=84?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E4=BB=BB=E5=8A=A1=E4=BD=BF=E7=94=A8=E9=A9=B1?= =?UTF-8?q?=E9=87=87=E4=BB=BB=E5=8A=A1=E5=AF=B9=E8=B1=A1=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 17 ++++++--- service/api.go | 10 +++++ service/modbus_qc_mapping.go | 74 ++++++++++++++++++++++-------------- service/qc_handle_task.go | 50 ++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 34 deletions(-) create mode 100644 service/qc_handle_task.go diff --git a/main.go b/main.go index 4855fd2..b23eb57 100644 --- a/main.go +++ b/main.go @@ -36,8 +36,6 @@ func main() { os.Exit(1) } - time.Sleep(time.Second * 3) - go func() { for { time.Sleep(time.Second) @@ -50,7 +48,7 @@ func main() { mds, err := service.NewModbusQcService(&proto.ModbusConfig{ Url: "tcp://127.0.0.1:502", - UnitId: 1, + UnitId: 2, Timeout: 500, Interval: 1000, Mapping: []*proto.ModbusDcMapping{ @@ -76,11 +74,15 @@ func main() { panic(err) } - go func() { + mds.RegisterQcDataHandleScheduleTask(func() { i := 0 for { c := mds.GetCjBytes() - fmt.Printf("采集数据: %v\n", c) + fmt.Printf("发布采集数据: %v\n", c) + mqtt.PubIotCjData(&mproto.IotCj{ + Code: config.Cfg.Mqtt.ClientId, + Data: c, + }) i++ if i%3 == 0 { idx := i % 8 @@ -91,9 +93,12 @@ func main() { fmt.Printf("设置驱动数据成功: %v\n", mds.GetQdBytes()) } } + if i%10 == 0 { + mds.Stop() + } time.Sleep(time.Second) } - }() + }, time.Second) time.Sleep(time.Minute) diff --git a/service/api.go b/service/api.go index 7de4c42..927df4d 100644 --- a/service/api.go +++ b/service/api.go @@ -1,10 +1,15 @@ package service +import "time" + // IOT驱采映射服务 type IotQcMappingService interface { // 停止 Stop() error + // 注册驱动数据处理定时任务 + RegisterQcDataHandleScheduleTask(task func(), interval time.Duration) + // 获取驱动字节列表 GetQdBytes() []byte // 获取驱动位列表 @@ -18,3 +23,8 @@ type IotQcMappingService interface { // 写采集位列表 WriteCjBytes(bytes []byte) error } + +type QcDataHandleScheduleTask interface { + // 停止 + Stop() +} diff --git a/service/modbus_qc_mapping.go b/service/modbus_qc_mapping.go index f9a0b76..808e110 100644 --- a/service/modbus_qc_mapping.go +++ b/service/modbus_qc_mapping.go @@ -1,7 +1,6 @@ package service import ( - "context" "errors" "fmt" "log/slog" @@ -15,16 +14,34 @@ import ( // Modbus驱采服务 type modbusQcService struct { - config *sproto.ModbusConfig - cli modbus.MasterClient - qc model.QC - cancel context.CancelFunc - done chan struct{} // 服务协程退出信号 + config *sproto.ModbusConfig + cli modbus.MasterClient + qc model.QC + tasks []QcDataHandleScheduleTask + stopped bool + // cancel context.CancelFunc + // done chan struct{} // 服务协程退出信号 +} + +// RegisterQcDataHandleScheduleTask implements IotQcMappingService. +func (s *modbusQcService) RegisterQcDataHandleScheduleTask(task func(), interval time.Duration) { + if s.stopped { + panic(fmt.Errorf("modbus驱采映射服务已停止")) + } + s.tasks = append(s.tasks, NewQcHandleTask(task, interval)) } func (m *modbusQcService) Stop() error { - m.cancel() - <-m.done + if m.stopped { // 已经停止 + return nil + } + m.stopped = true + for _, task := range m.tasks { + task.Stop() + } + m.tasks = nil + slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) + defer modbus.DeleteClient(m.config.Url) slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) return nil } @@ -100,12 +117,13 @@ func NewModbusQcService(config *sproto.ModbusConfig, qd []byte, cj []byte) (IotQ config: config, cli: cli, qc: model.NewDC(qd, cj), - done: make(chan struct{}), + // done: make(chan struct{}), } + s.RegisterQcDataHandleScheduleTask(s.readTaskExecute, time.Duration(config.Interval)*time.Millisecond) // s.initOnUpdateTask() - ctx, cancel := context.WithCancel(context.Background()) - go s.run(ctx) - s.cancel = cancel + // ctx, cancel := context.WithCancel(context.Background()) + // go s.run(ctx) + // s.cancel = cancel return s, nil } @@ -178,23 +196,23 @@ func (m *modbusQcService) onWrite(dt sproto.DataType, bytes []byte) error { // } // } -func (m *modbusQcService) run(ctx context.Context) { - defer close(m.done) -mainLoop: - for { - select { - case <-ctx.Done(): - slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) - modbus.DeleteClient(m.config.Url) - break mainLoop - default: - } - m.mappingTaskExecute() - time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) - } -} +// func (m *modbusQcService) run(ctx context.Context) { +// defer close(m.done) +// mainLoop: +// for { +// select { +// case <-ctx.Done(): +// slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) +// modbus.DeleteClient(m.config.Url) +// break mainLoop +// default: +// } +// m.readTaskExecute() +// time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) +// } +// } -func (m *modbusQcService) mappingTaskExecute() { +func (m *modbusQcService) readTaskExecute() { if m.cli.IsConnected() { for _, mdm := range m.config.Mapping { switch mdm.Function { diff --git a/service/qc_handle_task.go b/service/qc_handle_task.go new file mode 100644 index 0000000..801b1c4 --- /dev/null +++ b/service/qc_handle_task.go @@ -0,0 +1,50 @@ +package service + +import ( + "context" + "log/slog" + "time" +) + +type qcHandleTask struct { + fn func() + interval time.Duration + cancel context.CancelFunc + done chan struct{} // 服务协程退出信号 +} + +// Stop implements QcDataHandleScheduleTask. +func (t *qcHandleTask) Stop() { + t.cancel() + <-t.done + slog.Info("驱采数据处理任务线程退出") +} + +func (t *qcHandleTask) run(ctx context.Context) { + defer close(t.done) +mainLoop: + for { + select { + case <-ctx.Done(): + break mainLoop + default: + } + t.fn() + time.Sleep(t.interval) + } +} + +func NewQcHandleTask(run func(), interval time.Duration) QcDataHandleScheduleTask { + if interval <= 0 { + interval = time.Second + } + task := &qcHandleTask{ + fn: run, + interval: interval, + done: make(chan struct{}), + } + ctx, cancel := context.WithCancel(context.Background()) + go task.run(ctx) + task.cancel = cancel + return task +}