From d9717b5632a8858f0c69919fbf4b304296d4adfa Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 16:11:09 +0800 Subject: [PATCH] refactor(events): start runtime event consumer migration Deprecate the legacy agent event APIs and add a runtime event test helper, then migrate the follow-up queued test to the runtime event stream. Validation: go test ./pkg/agent; make lint --- pkg/agent/agent_event.go | 6 +++ pkg/agent/eventbus.go | 7 ++++ pkg/agent/eventbus_test.go | 19 ++++++---- pkg/agent/events.go | 7 ++++ pkg/agent/hooks.go | 1 + pkg/agent/runtime_event_test.go | 66 +++++++++++++++++++++++++++++++++ 6 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 pkg/agent/runtime_event_test.go diff --git a/pkg/agent/agent_event.go b/pkg/agent/agent_event.go index 5ba6c579c..6bbd275e5 100644 --- a/pkg/agent/agent_event.go +++ b/pkg/agent/agent_event.go @@ -166,6 +166,8 @@ func (al *AgentLoop) UnmountHook(name string) { } // SubscribeEvents registers a subscriber for agent-loop events. +// +// Deprecated: use RuntimeEvents for new event observation code. func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { if al == nil || al.eventBus == nil { ch := make(chan Event) @@ -176,6 +178,8 @@ func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { } // UnsubscribeEvents removes a previously registered event subscriber. +// +// Deprecated: use the Subscription returned by RuntimeEvents instead. func (al *AgentLoop) UnsubscribeEvents(id uint64) { if al == nil || al.eventBus == nil { return @@ -184,6 +188,8 @@ func (al *AgentLoop) UnsubscribeEvents(id uint64) { } // EventDrops returns the number of dropped events for the given kind. +// +// Deprecated: use RuntimeEventStats for runtime event drop counters. func (al *AgentLoop) EventDrops(kind EventKind) int64 { if al == nil || al.eventBus == nil { return 0 diff --git a/pkg/agent/eventbus.go b/pkg/agent/eventbus.go index 546d8436d..54ada62fb 100644 --- a/pkg/agent/eventbus.go +++ b/pkg/agent/eventbus.go @@ -9,6 +9,8 @@ import ( const defaultEventSubscriberBuffer = 16 // EventSubscription identifies a subscriber channel returned by EventBus.Subscribe. +// +// Deprecated: use pkg/events.Subscription from RuntimeEvents for new code. type EventSubscription struct { ID uint64 C <-chan Event @@ -19,6 +21,9 @@ type eventSubscriber struct { } // EventBus is a lightweight multi-subscriber broadcaster for agent-loop events. +// +// Deprecated: use pkg/events.EventBus for new code. This legacy bus remains +// only while existing agent event consumers migrate. type EventBus struct { mu sync.RWMutex subs map[uint64]eventSubscriber @@ -28,6 +33,8 @@ type EventBus struct { } // NewEventBus creates a new in-process event broadcaster. +// +// Deprecated: use events.NewBus for new runtime event publishers. func NewEventBus() *EventBus { return &EventBus{ subs: make(map[uint64]eventSubscriber), diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 4a5de46c9..614f12f9d 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -670,8 +670,13 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { t.Fatal("expected default agent") } - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentFollowUpQueued, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ SessionKey: "session-1", @@ -695,8 +700,8 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { t.Fatal("timeout waiting for async tool completion") } - followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool { - return evt.Kind == EventKindFollowUpQueued + followUpEvt := waitForRuntimeEvent(t, runtimeCh, 2*time.Second, func(evt runtimeevents.Event) bool { + return evt.Kind == runtimeevents.KindAgentFollowUpQueued }) payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload) if !ok { @@ -708,10 +713,10 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { if payload.ContentLen != len("background result") { t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen) } - if followUpEvt.Meta.SessionKey != "session-1" { - t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey) + if followUpEvt.Scope.SessionKey != "session-1" { + t.Fatalf("expected session key session-1, got %q", followUpEvt.Scope.SessionKey) } - if followUpEvt.Meta.TurnID == "" { + if followUpEvt.Scope.TurnID == "" { t.Fatal("expected follow-up event to include turn id") } } diff --git a/pkg/agent/events.go b/pkg/agent/events.go index f68d3eab5..a2d466b9e 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -6,6 +6,10 @@ import ( ) // EventKind identifies a structured agent-loop event. +// +// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Kind for new runtime +// event consumers. This legacy kind exists only during the runtime event +// migration window. type EventKind uint8 const ( @@ -82,6 +86,9 @@ func (k EventKind) String() string { } // Event is the structured envelope broadcast by the agent EventBus. +// +// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Event for new +// observation code. Agent payload types remain supported. type Event struct { Kind EventKind Time time.Time diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index b84362fc1..0a6c6785a 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -73,6 +73,7 @@ func NamedHook(name string, hook any) HookRegistration { } type EventObserver interface { + // Deprecated: implement RuntimeEventObserver for new observation code. OnEvent(ctx context.Context, evt Event) error } diff --git a/pkg/agent/runtime_event_test.go b/pkg/agent/runtime_event_test.go new file mode 100644 index 000000000..47fd6923a --- /dev/null +++ b/pkg/agent/runtime_event_test.go @@ -0,0 +1,66 @@ +package agent + +import ( + "testing" + "time" + + runtimeevents "github.com/sipeed/picoclaw/pkg/events" +) + +func subscribeRuntimeEventsForTest( + t *testing.T, + al *AgentLoop, + buffer int, + kinds ...runtimeevents.Kind, +) (<-chan runtimeevents.Event, func()) { + t.Helper() + + if al == nil { + t.Fatal("agent loop is nil") + } + channel := al.RuntimeEvents() + if channel == nil { + t.Fatal("runtime event channel is nil") + } + if len(kinds) > 0 { + channel = channel.OfKind(kinds...) + } + sub, ch, err := channel.SubscribeChan( + t.Context(), + runtimeevents.SubscribeOptions{Name: "agent-runtime-test", Buffer: buffer}, + ) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + return ch, func() { + if err := sub.Close(); err != nil { + t.Errorf("runtime subscription close failed: %v", err) + } + } +} + +func waitForRuntimeEvent( + t *testing.T, + ch <-chan runtimeevents.Event, + timeout time.Duration, + match func(runtimeevents.Event) bool, +) runtimeevents.Event { + t.Helper() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case evt, ok := <-ch: + if !ok { + t.Fatal("runtime event stream closed before expected event arrived") + } + if match(evt) { + return evt + } + case <-timer.C: + t.Fatal("timed out waiting for expected runtime event") + } + } +}