修改事件封装

修改世界状态更新事件处理方式
This commit is contained in:
walker 2023-12-25 10:57:54 +08:00
parent bd947baa4e
commit e83feb89dd
3 changed files with 91 additions and 66 deletions

View File

@ -9,15 +9,16 @@ import (
"github.com/yohamta/donburi/features/events" "github.com/yohamta/donburi/features/events"
) )
// 注意事件相关操作的最终执行者均为world协程 // 事件订阅发布功能,底层使用donburi事件订阅发布
// 注意:事件订阅处理中应避免再次发布事件
// 事件相关定义 // 事件相关定义
type ( type (
// 事件类型定义 // 事件类型定义
EventType[T any] struct { EventType[T any] struct {
et *events.EventType[T] et *events.EventType[T]
subscriberMap map[string]events.Subscriber[T] mu sync.Mutex // 锁
subscriberMapLock sync.Mutex // subMap map[string]events.Subscriber[T]
} }
// 事件订阅者定义 // 事件订阅者定义
Subscriber[T any] func(w World, event T) Subscriber[T any] func(w World, event T)
@ -32,8 +33,8 @@ func EventsDebugEnable() {
func NewEventType[T any]() *EventType[T] { func NewEventType[T any]() *EventType[T] {
return &EventType[T]{ return &EventType[T]{
et: events.NewEventType[T](), et: events.NewEventType[T](),
subscriberMap: make(map[string]events.Subscriber[T]), mu: sync.Mutex{},
subscriberMapLock: sync.Mutex{}, subMap: make(map[string]events.Subscriber[T], 0),
} }
} }
@ -42,52 +43,47 @@ func processAllEvents(w World) {
events.ProcessAllEvents(w) events.ProcessAllEvents(w)
} }
// 执行事件
func (me *EventType[T]) ProcessEvents(wd World) {
me.mu.Lock()
defer me.mu.Unlock()
me.et.ProcessEvents(wd)
}
// 发布该类型的事件 // 发布该类型的事件
func (me *EventType[T]) Publish(wd World, event *T) { func (me *EventType[T]) Publish(wd World, event *T) {
wd.Execute(func() { me.mu.Lock()
defer me.mu.Unlock()
me.et.Publish(wd, *event) me.et.Publish(wd, *event)
})
} }
// 内部发布事件(同步发布,并直接触发)
func (me *EventType[T]) internalPublish(wd World, event *T) {
me.et.Publish(wd, *event)
processAllEvents(wd)
}
///////////////////////////////////////////////////////////////////////////////////
// 订阅该类型的事件 // 订阅该类型的事件
func (me *EventType[T]) Subscribe(wd World, subscriber Subscriber[T]) { func (me *EventType[T]) Subscribe(wd World, subscriber Subscriber[T]) {
me.subscriberMapLock.Lock() me.mu.Lock()
defer me.subscriberMapLock.Unlock() defer me.mu.Unlock()
subscriberKey := subscriberKey[T](wd, subscriber) sp := buildKey(wd, subscriber)
if _, exist := me.subscriberMap[subscriberKey]; !exist { if _, ok := me.subMap[sp]; ok { // 已经订阅过
fn := func(w donburi.World, event T) { return
}
sub := func(w donburi.World, event T) {
subscriber(wd, event) subscriber(wd, event)
} }
me.subscriberMap[subscriberKey] = fn me.subMap[sp] = sub
wd.Execute(func() { me.et.Subscribe(wd, sub)
me.et.Subscribe(wd, fn)
})
}
} }
// 取消订阅该类型的事件 // 取消订阅该类型的事件
func (me *EventType[T]) Unsubscribe(wd World, subscriber Subscriber[T]) { func (me *EventType[T]) Unsubscribe(wd World, subscriber Subscriber[T]) {
me.subscriberMapLock.Lock() me.mu.Lock()
defer me.subscriberMapLock.Unlock() defer me.mu.Unlock()
subscriberKey := subscriberKey[T](wd, subscriber) sp := buildKey(wd, subscriber)
if sub, ok := me.subscriberMap[subscriberKey]; ok { if sub, ok := me.subMap[sp]; ok { // 存在, 取消订阅
wd.Execute(func() {
me.et.Unsubscribe(wd, sub) me.et.Unsubscribe(wd, sub)
delete(me.subscriberMap, subscriberKey)
})
} }
} }
func subscriberKey[T any](wd World, subscriber Subscriber[T]) string { // 生成订阅键
wdSubscriberPointer := reflect.ValueOf(subscriber).Pointer() func buildKey[T any](wd World, subscriber Subscriber[T]) string {
subscriberKey := fmt.Sprintf("%d-%d", wd.Id(), wdSubscriberPointer) sp := fmt.Sprintf("%v_%v", wd.Id(), reflect.ValueOf(subscriber).Pointer())
return subscriberKey return sp
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@ -43,8 +44,25 @@ var (
func main() { func main() {
go world1() go world1()
go world2() go world2()
time.Sleep(4 * time.Second) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
fireSwitchDcEventType.Publish(wd2, &FireSwitchDcEvent{Dc: false}) fireSwitchDcEventType.Publish(wd2, &FireSwitchDcEvent{Dc: false})
time.Sleep(10 * time.Millisecond)
}
}()
time.Sleep(2 * time.Second)
wd1.Close()
wd2.Close()
time.Sleep(5 * time.Second)
} }
func world1() { func world1() {
world := wd1 world := wd1
@ -52,6 +70,9 @@ func world1() {
// //
switchSystem := NewSwitchSystem() switchSystem := NewSwitchSystem()
world.AddSystem(switchSystem) world.AddSystem(switchSystem)
ecs.WorldStateChangeEvent.Subscribe(wd1, func(w ecs.World, event ecs.WorldStateChange) {
fmt.Println("世界1状态变更", event)
})
// //
fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent) fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent)
// //
@ -68,6 +89,10 @@ func world2() {
switchSystem := NewSwitchSystem() switchSystem := NewSwitchSystem()
world.AddSystem(switchSystem) world.AddSystem(switchSystem)
fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent) fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent)
ecs.WorldStateChangeEvent.Subscribe(wd2, func(w ecs.World, event ecs.WorldStateChange) {
fmt.Println("世界2状态变更", event)
})
// //
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
// //

View File

@ -1,6 +1,7 @@
package ecs package ecs
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"math" "math"
@ -16,12 +17,11 @@ type WorldState int
type WorldId = donburi.WorldId type WorldId = donburi.WorldId
const ( const (
WorldInit WorldState = iota WorldInit WorldState = 0
WorldRunning WorldRunning WorldState = 1
WorldPause WorldPause WorldState = 2
WorldError WorldError WorldState = 3
WorldClose WorldClosed WorldState = 4
WorldClosed
) )
type ( type (
@ -38,7 +38,7 @@ type (
Resume() Resume()
// 关闭世界 // 关闭世界
Close() Close()
// 设置时间运行倍速 // 设置运行倍速
SetSpeed(speed float64) error SetSpeed(speed float64) error
// 添加系统 // 添加系统
AddSystem(sys ...ISystem) AddSystem(sys ...ISystem)
@ -72,6 +72,9 @@ type world struct {
// 待执行函数 // 待执行函数
toBeExecuteds chan HandleFunc toBeExecuteds chan HandleFunc
cancel context.CancelFunc
done chan struct{} // 服务协程退出信号
} }
// 新建一个组件类型 // 新建一个组件类型
@ -109,11 +112,10 @@ func NewWorld(tick int) World {
speed: 1, speed: 1,
times: 1, times: 1,
toBeExecuteds: make(chan HandleFunc, 32), toBeExecuteds: make(chan HandleFunc, 32),
done: make(chan struct{}),
} }
} }
func (w *world) Running() bool {
return w.state == WorldRunning
}
func (w *world) Tick() int { func (w *world) Tick() int {
return w.tick return w.tick
} }
@ -147,10 +149,11 @@ func (w *world) updateState(state WorldState) {
old := w.state old := w.state
slog.Debug("世界状态变更", "oldstate", old, "state", state) slog.Debug("世界状态变更", "oldstate", old, "state", state)
w.state = state w.state = state
WorldStateChangeEvent.internalPublish(w, &WorldStateChange{ WorldStateChangeEvent.Publish(w, &WorldStateChange{
OldState: old, OldState: old,
NewState: state, NewState: state,
}) })
WorldStateChangeEvent.ProcessEvents(w)
} }
} }
@ -179,8 +182,11 @@ func (w *world) SetSpeed(speed float64) error {
// 启动世界,世界逻辑开始执行且世界为运行状态 // 启动世界,世界逻辑开始执行且世界为运行状态
func (w *world) StartUp() { func (w *world) StartUp() {
if w.state == WorldInit { // 避免重复运行 if w.state == WorldInit { // 避免重复运行
slog.Debug("启动世界", "id", w.Id())
ctx, cancle := context.WithCancel(context.Background())
go w.run(ctx)
w.cancel = cancle
w.updateState(WorldRunning) w.updateState(WorldRunning)
go w.run()
} }
} }
@ -188,7 +194,7 @@ func (w *world) StartUp() {
func (w *world) Execute(fn HandleFunc) error { func (w *world) Execute(fn HandleFunc) error {
if w.state == WorldError { if w.state == WorldError {
return fmt.Errorf("世界运行异常,无法执行请求") return fmt.Errorf("世界运行异常,无法执行请求")
} else if w.state != WorldRunning && w.state != WorldPause { } else if w.state == WorldClosed {
return fmt.Errorf("世界已经关闭,无法执行请求") return fmt.Errorf("世界已经关闭,无法执行请求")
} }
w.toBeExecuteds <- fn w.toBeExecuteds <- fn
@ -197,13 +203,8 @@ func (w *world) Execute(fn HandleFunc) error {
// 关闭世界 // 关闭世界
func (w *world) Close() { func (w *world) Close() {
if w.state == WorldRunning || w.state == WorldPause { w.cancel()
w.Execute(func() { <-w.done
w.updateState(WorldClose)
})
} else if w.state == WorldError {
w.updateState(WorldClosed)
}
} }
// 执行待处理方法 // 执行待处理方法
@ -222,7 +223,7 @@ func (w *world) executeTodos() {
} }
// 世界循环 // 世界循环
func (w *world) run() { func (w *world) run(ctx context.Context) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
w.exception(err) w.exception(err)
@ -230,13 +231,15 @@ func (w *world) run() {
debug.PrintStack() debug.PrintStack()
} }
}() }()
defer close(w.done)
for range w.ticker.C { for range w.ticker.C {
// slog.Debug("世界运行") select {
if w.state == WorldClose { case <-ctx.Done():
// 世界正常关闭
w.close() w.close()
return return
default:
} }
// start := time.Now() // start := time.Now()
if w.state != WorldRunning { // 世界非运行状态 if w.state != WorldRunning { // 世界非运行状态
continue continue
@ -272,6 +275,7 @@ func (w *world) exception(err any) {
// 世界正常关闭逻辑 // 世界正常关闭逻辑
func (w *world) close() { func (w *world) close() {
slog.Debug("关闭世界", "id", w.Id())
// 世界正常关闭 // 世界正常关闭
w.updateState(WorldClosed) w.updateState(WorldClosed)
// 关闭定时器 // 关闭定时器