refactor(events): start runtime event consumer migration

Deprecate the legacy agent event APIs and add a runtime event test helper, then migrate the follow-up queued test to the runtime event stream.

Validation: go test ./pkg/agent; make lint
This commit is contained in:
Hoshina
2026-04-26 16:11:09 +08:00
parent 8caf9aeb2b
commit d9717b5632
6 changed files with 99 additions and 7 deletions
+6
View File
@@ -166,6 +166,8 @@ func (al *AgentLoop) UnmountHook(name string) {
}
// SubscribeEvents registers a subscriber for agent-loop events.
//
// Deprecated: use RuntimeEvents for new event observation code.
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
if al == nil || al.eventBus == nil {
ch := make(chan Event)
@@ -176,6 +178,8 @@ func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
}
// UnsubscribeEvents removes a previously registered event subscriber.
//
// Deprecated: use the Subscription returned by RuntimeEvents instead.
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
if al == nil || al.eventBus == nil {
return
@@ -184,6 +188,8 @@ func (al *AgentLoop) UnsubscribeEvents(id uint64) {
}
// EventDrops returns the number of dropped events for the given kind.
//
// Deprecated: use RuntimeEventStats for runtime event drop counters.
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
if al == nil || al.eventBus == nil {
return 0
+7
View File
@@ -9,6 +9,8 @@ import (
const defaultEventSubscriberBuffer = 16
// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe.
//
// Deprecated: use pkg/events.Subscription from RuntimeEvents for new code.
type EventSubscription struct {
ID uint64
C <-chan Event
@@ -19,6 +21,9 @@ type eventSubscriber struct {
}
// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events.
//
// Deprecated: use pkg/events.EventBus for new code. This legacy bus remains
// only while existing agent event consumers migrate.
type EventBus struct {
mu sync.RWMutex
subs map[uint64]eventSubscriber
@@ -28,6 +33,8 @@ type EventBus struct {
}
// NewEventBus creates a new in-process event broadcaster.
//
// Deprecated: use events.NewBus for new runtime event publishers.
func NewEventBus() *EventBus {
return &EventBus{
subs: make(map[uint64]eventSubscriber),
+12 -7
View File
@@ -670,8 +670,13 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
t.Fatal("expected default agent")
}
sub := al.SubscribeEvents(32)
defer al.UnsubscribeEvents(sub.ID)
runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest(
t,
al,
32,
runtimeevents.KindAgentFollowUpQueued,
)
defer closeRuntimeEvents()
resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
SessionKey: "session-1",
@@ -695,8 +700,8 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
t.Fatal("timeout waiting for async tool completion")
}
followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool {
return evt.Kind == EventKindFollowUpQueued
followUpEvt := waitForRuntimeEvent(t, runtimeCh, 2*time.Second, func(evt runtimeevents.Event) bool {
return evt.Kind == runtimeevents.KindAgentFollowUpQueued
})
payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload)
if !ok {
@@ -708,10 +713,10 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
if payload.ContentLen != len("background result") {
t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen)
}
if followUpEvt.Meta.SessionKey != "session-1" {
t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey)
if followUpEvt.Scope.SessionKey != "session-1" {
t.Fatalf("expected session key session-1, got %q", followUpEvt.Scope.SessionKey)
}
if followUpEvt.Meta.TurnID == "" {
if followUpEvt.Scope.TurnID == "" {
t.Fatal("expected follow-up event to include turn id")
}
}
+7
View File
@@ -6,6 +6,10 @@ import (
)
// EventKind identifies a structured agent-loop event.
//
// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Kind for new runtime
// event consumers. This legacy kind exists only during the runtime event
// migration window.
type EventKind uint8
const (
@@ -82,6 +86,9 @@ func (k EventKind) String() string {
}
// Event is the structured envelope broadcast by the agent EventBus.
//
// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Event for new
// observation code. Agent payload types remain supported.
type Event struct {
Kind EventKind
Time time.Time
+1
View File
@@ -73,6 +73,7 @@ func NamedHook(name string, hook any) HookRegistration {
}
type EventObserver interface {
// Deprecated: implement RuntimeEventObserver for new observation code.
OnEvent(ctx context.Context, evt Event) error
}
+66
View File
@@ -0,0 +1,66 @@
package agent
import (
"testing"
"time"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
)
func subscribeRuntimeEventsForTest(
t *testing.T,
al *AgentLoop,
buffer int,
kinds ...runtimeevents.Kind,
) (<-chan runtimeevents.Event, func()) {
t.Helper()
if al == nil {
t.Fatal("agent loop is nil")
}
channel := al.RuntimeEvents()
if channel == nil {
t.Fatal("runtime event channel is nil")
}
if len(kinds) > 0 {
channel = channel.OfKind(kinds...)
}
sub, ch, err := channel.SubscribeChan(
t.Context(),
runtimeevents.SubscribeOptions{Name: "agent-runtime-test", Buffer: buffer},
)
if err != nil {
t.Fatalf("SubscribeChan failed: %v", err)
}
return ch, func() {
if err := sub.Close(); err != nil {
t.Errorf("runtime subscription close failed: %v", err)
}
}
}
func waitForRuntimeEvent(
t *testing.T,
ch <-chan runtimeevents.Event,
timeout time.Duration,
match func(runtimeevents.Event) bool,
) runtimeevents.Event {
t.Helper()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case evt, ok := <-ch:
if !ok {
t.Fatal("runtime event stream closed before expected event arrived")
}
if match(evt) {
return evt
}
case <-timer.C:
t.Fatal("timed out waiting for expected runtime event")
}
}
}