From dc80e8f5f20662a9f8d0587b6f89bf04bf87b541 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 16:23:58 +0800 Subject: [PATCH] test(events): migrate agent tests to runtime events Move AgentLoop event assertions to the runtime event stream and keep the legacy SubscribeEvents test only for dual-publish compatibility. Validation: go test ./pkg/agent; make lint --- pkg/agent/agent_test.go | 15 ++- pkg/agent/context_manager_test.go | 43 ++++++--- pkg/agent/eventbus_test.go | 148 ++++++++++++------------------ pkg/agent/hook_process_test.go | 14 ++- pkg/agent/hooks_test.go | 95 +++++++++++-------- pkg/agent/runtime_event_test.go | 37 ++++++++ pkg/agent/steering_test.go | 33 +++++-- pkg/agent/subturn_test.go | 75 ++++++++++----- 8 files changed, 277 insertions(+), 183 deletions(-) diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 01657d43a..13c5b21a0 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -19,6 +19,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/channels" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" @@ -5217,13 +5218,19 @@ func TestParallelMessageProcessing_SameSessionProcessedSequentially(t *testing.T }) defer al.Close() - sub := al.SubscribeEvents(64) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 64, + runtimeevents.KindAgentTurnStart, + ) + defer closeRuntimeEvents() go func() { - for evt := range sub.C { - if evt.Kind == EventKindTurnStart { + for evt := range runtimeCh { + if evt.Kind == runtimeevents.KindAgentTurnStart { mu.Lock() - turnIDs[evt.Meta.TurnID] = true + turnIDs[evt.Scope.TurnID] = true mu.Unlock() } } diff --git a/pkg/agent/context_manager_test.go b/pkg/agent/context_manager_test.go index 629d11fcb..46e521be4 100644 --- a/pkg/agent/context_manager_test.go +++ b/pkg/agent/context_manager_test.go @@ -12,6 +12,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -305,8 +306,13 @@ func TestLegacyCompact_Overflow(t *testing.T) { } defaultAgent.Sessions.SetHistory("session-overflow", history) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentContextCompress, + ) + defer closeRuntimeEvents() err := al.contextManager.Compact(context.Background(), &CompactRequest{ SessionKey: "session-overflow", @@ -329,8 +335,8 @@ func TestLegacyCompact_Overflow(t *testing.T) { } // Event should carry the proactive reason - events := collectEventStream(sub.C) - compressEvt, ok := findEvent(events, EventKindContextCompress) + events := collectRuntimeEventStream(runtimeCh) + compressEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentContextCompress) if !ok { t.Fatal("expected context compress event") } @@ -361,8 +367,13 @@ func TestLegacyCompact_Overflow_ProactiveReason(t *testing.T) { } defaultAgent.Sessions.SetHistory("session-proactive", history) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentContextCompress, + ) + defer closeRuntimeEvents() err := al.contextManager.Compact(context.Background(), &CompactRequest{ SessionKey: "session-proactive", @@ -372,8 +383,8 @@ func TestLegacyCompact_Overflow_ProactiveReason(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - events := collectEventStream(sub.C) - compressEvt, ok := findEvent(events, EventKindContextCompress) + events := collectRuntimeEventStream(runtimeCh) + compressEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentContextCompress) if !ok { t.Fatal("expected context compress event") } @@ -483,6 +494,14 @@ func TestLegacyCompact_PostTurn_ExceedsMessageThreshold(t *testing.T) { } defaultAgent.Sessions.SetHistory("session-threshold", history) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentSessionSummarize, + ) + defer closeRuntimeEvents() + err := al.contextManager.Compact(context.Background(), &CompactRequest{ SessionKey: "session-threshold", Reason: ContextCompressReasonSummarize, @@ -491,12 +510,8 @@ func TestLegacyCompact_PostTurn_ExceedsMessageThreshold(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - // Wait for async summarization to complete via event - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) - - waitForEvent(t, sub.C, 5*time.Second, func(evt Event) bool { - return evt.Kind == EventKindSessionSummarize + waitForRuntimeEvent(t, runtimeCh, 5*time.Second, func(evt runtimeevents.Event) bool { + return evt.Kind == runtimeevents.KindAgentSessionSummarize }) newHistory := defaultAgent.Sessions.GetHistory("session-threshold") diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 614f12f9d..87b1c68b1 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -222,8 +222,18 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { t.Fatal("expected default agent") } - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + expectedKinds := []runtimeevents.Kind{ + runtimeevents.KindAgentTurnStart, + runtimeevents.KindAgentLLMRequest, + runtimeevents.KindAgentLLMResponse, + runtimeevents.KindAgentToolExecStart, + runtimeevents.KindAgentToolExecEnd, + runtimeevents.KindAgentLLMRequest, + runtimeevents.KindAgentLLMResponse, + runtimeevents.KindAgentTurnEnd, + } + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest(t, al, 16, expectedKinds...) + defer closeRuntimeEvents() response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ SessionKey: "session-1", @@ -266,49 +276,36 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { t.Fatalf("expected final response 'done', got %q", response) } - events := collectEventStream(sub.C) + events := collectRuntimeEventStream(runtimeCh) if len(events) != 8 { t.Fatalf("expected 8 events, got %d", len(events)) } - kinds := make([]EventKind, 0, len(events)) + kinds := make([]runtimeevents.Kind, 0, len(events)) for _, evt := range events { kinds = append(kinds, evt.Kind) } - expectedKinds := []EventKind{ - EventKindTurnStart, - EventKindLLMRequest, - EventKindLLMResponse, - EventKindToolExecStart, - EventKindToolExecEnd, - EventKindLLMRequest, - EventKindLLMResponse, - EventKindTurnEnd, - } if !slices.Equal(kinds, expectedKinds) { t.Fatalf("unexpected event sequence: got %v want %v", kinds, expectedKinds) } - turnID := events[0].Meta.TurnID + turnID := events[0].Scope.TurnID + if turnID == "" { + t.Fatal("expected runtime events to include turn id") + } for i, evt := range events { - if evt.Meta.TurnID != turnID { - t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Meta.TurnID, turnID) + if evt.Scope.TurnID != turnID { + t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Scope.TurnID, turnID) } - if evt.Meta.SessionKey != "session-1" { - t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey) + if evt.Scope.SessionKey != "session-1" { + t.Fatalf("event %d has session key %q, want session-1", i, evt.Scope.SessionKey) } - if evt.Context == nil || evt.Context.Inbound == nil { - t.Fatalf("event %d missing inbound turn context", i) + if evt.Scope.Channel != "cli" || evt.Scope.ChatID != "direct" || evt.Scope.SenderID != "tester" { + t.Fatalf("event %d scope = %+v", i, evt.Scope) } - if evt.Context.Inbound.Channel != "cli" || evt.Context.Inbound.SenderID != "tester" { - t.Fatalf("event %d inbound context = %+v", i, evt.Context.Inbound) - } - if evt.Context.Route == nil || evt.Context.Route.AgentID != "main" { - t.Fatalf("event %d missing route context: %+v", i, evt.Context.Route) - } - if evt.Context.Scope == nil || evt.Context.Scope.Values["sender"] != "tester" { - t.Fatalf("event %d missing session scope: %+v", i, evt.Context.Scope) + if evt.Scope.AgentID != "main" { + t.Fatalf("event %d has agent id %q, want main", i, evt.Scope.AgentID) } } @@ -404,8 +401,15 @@ func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { al.RegisterTool(tool1) al.RegisterTool(tool2) - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentSteeringInjected, + runtimeevents.KindAgentToolExecSkipped, + runtimeevents.KindAgentInterruptReceived, + ) + defer closeRuntimeEvents() resultCh := make(chan string, 1) go func() { @@ -432,8 +436,8 @@ func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { t.Fatal("timeout waiting for steered response") } - events := collectEventStream(sub.C) - steeringEvt, ok := findEvent(events, EventKindSteeringInjected) + events := collectRuntimeEventStream(runtimeCh) + steeringEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentSteeringInjected) if !ok { t.Fatal("expected steering injected event") } @@ -445,7 +449,7 @@ func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { t.Fatalf("expected 1 steering message, got %d", steeringPayload.Count) } - skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + skippedEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecSkipped) if !ok { t.Fatal("expected skipped tool event") } @@ -457,7 +461,7 @@ func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { t.Fatalf("expected skipped tool_two, got %q", skippedPayload.Tool) } - interruptEvt, ok := findEvent(events, EventKindInterruptReceived) + interruptEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentInterruptReceived) if !ok { t.Fatal("expected interrupt received event") } @@ -515,8 +519,14 @@ func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { {Role: "user", Content: "Trigger message"}, }) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentLLMRetry, + runtimeevents.KindAgentContextCompress, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ SessionKey: "session-1", @@ -534,8 +544,8 @@ func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { t.Fatalf("expected retry success, got %q", resp) } - events := collectEventStream(sub.C) - retryEvt, ok := findEvent(events, EventKindLLMRetry) + events := collectRuntimeEventStream(runtimeCh) + retryEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentLLMRetry) if !ok { t.Fatal("expected llm retry event") } @@ -550,7 +560,7 @@ func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { t.Fatalf("expected retry attempt 1, got %d", retryPayload.Attempt) } - compressEvt, ok := findEvent(events, EventKindContextCompress) + compressEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentContextCompress) if !ok { t.Fatal("expected context compress event") } @@ -603,14 +613,19 @@ func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) { {Role: "assistant", Content: "Answer three"}, }) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentSessionSummarize, + ) + defer closeRuntimeEvents() lcm := &legacyContextManager{al: al} lcm.summarizeSession(defaultAgent, "session-1") - events := collectEventStream(sub.C) - summaryEvt, ok := findEvent(events, EventKindSessionSummarize) + events := collectRuntimeEventStream(runtimeCh) + summaryEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentSessionSummarize) if !ok { t.Fatal("expected session summarize event") } @@ -721,21 +736,6 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { } } -func collectEventStream(ch <-chan Event) []Event { - var events []Event - for { - select { - case evt, ok := <-ch: - if !ok { - return events - } - events = append(events, evt) - default: - return events - } - } -} - func receiveLegacyEvent(t *testing.T, ch <-chan Event) Event { t.Helper() @@ -766,36 +766,6 @@ func receiveRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeeve } } -func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event { - t.Helper() - - timer := time.NewTimer(timeout) - defer timer.Stop() - - for { - select { - case evt, ok := <-ch: - if !ok { - t.Fatal("event stream closed before expected event arrived") - } - if match(evt) { - return evt - } - case <-timer.C: - t.Fatal("timed out waiting for expected event") - } - } -} - -func findEvent(events []Event, kind EventKind) (Event, bool) { - for _, evt := range events { - if evt.Kind == kind { - return evt, true - } - } - return Event{}, false -} - type stringError string func (e stringError) Error() string { diff --git a/pkg/agent/hook_process_test.go b/pkg/agent/hook_process_test.go index 31f4ff7e4..f8d0d54f9 100644 --- a/pkg/agent/hook_process_test.go +++ b/pkg/agent/hook_process_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/isolation" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -146,8 +147,13 @@ func TestAgentLoop_MountProcessHook_ApprovalDeny(t *testing.T) { t.Fatalf("MountProcessHook failed: %v", err) } - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentToolExecSkipped, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ SessionKey: "session-1", @@ -167,8 +173,8 @@ func TestAgentLoop_MountProcessHook_ApprovalDeny(t *testing.T) { t.Fatalf("expected %q, got %q", expected, resp) } - events := collectEventStream(sub.C) - skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + events := collectRuntimeEventStream(runtimeCh) + skippedEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecSkipped) if !ok { t.Fatal("expected tool skipped event") } diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index d835cdc7d..ac466c42d 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -885,8 +885,13 @@ func TestAgentLoop_Hooks_ToolApproverCanDeny(t *testing.T) { t.Fatalf("MountHook failed: %v", err) } - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentToolExecSkipped, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ SessionKey: "session-1", @@ -905,8 +910,8 @@ func TestAgentLoop_Hooks_ToolApproverCanDeny(t *testing.T) { t.Fatalf("expected %q, got %q", expected, resp) } - events := collectEventStream(sub.C) - skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + events := collectRuntimeEventStream(runtimeCh) + skippedEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecSkipped) if !ok { t.Fatal("expected tool skipped event") } @@ -961,8 +966,13 @@ func TestAgentLoop_Hooks_ToolRespondAction(t *testing.T) { t.Fatalf("MountHook failed: %v", err) } - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentToolExecEnd, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ SessionKey: "session-1", @@ -984,8 +994,8 @@ func TestAgentLoop_Hooks_ToolRespondAction(t *testing.T) { } // Verify event stream has ToolExecEnd, not actual tool execution - events := collectEventStream(sub.C) - endEvt, ok := findEvent(events, EventKindToolExecEnd) + events := collectRuntimeEventStream(runtimeCh) + endEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecEnd) if !ok { t.Fatal("expected tool exec end event") } @@ -1150,8 +1160,13 @@ func TestAgentLoop_HookRespond_MediaError(t *testing.T) { sendErr: errors.New("channel unavailable"), }) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentToolExecEnd, + ) + defer closeRuntimeEvents() _, err := al.runAgentLoop(context.Background(), agent, processOptions{ SessionKey: "session-media-err", @@ -1166,8 +1181,8 @@ func TestAgentLoop_HookRespond_MediaError(t *testing.T) { t.Fatalf("runAgentLoop failed: %v", err) } - events := collectEventStream(sub.C) - endEvt, ok := findEvent(events, EventKindToolExecEnd) + events := collectRuntimeEventStream(runtimeCh) + endEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecEnd) if !ok { t.Fatal("expected ToolExecEnd event") } @@ -1205,8 +1220,13 @@ func TestAgentLoop_HookRespond_BusFallback(t *testing.T) { t.Fatalf("MountHook failed: %v", err) } - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentToolExecEnd, + ) + defer closeRuntimeEvents() resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ SessionKey: "session-bus-fallback", @@ -1221,8 +1241,8 @@ func TestAgentLoop_HookRespond_BusFallback(t *testing.T) { t.Fatalf("runAgentLoop failed: %v", err) } - events := collectEventStream(sub.C) - endEvt, ok := findEvent(events, EventKindToolExecEnd) + events := collectRuntimeEventStream(runtimeCh) + endEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentToolExecEnd) if !ok { t.Fatal("expected ToolExecEnd event") } @@ -1367,8 +1387,13 @@ func TestAgentLoop_HookRespond_InterruptSkipsRemaining(t *testing.T) { t.Fatalf("MountHook failed: %v", err) } - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentToolExecSkipped, + ) + defer closeRuntimeEvents() sessionKey := session.BuildMainSessionKey(routing.DefaultAgentID) @@ -1407,9 +1432,9 @@ func TestAgentLoop_HookRespond_InterruptSkipsRemaining(t *testing.T) { t.Fatal("timeout waiting for result") } - events := collectEventStream(sub.C) + events := collectRuntimeEventStream(runtimeCh) - skippedEvts := filterEvents(events, EventKindToolExecSkipped) + skippedEvts := filterRuntimeEvents(events, runtimeevents.KindAgentToolExecSkipped) if len(skippedEvts) < 1 { t.Fatal("expected at least one ToolExecSkipped event after interrupt") } @@ -1447,8 +1472,14 @@ func TestAgentLoop_HookRespond_SteeringSkipsRemaining(t *testing.T) { t.Fatalf("MountHook failed: %v", err) } - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentToolExecEnd, + runtimeevents.KindAgentToolExecSkipped, + ) + defer closeRuntimeEvents() sessionKey := session.BuildMainSessionKey(routing.DefaultAgentID) @@ -1468,14 +1499,14 @@ func TestAgentLoop_HookRespond_SteeringSkipsRemaining(t *testing.T) { resultCh <- result{resp: resp, err: err} }() - collectedEvents := make([]Event, 0, 8) + collectedEvents := make([]runtimeevents.Event, 0, 8) steered := false deadline := time.After(3 * time.Second) for !steered { select { - case evt := <-sub.C: + case evt := <-runtimeCh: collectedEvents = append(collectedEvents, evt) - if evt.Kind != EventKindToolExecEnd { + if evt.Kind != runtimeevents.KindAgentToolExecEnd { continue } payload, ok := evt.Payload.(ToolExecEndPayload) @@ -1498,9 +1529,9 @@ func TestAgentLoop_HookRespond_SteeringSkipsRemaining(t *testing.T) { t.Fatal("timeout waiting for result") } - events := append(collectedEvents, collectEventStream(sub.C)...) + events := append(collectedEvents, collectRuntimeEventStream(runtimeCh)...) - skippedEvts := filterEvents(events, EventKindToolExecSkipped) + skippedEvts := filterRuntimeEvents(events, runtimeevents.KindAgentToolExecSkipped) if len(skippedEvts) < 1 { t.Fatal("expected at least one ToolExecSkipped event after steering") } @@ -1565,13 +1596,3 @@ func TestCloneStringAnyMap_EmptyMapReturnsNonNil(t *testing.T) { } }) } - -func filterEvents(events []Event, kind EventKind) []Event { - var result []Event - for _, evt := range events { - if evt.Kind == kind { - result = append(result, evt) - } - } - return result -} diff --git a/pkg/agent/runtime_event_test.go b/pkg/agent/runtime_event_test.go index 47fd6923a..162ccf424 100644 --- a/pkg/agent/runtime_event_test.go +++ b/pkg/agent/runtime_event_test.go @@ -64,3 +64,40 @@ func waitForRuntimeEvent( } } } + +func collectRuntimeEventStream(ch <-chan runtimeevents.Event) []runtimeevents.Event { + var events []runtimeevents.Event + for { + select { + case evt, ok := <-ch: + if !ok { + return events + } + events = append(events, evt) + default: + return events + } + } +} + +func findRuntimeEvent( + events []runtimeevents.Event, + kind runtimeevents.Kind, +) (runtimeevents.Event, bool) { + for _, evt := range events { + if evt.Kind == kind { + return evt, true + } + } + return runtimeevents.Event{}, false +} + +func filterRuntimeEvents(events []runtimeevents.Event, kind runtimeevents.Kind) []runtimeevents.Event { + var filtered []runtimeevents.Event + for _, evt := range events { + if evt.Kind == kind { + filtered = append(filtered, evt) + } + } + return filtered +} diff --git a/pkg/agent/steering_test.go b/pkg/agent/steering_test.go index bba988672..1e78fd56a 100644 --- a/pkg/agent/steering_test.go +++ b/pkg/agent/steering_test.go @@ -14,6 +14,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" @@ -1134,8 +1135,14 @@ func TestAgentLoop_InterruptGraceful_UsesTerminalNoToolCall(t *testing.T) { al.RegisterTool(tool2) sessionKey := session.BuildMainSessionKey(routing.DefaultAgentID) - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentInterruptReceived, + runtimeevents.KindAgentTurnEnd, + ) + defer closeRuntimeEvents() type result struct { resp string @@ -1222,8 +1229,8 @@ func TestAgentLoop_InterruptGraceful_UsesTerminalNoToolCall(t *testing.T) { t.Fatal("expected remaining tool to be marked as skipped after graceful interrupt") } - events := collectEventStream(sub.C) - interruptEvt, ok := findEvent(events, EventKindInterruptReceived) + events := collectRuntimeEventStream(runtimeCh) + interruptEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentInterruptReceived) if !ok { t.Fatal("expected interrupt received event") } @@ -1235,7 +1242,7 @@ func TestAgentLoop_InterruptGraceful_UsesTerminalNoToolCall(t *testing.T) { t.Fatalf("expected graceful interrupt payload, got %q", interruptPayload.Kind) } - turnEndEvt, ok := findEvent(events, EventKindTurnEnd) + turnEndEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentTurnEnd) if !ok { t.Fatal("expected turn end event") } @@ -1299,8 +1306,14 @@ func TestAgentLoop_InterruptHard_RestoresSession(t *testing.T) { } defaultAgent.Sessions.SetHistory(sessionKey, originalHistory) - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentInterruptReceived, + runtimeevents.KindAgentTurnEnd, + ) + defer closeRuntimeEvents() type result struct { resp string @@ -1353,8 +1366,8 @@ func TestAgentLoop_InterruptHard_RestoresSession(t *testing.T) { t.Fatalf("expected history rollback after hard abort, got %#v", finalHistory) } - events := collectEventStream(sub.C) - interruptEvt, ok := findEvent(events, EventKindInterruptReceived) + events := collectRuntimeEventStream(runtimeCh) + interruptEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentInterruptReceived) if !ok { t.Fatal("expected interrupt received event") } @@ -1366,7 +1379,7 @@ func TestAgentLoop_InterruptHard_RestoresSession(t *testing.T) { t.Fatalf("expected hard interrupt payload, got %q", interruptPayload.Kind) } - turnEndEvt, ok := findEvent(events, EventKindTurnEnd) + turnEndEvt, ok := findRuntimeEvent(events, runtimeevents.KindAgentTurnEnd) if !ok { t.Fatal("expected turn end event") } diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go index 040063249..26cb28fb7 100644 --- a/pkg/agent/subturn_test.go +++ b/pkg/agent/subturn_test.go @@ -10,6 +10,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -22,30 +23,38 @@ const ( // ====================== Test Helper: Event Collector ====================== type eventCollector struct { mu sync.Mutex - events []Event + events []runtimeevents.Event } func newEventCollector(t *testing.T, al *AgentLoop) (*eventCollector, func()) { t.Helper() c := &eventCollector{} - sub := al.SubscribeEvents(16) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentSubTurnSpawn, + runtimeevents.KindAgentSubTurnEnd, + runtimeevents.KindAgentSubTurnResultDelivered, + runtimeevents.KindAgentSubTurnOrphan, + ) done := make(chan struct{}) go func() { defer close(done) - for evt := range sub.C { + for evt := range runtimeCh { c.mu.Lock() c.events = append(c.events, evt) c.mu.Unlock() } }() cleanup := func() { - al.UnsubscribeEvents(sub.ID) + closeRuntimeEvents() <-done } return c, cleanup } -func (c *eventCollector) hasEventOfKind(kind EventKind) bool { +func (c *eventCollector) hasEventOfKind(kind runtimeevents.Kind) bool { c.mu.Lock() defer c.mu.Unlock() for _, e := range c.events { @@ -158,12 +167,12 @@ func TestSpawnSubTurn(t *testing.T) { // Verify event emission time.Sleep(10 * time.Millisecond) // let event goroutine flush if tt.wantSpawn { - if !collector.hasEventOfKind(EventKindSubTurnSpawn) { + if !collector.hasEventOfKind(runtimeevents.KindAgentSubTurnSpawn) { t.Error("SubTurnSpawnEvent not emitted") } } if tt.wantEnd { - if !collector.hasEventOfKind(EventKindSubTurnEnd) { + if !collector.hasEventOfKind(runtimeevents.KindAgentSubTurnEnd) { t.Error("SubTurnEndEvent not emitted") } } @@ -316,7 +325,7 @@ func TestSpawnSubTurn_OrphanResultRouting(t *testing.T) { time.Sleep(10 * time.Millisecond) // let event goroutine flush // Verify Orphan event is emitted - if !collector.hasEventOfKind(EventKindSubTurnOrphan) { + if !collector.hasEventOfKind(runtimeevents.KindAgentSubTurnOrphan) { t.Error("SubTurnOrphanResultEvent not emitted for finished parent") } @@ -591,12 +600,16 @@ func TestNestedSubTurnHierarchy(t *testing.T) { var spawnedTurns []turnInfo var mu sync.Mutex - // Subscribe to real EventBus to capture spawn events - sub := al.SubscribeEvents(16) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 16, + runtimeevents.KindAgentSubTurnSpawn, + ) + defer closeRuntimeEvents() go func() { - for evt := range sub.C { - if evt.Kind == EventKindSubTurnSpawn { + for evt := range runtimeCh { + if evt.Kind == runtimeevents.KindAgentSubTurnSpawn { p, _ := evt.Payload.(SubTurnSpawnPayload) mu.Lock() spawnedTurns = append(spawnedTurns, turnInfo{ @@ -879,7 +892,7 @@ func TestSpawnSubTurn_PanicRecovery(t *testing.T) { time.Sleep(10 * time.Millisecond) // let event goroutine flush // SubTurnEndEvent should still be emitted - if !collector.hasEventOfKind(EventKindSubTurnEnd) { + if !collector.hasEventOfKind(runtimeevents.KindAgentSubTurnEnd) { t.Error("SubTurnEndEvent not emitted after panic") } @@ -1229,18 +1242,23 @@ func TestDeliverSubTurnResult_RaceWithFinish(t *testing.T) { al, _, _, _, cleanup := newTestAgentLoop(t) //nolint:dogsled defer cleanup() - // Collect events via real EventBus var mu sync.Mutex var deliveredCount, orphanCount int - sub := al.SubscribeEvents(64) - defer al.UnsubscribeEvents(sub.ID) + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 64, + runtimeevents.KindAgentSubTurnResultDelivered, + runtimeevents.KindAgentSubTurnOrphan, + ) + defer closeRuntimeEvents() go func() { - for evt := range sub.C { + for evt := range runtimeCh { mu.Lock() switch evt.Kind { - case EventKindSubTurnResultDelivered: + case runtimeevents.KindAgentSubTurnResultDelivered: deliveredCount++ - case EventKindSubTurnOrphan: + case runtimeevents.KindAgentSubTurnOrphan: orphanCount++ } mu.Unlock() @@ -1795,13 +1813,20 @@ func TestAsyncSubTurn_ParentFinishesEarly(t *testing.T) { provider := &slowMockProvider{delay: 5 * time.Second} // SubTurn takes 5 seconds al := NewAgentLoop(cfg, msgBus, provider) - // Capture events via real EventBus var mu sync.Mutex - var events []Event - sub := al.SubscribeEvents(32) - defer al.UnsubscribeEvents(sub.ID) + var events []runtimeevents.Event + runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest( + t, + al, + 32, + runtimeevents.KindAgentSubTurnSpawn, + runtimeevents.KindAgentSubTurnEnd, + runtimeevents.KindAgentSubTurnResultDelivered, + runtimeevents.KindAgentSubTurnOrphan, + ) + defer closeRuntimeEvents() go func() { - for evt := range sub.C { + for evt := range runtimeCh { mu.Lock() events = append(events, evt) mu.Unlock()