添加驱采任务定义和实现

修改modbus驱采映射服务任务使用驱采任务对象实现
This commit is contained in:
walker 2023-12-18 17:52:23 +08:00
parent 87d392719a
commit 70c8f9a41d
4 changed files with 117 additions and 34 deletions

17
main.go
View File

@ -36,8 +36,6 @@ func main() {
os.Exit(1)
}
time.Sleep(time.Second * 3)
go func() {
for {
time.Sleep(time.Second)
@ -50,7 +48,7 @@ func main() {
mds, err := service.NewModbusQcService(&proto.ModbusConfig{
Url: "tcp://127.0.0.1:502",
UnitId: 1,
UnitId: 2,
Timeout: 500,
Interval: 1000,
Mapping: []*proto.ModbusDcMapping{
@ -76,11 +74,15 @@ func main() {
panic(err)
}
go func() {
mds.RegisterQcDataHandleScheduleTask(func() {
i := 0
for {
c := mds.GetCjBytes()
fmt.Printf("采集数据: %v\n", c)
fmt.Printf("发布采集数据: %v\n", c)
mqtt.PubIotCjData(&mproto.IotCj{
Code: config.Cfg.Mqtt.ClientId,
Data: c,
})
i++
if i%3 == 0 {
idx := i % 8
@ -91,9 +93,12 @@ func main() {
fmt.Printf("设置驱动数据成功: %v\n", mds.GetQdBytes())
}
}
if i%10 == 0 {
mds.Stop()
}
time.Sleep(time.Second)
}
}()
}, time.Second)
time.Sleep(time.Minute)

View File

@ -1,10 +1,15 @@
package service
import "time"
// IOT驱采映射服务
type IotQcMappingService interface {
// 停止
Stop() error
// 注册驱动数据处理定时任务
RegisterQcDataHandleScheduleTask(task func(), interval time.Duration)
// 获取驱动字节列表
GetQdBytes() []byte
// 获取驱动位列表
@ -18,3 +23,8 @@ type IotQcMappingService interface {
// 写采集位列表
WriteCjBytes(bytes []byte) error
}
type QcDataHandleScheduleTask interface {
// 停止
Stop()
}

View File

@ -1,7 +1,6 @@
package service
import (
"context"
"errors"
"fmt"
"log/slog"
@ -15,16 +14,34 @@ import (
// Modbus驱采服务
type modbusQcService struct {
config *sproto.ModbusConfig
cli modbus.MasterClient
qc model.QC
cancel context.CancelFunc
done chan struct{} // 服务协程退出信号
config *sproto.ModbusConfig
cli modbus.MasterClient
qc model.QC
tasks []QcDataHandleScheduleTask
stopped bool
// cancel context.CancelFunc
// done chan struct{} // 服务协程退出信号
}
// RegisterQcDataHandleScheduleTask implements IotQcMappingService.
func (s *modbusQcService) RegisterQcDataHandleScheduleTask(task func(), interval time.Duration) {
if s.stopped {
panic(fmt.Errorf("modbus驱采映射服务已停止"))
}
s.tasks = append(s.tasks, NewQcHandleTask(task, interval))
}
func (m *modbusQcService) Stop() error {
m.cancel()
<-m.done
if m.stopped { // 已经停止
return nil
}
m.stopped = true
for _, task := range m.tasks {
task.Stop()
}
m.tasks = nil
slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url)
defer modbus.DeleteClient(m.config.Url)
slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url)
return nil
}
@ -100,12 +117,13 @@ func NewModbusQcService(config *sproto.ModbusConfig, qd []byte, cj []byte) (IotQ
config: config,
cli: cli,
qc: model.NewDC(qd, cj),
done: make(chan struct{}),
// done: make(chan struct{}),
}
s.RegisterQcDataHandleScheduleTask(s.readTaskExecute, time.Duration(config.Interval)*time.Millisecond)
// s.initOnUpdateTask()
ctx, cancel := context.WithCancel(context.Background())
go s.run(ctx)
s.cancel = cancel
// ctx, cancel := context.WithCancel(context.Background())
// go s.run(ctx)
// s.cancel = cancel
return s, nil
}
@ -178,23 +196,23 @@ func (m *modbusQcService) onWrite(dt sproto.DataType, bytes []byte) error {
// }
// }
func (m *modbusQcService) run(ctx context.Context) {
defer close(m.done)
mainLoop:
for {
select {
case <-ctx.Done():
slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url)
modbus.DeleteClient(m.config.Url)
break mainLoop
default:
}
m.mappingTaskExecute()
time.Sleep(time.Millisecond * time.Duration(m.config.Interval))
}
}
// func (m *modbusQcService) run(ctx context.Context) {
// defer close(m.done)
// mainLoop:
// for {
// select {
// case <-ctx.Done():
// slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url)
// modbus.DeleteClient(m.config.Url)
// break mainLoop
// default:
// }
// m.readTaskExecute()
// time.Sleep(time.Millisecond * time.Duration(m.config.Interval))
// }
// }
func (m *modbusQcService) mappingTaskExecute() {
func (m *modbusQcService) readTaskExecute() {
if m.cli.IsConnected() {
for _, mdm := range m.config.Mapping {
switch mdm.Function {

50
service/qc_handle_task.go Normal file
View File

@ -0,0 +1,50 @@
package service
import (
"context"
"log/slog"
"time"
)
type qcHandleTask struct {
fn func()
interval time.Duration
cancel context.CancelFunc
done chan struct{} // 服务协程退出信号
}
// Stop implements QcDataHandleScheduleTask.
func (t *qcHandleTask) Stop() {
t.cancel()
<-t.done
slog.Info("驱采数据处理任务线程退出")
}
func (t *qcHandleTask) run(ctx context.Context) {
defer close(t.done)
mainLoop:
for {
select {
case <-ctx.Done():
break mainLoop
default:
}
t.fn()
time.Sleep(t.interval)
}
}
func NewQcHandleTask(run func(), interval time.Duration) QcDataHandleScheduleTask {
if interval <= 0 {
interval = time.Second
}
task := &qcHandleTask{
fn: run,
interval: interval,
done: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
go task.run(ctx)
task.cancel = cancel
return task
}