From af61d0bca720340030fdc2afe2d858e57ff9a583 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 14:53:22 +0800 Subject: [PATCH 1/4] feat(agent): add event bus foundation --- pkg/agent/eventbus.go | 121 +++++++++++++++++++ pkg/agent/eventbus_test.go | 235 +++++++++++++++++++++++++++++++++++++ pkg/agent/events.go | 129 ++++++++++++++++++++ pkg/agent/loop.go | 166 +++++++++++++++++++++++++- 4 files changed, 650 insertions(+), 1 deletion(-) create mode 100644 pkg/agent/eventbus.go create mode 100644 pkg/agent/eventbus_test.go create mode 100644 pkg/agent/events.go diff --git a/pkg/agent/eventbus.go b/pkg/agent/eventbus.go new file mode 100644 index 000000000..546d8436d --- /dev/null +++ b/pkg/agent/eventbus.go @@ -0,0 +1,121 @@ +package agent + +import ( + "sync" + "sync/atomic" + "time" +) + +const defaultEventSubscriberBuffer = 16 + +// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe. +type EventSubscription struct { + ID uint64 + C <-chan Event +} + +type eventSubscriber struct { + ch chan Event +} + +// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events. +type EventBus struct { + mu sync.RWMutex + subs map[uint64]eventSubscriber + nextID uint64 + closed bool + dropped [eventKindCount]atomic.Int64 +} + +// NewEventBus creates a new in-process event broadcaster. +func NewEventBus() *EventBus { + return &EventBus{ + subs: make(map[uint64]eventSubscriber), + } +} + +// Subscribe registers a new subscriber with the requested channel buffer size. +// A non-positive buffer uses the default size. +func (b *EventBus) Subscribe(buffer int) EventSubscription { + if buffer <= 0 { + buffer = defaultEventSubscriberBuffer + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + ch := make(chan Event) + close(ch) + return EventSubscription{C: ch} + } + + b.nextID++ + id := b.nextID + ch := make(chan Event, buffer) + b.subs[id] = eventSubscriber{ch: ch} + return EventSubscription{ID: id, C: ch} +} + +// Unsubscribe removes a subscriber and closes its channel. +func (b *EventBus) Unsubscribe(id uint64) { + b.mu.Lock() + defer b.mu.Unlock() + + sub, ok := b.subs[id] + if !ok { + return + } + + delete(b.subs, id) + close(sub.ch) +} + +// Emit broadcasts an event to all current subscribers without blocking. +// When a subscriber channel is full, the event is dropped for that subscriber. +func (b *EventBus) Emit(evt Event) { + if evt.Time.IsZero() { + evt.Time = time.Now() + } + + b.mu.RLock() + defer b.mu.RUnlock() + + if b.closed { + return + } + + for _, sub := range b.subs { + select { + case sub.ch <- evt: + default: + if evt.Kind < eventKindCount { + b.dropped[evt.Kind].Add(1) + } + } + } +} + +// Dropped returns the number of dropped events for a given kind. +func (b *EventBus) Dropped(kind EventKind) int64 { + if kind >= eventKindCount { + return 0 + } + return b.dropped[kind].Load() +} + +// Close closes all subscriber channels and stops future broadcasts. +func (b *EventBus) Close() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + return + } + + b.closed = true + for id, sub := range b.subs { + close(sub.ch) + delete(b.subs, id) + } +} diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go new file mode 100644 index 000000000..d57fac094 --- /dev/null +++ b/pkg/agent/eventbus_test.go @@ -0,0 +1,235 @@ +package agent + +import ( + "context" + "os" + "slices" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/tools" +) + +func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) { + eventBus := NewEventBus() + sub := eventBus.Subscribe(1) + + eventBus.Emit(Event{ + Kind: EventKindTurnStart, + Meta: EventMeta{TurnID: "turn-1"}, + }) + + select { + case evt := <-sub.C: + if evt.Kind != EventKindTurnStart { + t.Fatalf("expected %v, got %v", EventKindTurnStart, evt.Kind) + } + if evt.Meta.TurnID != "turn-1" { + t.Fatalf("expected turn id turn-1, got %q", evt.Meta.TurnID) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } + + eventBus.Unsubscribe(sub.ID) + if _, ok := <-sub.C; ok { + t.Fatal("expected subscriber channel to be closed after unsubscribe") + } + + eventBus.Close() + closedSub := eventBus.Subscribe(1) + if _, ok := <-closedSub.C; ok { + t.Fatal("expected closed bus to return a closed subscriber channel") + } +} + +func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) { + eventBus := NewEventBus() + sub := eventBus.Subscribe(1) + defer eventBus.Unsubscribe(sub.ID) + + start := time.Now() + for i := 0; i < 1000; i++ { + eventBus.Emit(Event{Kind: EventKindLLMRequest}) + } + + if elapsed := time.Since(start); elapsed > 100*time.Millisecond { + t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed) + } + + if got := eventBus.Dropped(EventKindLLMRequest); got != 999 { + t.Fatalf("expected 999 dropped events, got %d", got) + } +} + +type scriptedToolProvider struct { + calls int +} + +func (m *scriptedToolProvider) Chat( + ctx context.Context, + messages []providers.Message, + toolDefs []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + m.calls++ + if m.calls == 1 { + return &providers.LLMResponse{ + ToolCalls: []providers.ToolCall{ + { + ID: "call-1", + Name: "mock_custom", + Arguments: map[string]any{"task": "ping"}, + }, + }, + }, nil + } + + return &providers.LLMResponse{ + Content: "done", + }, nil +} + +func (m *scriptedToolProvider) GetDefaultModel() string { + return "scripted-tool-model" +} + +func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + msgBus := bus.NewMessageBus() + provider := &scriptedToolProvider{} + al := NewAgentLoop(cfg, msgBus, provider) + al.RegisterTool(&mockCustomTool{}) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if response != "done" { + t.Fatalf("expected final response 'done', got %q", response) + } + + events := collectEventStream(sub.C) + if len(events) != 8 { + t.Fatalf("expected 8 events, got %d", len(events)) + } + + kinds := make([]EventKind, 0, len(events)) + for _, evt := range events { + kinds = append(kinds, evt.Kind) + } + + expectedKinds := []EventKind{ + EventKindTurnStart, + EventKindLLMRequest, + EventKindLLMResponse, + EventKindToolExecStart, + EventKindToolExecEnd, + EventKindLLMRequest, + EventKindLLMResponse, + EventKindTurnEnd, + } + if !slices.Equal(kinds, expectedKinds) { + t.Fatalf("unexpected event sequence: got %v want %v", kinds, expectedKinds) + } + + turnID := events[0].Meta.TurnID + for i, evt := range events { + if evt.Meta.TurnID != turnID { + t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Meta.TurnID, turnID) + } + if evt.Meta.SessionKey != "session-1" { + t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey) + } + } + + startPayload, ok := events[0].Payload.(TurnStartPayload) + if !ok { + t.Fatalf("expected TurnStartPayload, got %T", events[0].Payload) + } + if startPayload.UserMessage != "run tool" { + t.Fatalf("expected user message 'run tool', got %q", startPayload.UserMessage) + } + + toolStartPayload, ok := events[3].Payload.(ToolExecStartPayload) + if !ok { + t.Fatalf("expected ToolExecStartPayload, got %T", events[3].Payload) + } + if toolStartPayload.Tool != "mock_custom" { + t.Fatalf("expected tool name mock_custom, got %q", toolStartPayload.Tool) + } + + toolEndPayload, ok := events[4].Payload.(ToolExecEndPayload) + if !ok { + t.Fatalf("expected ToolExecEndPayload, got %T", events[4].Payload) + } + if toolEndPayload.Tool != "mock_custom" { + t.Fatalf("expected tool end payload for mock_custom, got %q", toolEndPayload.Tool) + } + if toolEndPayload.IsError { + t.Fatal("expected mock_custom tool to succeed") + } + + turnEndPayload, ok := events[len(events)-1].Payload.(TurnEndPayload) + if !ok { + t.Fatalf("expected TurnEndPayload, got %T", events[len(events)-1].Payload) + } + if turnEndPayload.Status != TurnEndStatusCompleted { + t.Fatalf("expected completed turn, got %q", turnEndPayload.Status) + } + if turnEndPayload.Iterations != 2 { + t.Fatalf("expected 2 iterations, got %d", turnEndPayload.Iterations) + } +} + +func collectEventStream(ch <-chan Event) []Event { + var events []Event + for { + select { + case evt, ok := <-ch: + if !ok { + return events + } + events = append(events, evt) + default: + return events + } + } +} + +var _ tools.Tool = (*mockCustomTool)(nil) diff --git a/pkg/agent/events.go b/pkg/agent/events.go new file mode 100644 index 000000000..92aec7436 --- /dev/null +++ b/pkg/agent/events.go @@ -0,0 +1,129 @@ +package agent + +import ( + "fmt" + "time" +) + +// EventKind identifies a structured agent-loop event. +type EventKind uint8 + +const ( + // EventKindTurnStart is emitted when a turn begins processing. + EventKindTurnStart EventKind = iota + // EventKindTurnEnd is emitted when a turn finishes, successfully or with an error. + EventKindTurnEnd + // EventKindLLMRequest is emitted before a provider chat request is made. + EventKindLLMRequest + // EventKindLLMResponse is emitted after a provider chat response is received. + EventKindLLMResponse + // EventKindToolExecStart is emitted immediately before a tool executes. + EventKindToolExecStart + // EventKindToolExecEnd is emitted immediately after a tool finishes executing. + EventKindToolExecEnd + // EventKindError is emitted when a turn encounters an execution error. + EventKindError + + eventKindCount +) + +var eventKindNames = [...]string{ + "turn_start", + "turn_end", + "llm_request", + "llm_response", + "tool_exec_start", + "tool_exec_end", + "error", +} + +// String returns the stable string form of an EventKind. +func (k EventKind) String() string { + if k >= eventKindCount { + return fmt.Sprintf("event_kind(%d)", k) + } + return eventKindNames[k] +} + +// Event is the structured envelope broadcast by the agent EventBus. +type Event struct { + Kind EventKind + Time time.Time + Meta EventMeta + Payload any +} + +// EventMeta contains correlation fields shared by all agent-loop events. +type EventMeta struct { + AgentID string + TurnID string + ParentTurnID string + SessionKey string + Iteration int + TracePath string + Source string +} + +// TurnEndStatus describes the terminal state of a turn. +type TurnEndStatus string + +const ( + // TurnEndStatusCompleted indicates the turn finished normally. + TurnEndStatusCompleted TurnEndStatus = "completed" + // TurnEndStatusError indicates the turn ended because of an error. + TurnEndStatusError TurnEndStatus = "error" +) + +// TurnStartPayload describes the start of a turn. +type TurnStartPayload struct { + Channel string + ChatID string + UserMessage string + MediaCount int +} + +// TurnEndPayload describes the completion of a turn. +type TurnEndPayload struct { + Status TurnEndStatus + Iterations int + Duration time.Duration + FinalContentLen int +} + +// LLMRequestPayload describes an outbound LLM request. +type LLMRequestPayload struct { + Model string + MessagesCount int + ToolsCount int + MaxTokens int + Temperature float64 +} + +// LLMResponsePayload describes an inbound LLM response. +type LLMResponsePayload struct { + ContentLen int + ToolCalls int + HasReasoning bool +} + +// ToolExecStartPayload describes a tool execution request. +type ToolExecStartPayload struct { + Tool string + Arguments map[string]any +} + +// ToolExecEndPayload describes the outcome of a tool execution. +type ToolExecEndPayload struct { + Tool string + Duration time.Duration + ForLLMLen int + ForUserLen int + IsError bool + Async bool +} + +// ErrorPayload describes an execution error inside the agent loop. +type ErrorPayload struct { + Stage string + Message string +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index c583f5ca5..2c9c86cf9 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -39,6 +39,7 @@ type AgentLoop struct { cfg *config.Config registry *AgentRegistry state *state.Manager + eventBus *EventBus running atomic.Bool summarizing sync.Map fallback *providers.FallbackChain @@ -49,6 +50,7 @@ type AgentLoop struct { mcp mcpRuntime steering *steeringQueue mu sync.RWMutex + turnSeq atomic.Uint64 // Track active requests for safe provider cleanup activeRequests sync.WaitGroup } @@ -103,6 +105,7 @@ func NewAgentLoop( cfg: cfg, registry: registry, state: stateManager, + eventBus: NewEventBus(), summarizing: sync.Map{}, fallback: fallbackChain, cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), @@ -380,6 +383,84 @@ func (al *AgentLoop) Close() { } al.GetRegistry().Close() + if al.eventBus != nil { + al.eventBus.Close() + } +} + +// SubscribeEvents registers a subscriber for agent-loop events. +func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { + if al == nil || al.eventBus == nil { + ch := make(chan Event) + close(ch) + return EventSubscription{C: ch} + } + return al.eventBus.Subscribe(buffer) +} + +// UnsubscribeEvents removes a previously registered event subscriber. +func (al *AgentLoop) UnsubscribeEvents(id uint64) { + if al == nil || al.eventBus == nil { + return + } + al.eventBus.Unsubscribe(id) +} + +// EventDrops returns the number of dropped events for the given kind. +func (al *AgentLoop) EventDrops(kind EventKind) int64 { + if al == nil || al.eventBus == nil { + return 0 + } + return al.eventBus.Dropped(kind) +} + +type turnEventScope struct { + agentID string + sessionKey string + turnID string +} + +func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope { + seq := al.turnSeq.Add(1) + return turnEventScope{ + agentID: agentID, + sessionKey: sessionKey, + turnID: fmt.Sprintf("%s-turn-%d", agentID, seq), + } +} + +func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta { + return EventMeta{ + AgentID: ts.agentID, + TurnID: ts.turnID, + SessionKey: ts.sessionKey, + Iteration: iteration, + Source: source, + TracePath: tracePath, + } +} + +func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { + if al == nil || al.eventBus == nil { + return + } + al.eventBus.Emit(Event{ + Kind: kind, + Meta: meta, + Payload: payload, + }) +} + +func cloneEventArguments(args map[string]any) map[string]any { + if len(args) == 0 { + return nil + } + + cloned := make(map[string]any, len(args)) + for k, v := range args { + cloned[k] = v + } + return cloned } func (al *AgentLoop) RegisterTool(tool tools.Tool) { @@ -895,6 +976,35 @@ func (al *AgentLoop) runAgentLoop( agent *AgentInstance, opts processOptions, ) (string, error) { + turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey) + turnStartedAt := time.Now() + turnIterations := 0 + turnFinalContentLen := 0 + turnStatus := TurnEndStatusCompleted + defer func() { + al.emitEvent( + EventKindTurnEnd, + turnScope.meta(turnIterations, "runAgentLoop", "turn.end"), + TurnEndPayload{ + Status: turnStatus, + Iterations: turnIterations, + Duration: time.Since(turnStartedAt), + FinalContentLen: turnFinalContentLen, + }, + ) + }() + + al.emitEvent( + EventKindTurnStart, + turnScope.meta(0, "runAgentLoop", "turn.start"), + TurnStartPayload{ + Channel: opts.Channel, + ChatID: opts.ChatID, + UserMessage: opts.UserMessage, + MediaCount: len(opts.Media), + }, + ) + // 0. Record last channel for heartbeat notifications (skip internal channels and cli) if opts.Channel != "" && opts.ChatID != "" { if !constants.IsInternalChannel(opts.Channel) { @@ -952,8 +1062,10 @@ func (al *AgentLoop) runAgentLoop( agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) // 3. Run LLM iteration loop - finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts) + finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts, turnScope) + turnIterations = iteration if err != nil { + turnStatus = TurnEndStatusError return "", err } @@ -964,6 +1076,7 @@ func (al *AgentLoop) runAgentLoop( if finalContent == "" { finalContent = opts.DefaultResponse } + turnFinalContentLen = len(finalContent) // 5. Save final assistant message to session agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent) @@ -1058,6 +1171,7 @@ func (al *AgentLoop) runLLMIteration( agent *AgentInstance, messages []providers.Message, opts processOptions, + turnScope turnEventScope, ) (string, int, error) { iteration := 0 var finalContent string @@ -1106,6 +1220,17 @@ func (al *AgentLoop) runLLMIteration( // Build tool definitions providerToolDefs := agent.Tools.ToProviderDefs() + al.emitEvent( + EventKindLLMRequest, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.request"), + LLMRequestPayload{ + Model: activeModel, + MessagesCount: len(messages), + ToolsCount: len(providerToolDefs), + MaxTokens: agent.MaxTokens, + Temperature: agent.Temperature, + }, + ) // Log LLM request details logger.DebugCF("agent", "LLM request", @@ -1246,6 +1371,14 @@ func (al *AgentLoop) runLLMIteration( } if err != nil { + al.emitEvent( + EventKindError, + turnScope.meta(iteration, "runLLMIteration", "turn.error"), + ErrorPayload{ + Stage: "llm", + Message: err.Error(), + }, + ) logger.ErrorCF("agent", "LLM call failed", map[string]any{ "agent_id": agent.ID, @@ -1262,6 +1395,15 @@ func (al *AgentLoop) runLLMIteration( opts.Channel, al.targetReasoningChannelID(opts.Channel), ) + al.emitEvent( + EventKindLLMResponse, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.response"), + LLMResponsePayload{ + ContentLen: len(response.Content), + ToolCalls: len(response.ToolCalls), + HasReasoning: response.Reasoning != "" || response.ReasoningContent != "", + }, + ) logger.DebugCF("agent", "LLM response", map[string]any{ @@ -1352,6 +1494,14 @@ func (al *AgentLoop) runLLMIteration( "tool": tc.Name, "iteration": iteration, }) + al.emitEvent( + EventKindToolExecStart, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.start"), + ToolExecStartPayload{ + Tool: tc.Name, + Arguments: cloneEventArguments(tc.Arguments), + }, + ) // Create async callback for tools that implement AsyncExecutor. asyncCallback := func(_ context.Context, result *tools.ToolResult) { @@ -1390,6 +1540,7 @@ func (al *AgentLoop) runLLMIteration( }) } + toolStart := time.Now() toolResult := agent.Tools.ExecuteWithContext( ctx, tc.Name, @@ -1398,6 +1549,7 @@ func (al *AgentLoop) runLLMIteration( opts.ChatID, asyncCallback, ) + toolDuration := time.Since(toolStart) // Process tool result if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse { @@ -1443,6 +1595,18 @@ func (al *AgentLoop) runLLMIteration( Content: contentForLLM, ToolCallID: tc.ID, } + al.emitEvent( + EventKindToolExecEnd, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.end"), + ToolExecEndPayload{ + Tool: tc.Name, + Duration: toolDuration, + ForLLMLen: len(contentForLLM), + ForUserLen: len(toolResult.ForUser), + IsError: toolResult.IsError, + Async: toolResult.Async, + }, + ) messages = append(messages, toolResultMsg) agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg) From 50cc7100cee14247690bfb2690bf6fbea5be4e37 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 15:06:43 +0800 Subject: [PATCH 2/4] feat(agent): make event logs show event kind clearly --- pkg/agent/loop.go | 68 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 2c9c86cf9..ac97104b1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -441,14 +441,18 @@ func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta } func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { - if al == nil || al.eventBus == nil { - return - } - al.eventBus.Emit(Event{ + evt := Event{ Kind: kind, Meta: meta, Payload: payload, - }) + } + + al.logEvent(evt) + + if al == nil || al.eventBus == nil { + return + } + al.eventBus.Emit(evt) } func cloneEventArguments(args map[string]any) map[string]any { @@ -463,6 +467,60 @@ func cloneEventArguments(args map[string]any) map[string]any { return cloned } +func (al *AgentLoop) logEvent(evt Event) { + fields := map[string]any{ + "event_kind": evt.Kind.String(), + "agent_id": evt.Meta.AgentID, + "turn_id": evt.Meta.TurnID, + "session_key": evt.Meta.SessionKey, + "iteration": evt.Meta.Iteration, + } + + if evt.Meta.TracePath != "" { + fields["trace"] = evt.Meta.TracePath + } + if evt.Meta.Source != "" { + fields["source"] = evt.Meta.Source + } + + switch payload := evt.Payload.(type) { + case TurnStartPayload: + fields["channel"] = payload.Channel + fields["chat_id"] = payload.ChatID + fields["user_len"] = len(payload.UserMessage) + fields["media_count"] = payload.MediaCount + case TurnEndPayload: + fields["status"] = payload.Status + fields["iterations_total"] = payload.Iterations + fields["duration_ms"] = payload.Duration.Milliseconds() + fields["final_len"] = payload.FinalContentLen + case LLMRequestPayload: + fields["model"] = payload.Model + fields["messages"] = payload.MessagesCount + fields["tools"] = payload.ToolsCount + fields["max_tokens"] = payload.MaxTokens + case LLMResponsePayload: + fields["content_len"] = payload.ContentLen + fields["tool_calls"] = payload.ToolCalls + fields["has_reasoning"] = payload.HasReasoning + case ToolExecStartPayload: + fields["tool"] = payload.Tool + fields["args_count"] = len(payload.Arguments) + case ToolExecEndPayload: + fields["tool"] = payload.Tool + fields["duration_ms"] = payload.Duration.Milliseconds() + fields["for_llm_len"] = payload.ForLLMLen + fields["for_user_len"] = payload.ForUserLen + fields["is_error"] = payload.IsError + fields["async"] = payload.Async + case ErrorPayload: + fields["stage"] = payload.Stage + fields["error"] = payload.Message + } + + logger.InfoCF("eventbus", fmt.Sprintf("Agent event: %s", evt.Kind.String()), fields) +} + func (al *AgentLoop) RegisterTool(tool tools.Tool) { registry := al.GetRegistry() for _, agentID := range registry.ListAgentIDs() { From 57cde73b36cc27da4f7979b5526eabaad0f0bfed Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 15:29:52 +0800 Subject: [PATCH 3/4] feat(agent): expand event bus coverage --- pkg/agent/eventbus_test.go | 444 +++++++++++++++++++++++++++++++++++++ pkg/agent/events.go | 119 ++++++++++ pkg/agent/loop.go | 150 ++++++++++++- pkg/agent/steering.go | 19 ++ pkg/tools/spawn.go | 3 + pkg/tools/subagent.go | 3 + 6 files changed, 730 insertions(+), 8 deletions(-) diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index d57fac094..dadbc2f94 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -217,6 +217,374 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { } } +func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-steering-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + tool1ExecCh := make(chan struct{}) + tool1 := &slowTool{name: "tool_one", duration: 50 * time.Millisecond, execCh: tool1ExecCh} + tool2 := &slowTool{name: "tool_two", duration: 50 * time.Millisecond} + + provider := &toolCallProvider{ + toolCalls: []providers.ToolCall{ + { + ID: "call_1", + Type: "function", + Name: "tool_one", + Function: &providers.FunctionCall{ + Name: "tool_one", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + { + ID: "call_2", + Type: "function", + Name: "tool_two", + Function: &providers.FunctionCall{ + Name: "tool_two", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + }, + finalResp: "steered response", + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + al.RegisterTool(tool1) + al.RegisterTool(tool2) + + sub := al.SubscribeEvents(32) + defer al.UnsubscribeEvents(sub.ID) + + resultCh := make(chan string, 1) + go func() { + resp, _ := al.ProcessDirectWithChannel(context.Background(), "do something", "test-session", "test", "chat1") + resultCh <- resp + }() + + select { + case <-tool1ExecCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for tool_one to start") + } + + if err := al.Steer(providers.Message{Role: "user", Content: "change course"}); err != nil { + t.Fatalf("Steer failed: %v", err) + } + + select { + case resp := <-resultCh: + if resp != "steered response" { + t.Fatalf("expected steered response, got %q", resp) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for steered response") + } + + events := collectEventStream(sub.C) + steeringEvt, ok := findEvent(events, EventKindSteeringInjected) + if !ok { + t.Fatal("expected steering injected event") + } + steeringPayload, ok := steeringEvt.Payload.(SteeringInjectedPayload) + if !ok { + t.Fatalf("expected SteeringInjectedPayload, got %T", steeringEvt.Payload) + } + if steeringPayload.Count != 1 { + t.Fatalf("expected 1 steering message, got %d", steeringPayload.Count) + } + + skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + if !ok { + t.Fatal("expected skipped tool event") + } + skippedPayload, ok := skippedEvt.Payload.(ToolExecSkippedPayload) + if !ok { + t.Fatalf("expected ToolExecSkippedPayload, got %T", skippedEvt.Payload) + } + if skippedPayload.Tool != "tool_two" { + t.Fatalf("expected skipped tool_two, got %q", skippedPayload.Tool) + } + + interruptEvt, ok := findEvent(events, EventKindInterruptReceived) + if !ok { + t.Fatal("expected interrupt received event") + } + interruptPayload, ok := interruptEvt.Payload.(InterruptReceivedPayload) + if !ok { + t.Fatalf("expected InterruptReceivedPayload, got %T", interruptEvt.Payload) + } + if interruptPayload.Role != "user" { + t.Fatalf("expected interrupt role user, got %q", interruptPayload.Role) + } + if interruptPayload.ContentLen != len("change course") { + t.Fatalf("expected interrupt content len %d, got %d", len("change course"), interruptPayload.ContentLen) + } +} + +func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-compress-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + contextErr := errString("InvalidParameter: Total tokens of image and text exceed max message tokens") + provider := &failFirstMockProvider{ + failures: 1, + failError: contextErr, + successResp: "Recovered from context error", + } + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + defaultAgent.Sessions.SetHistory("session-1", []providers.Message{ + {Role: "user", Content: "Old message 1"}, + {Role: "assistant", Content: "Old response 1"}, + {Role: "user", Content: "Old message 2"}, + {Role: "assistant", Content: "Old response 2"}, + {Role: "user", Content: "Trigger message"}, + }) + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "Trigger message", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "Recovered from context error" { + t.Fatalf("expected retry success, got %q", resp) + } + + events := collectEventStream(sub.C) + retryEvt, ok := findEvent(events, EventKindLLMRetry) + if !ok { + t.Fatal("expected llm retry event") + } + retryPayload, ok := retryEvt.Payload.(LLMRetryPayload) + if !ok { + t.Fatalf("expected LLMRetryPayload, got %T", retryEvt.Payload) + } + if retryPayload.Reason != "context_limit" { + t.Fatalf("expected context_limit retry reason, got %q", retryPayload.Reason) + } + if retryPayload.Attempt != 1 { + t.Fatalf("expected retry attempt 1, got %d", retryPayload.Attempt) + } + + compressEvt, ok := findEvent(events, EventKindContextCompress) + if !ok { + t.Fatal("expected context compress event") + } + payload, ok := compressEvt.Payload.(ContextCompressPayload) + if !ok { + t.Fatalf("expected ContextCompressPayload, got %T", compressEvt.Payload) + } + if payload.Reason != ContextCompressReasonRetry { + t.Fatalf("expected retry compress reason, got %q", payload.Reason) + } + if payload.DroppedMessages == 0 { + t.Fatal("expected dropped messages to be recorded") + } +} + +func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-summary-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + ContextWindow: 8000, + SummarizeMessageThreshold: 2, + SummarizeTokenPercent: 75, + }, + }, + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, &simpleMockProvider{response: "summary text"}) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + defaultAgent.Sessions.SetHistory("session-1", []providers.Message{ + {Role: "user", Content: "Question one"}, + {Role: "assistant", Content: "Answer one"}, + {Role: "user", Content: "Question two"}, + {Role: "assistant", Content: "Answer two"}, + {Role: "user", Content: "Question three"}, + {Role: "assistant", Content: "Answer three"}, + }) + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1") + al.summarizeSession(defaultAgent, "session-1", turnScope) + + events := collectEventStream(sub.C) + summaryEvt, ok := findEvent(events, EventKindSessionSummarize) + if !ok { + t.Fatal("expected session summarize event") + } + payload, ok := summaryEvt.Payload.(SessionSummarizePayload) + if !ok { + t.Fatalf("expected SessionSummarizePayload, got %T", summaryEvt.Payload) + } + if payload.SummaryLen == 0 { + t.Fatal("expected non-empty summary length") + } +} + +func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-followup-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + provider := &toolCallProvider{ + toolCalls: []providers.ToolCall{ + { + ID: "call_async_1", + Type: "function", + Name: "async_followup", + Function: &providers.FunctionCall{ + Name: "async_followup", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + }, + finalResp: "async launched", + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + doneCh := make(chan struct{}) + al.RegisterTool(&asyncFollowUpTool{ + name: "async_followup", + followUpText: "background result", + completionSig: doneCh, + }) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + sub := al.SubscribeEvents(32) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run async tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "async launched" { + t.Fatalf("expected final response 'async launched', got %q", resp) + } + + select { + case <-doneCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for async tool completion") + } + + followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool { + return evt.Kind == EventKindFollowUpQueued + }) + payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload) + if !ok { + t.Fatalf("expected FollowUpQueuedPayload, got %T", followUpEvt.Payload) + } + if payload.SourceTool != "async_followup" { + t.Fatalf("expected source tool async_followup, got %q", payload.SourceTool) + } + if payload.Channel != "cli" { + t.Fatalf("expected channel cli, got %q", payload.Channel) + } + if payload.ChatID != "direct" { + t.Fatalf("expected chat id direct, got %q", payload.ChatID) + } + if payload.ContentLen != len("background result") { + t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen) + } + if followUpEvt.Meta.SessionKey != "session-1" { + t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey) + } + if followUpEvt.Meta.TurnID == "" { + t.Fatal("expected follow-up event to include turn id") + } +} + func collectEventStream(ch <-chan Event) []Event { var events []Event for { @@ -232,4 +600,80 @@ func collectEventStream(ch <-chan Event) []Event { } } +func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event { + t.Helper() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case evt, ok := <-ch: + if !ok { + t.Fatal("event stream closed before expected event arrived") + } + if match(evt) { + return evt + } + case <-timer.C: + t.Fatal("timed out waiting for expected event") + } + } +} + +func findEvent(events []Event, kind EventKind) (Event, bool) { + for _, evt := range events { + if evt.Kind == kind { + return evt, true + } + } + return Event{}, false +} + +type errString string + +func (e errString) Error() string { + return string(e) +} + +type asyncFollowUpTool struct { + name string + followUpText string + completionSig chan struct{} +} + +func (t *asyncFollowUpTool) Name() string { + return t.name +} + +func (t *asyncFollowUpTool) Description() string { + return "async follow-up tool for testing" +} + +func (t *asyncFollowUpTool) Parameters() map[string]any { + return map[string]any{ + "type": "object", + "properties": map[string]any{}, + } +} + +func (t *asyncFollowUpTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult { + return tools.AsyncResult("async follow-up scheduled") +} + +func (t *asyncFollowUpTool) ExecuteAsync( + ctx context.Context, + args map[string]any, + cb tools.AsyncCallback, +) *tools.ToolResult { + go func() { + cb(ctx, &tools.ToolResult{ForLLM: t.followUpText}) + if t.completionSig != nil { + close(t.completionSig) + } + }() + return tools.AsyncResult("async follow-up scheduled") +} + var _ tools.Tool = (*mockCustomTool)(nil) +var _ tools.AsyncExecutor = (*asyncFollowUpTool)(nil) diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 92aec7436..fae5033a3 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -15,12 +15,34 @@ const ( EventKindTurnEnd // EventKindLLMRequest is emitted before a provider chat request is made. EventKindLLMRequest + // EventKindLLMDelta is emitted when a streaming provider yields a partial delta. + EventKindLLMDelta // EventKindLLMResponse is emitted after a provider chat response is received. EventKindLLMResponse + // EventKindLLMRetry is emitted when an LLM request is retried. + EventKindLLMRetry + // EventKindContextCompress is emitted when session history is forcibly compressed. + EventKindContextCompress + // EventKindSessionSummarize is emitted when asynchronous summarization completes. + EventKindSessionSummarize // EventKindToolExecStart is emitted immediately before a tool executes. EventKindToolExecStart // EventKindToolExecEnd is emitted immediately after a tool finishes executing. EventKindToolExecEnd + // EventKindToolExecSkipped is emitted when a queued tool call is skipped. + EventKindToolExecSkipped + // EventKindSteeringInjected is emitted when queued steering is injected into context. + EventKindSteeringInjected + // EventKindFollowUpQueued is emitted when an async tool queues a follow-up system message. + EventKindFollowUpQueued + // EventKindInterruptReceived is emitted when a soft interrupt message is accepted. + EventKindInterruptReceived + // EventKindSubTurnSpawn is emitted when a sub-turn is spawned. + EventKindSubTurnSpawn + // EventKindSubTurnEnd is emitted when a sub-turn finishes. + EventKindSubTurnEnd + // EventKindSubTurnResultDelivered is emitted when a sub-turn result is delivered. + EventKindSubTurnResultDelivered // EventKindError is emitted when a turn encounters an execution error. EventKindError @@ -31,9 +53,20 @@ var eventKindNames = [...]string{ "turn_start", "turn_end", "llm_request", + "llm_delta", "llm_response", + "llm_retry", + "context_compress", + "session_summarize", "tool_exec_start", "tool_exec_end", + "tool_exec_skipped", + "steering_injected", + "follow_up_queued", + "interrupt_received", + "subturn_spawn", + "subturn_end", + "subturn_result_delivered", "error", } @@ -106,6 +139,46 @@ type LLMResponsePayload struct { HasReasoning bool } +// LLMDeltaPayload describes a streamed LLM delta. +type LLMDeltaPayload struct { + ContentDeltaLen int + ReasoningDeltaLen int +} + +// LLMRetryPayload describes a retry of an LLM request. +type LLMRetryPayload struct { + Attempt int + MaxRetries int + Reason string + Error string + Backoff time.Duration +} + +// ContextCompressReason identifies why emergency compression ran. +type ContextCompressReason string + +const ( + // ContextCompressReasonProactive indicates compression before the first LLM call. + ContextCompressReasonProactive ContextCompressReason = "proactive_budget" + // ContextCompressReasonRetry indicates compression during context-error retry handling. + ContextCompressReasonRetry ContextCompressReason = "llm_retry" +) + +// ContextCompressPayload describes a forced history compression. +type ContextCompressPayload struct { + Reason ContextCompressReason + DroppedMessages int + RemainingMessages int +} + +// SessionSummarizePayload describes a completed async session summarization. +type SessionSummarizePayload struct { + SummarizedMessages int + KeptMessages int + SummaryLen int + OmittedOversized bool +} + // ToolExecStartPayload describes a tool execution request. type ToolExecStartPayload struct { Tool string @@ -122,6 +195,52 @@ type ToolExecEndPayload struct { Async bool } +// ToolExecSkippedPayload describes a skipped tool call. +type ToolExecSkippedPayload struct { + Tool string + Reason string +} + +// SteeringInjectedPayload describes steering messages appended before the next LLM call. +type SteeringInjectedPayload struct { + Count int + TotalContentLen int +} + +// FollowUpQueuedPayload describes an async follow-up queued back into the inbound bus. +type FollowUpQueuedPayload struct { + SourceTool string + Channel string + ChatID string + ContentLen int +} + +// InterruptReceivedPayload describes a queued soft interrupt. +type InterruptReceivedPayload struct { + Role string + ContentLen int + QueueDepth int +} + +// SubTurnSpawnPayload describes the creation of a child turn. +type SubTurnSpawnPayload struct { + AgentID string + Label string +} + +// SubTurnEndPayload describes the completion of a child turn. +type SubTurnEndPayload struct { + AgentID string + Status string +} + +// SubTurnResultDeliveredPayload describes delivery of a sub-turn result. +type SubTurnResultDeliveredPayload struct { + TargetChannel string + TargetChatID string + ContentLen int +} + // ErrorPayload describes an execution error inside the agent loop. type ErrorPayload struct { Stage string diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ac97104b1..877dbbd94 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -499,10 +499,28 @@ func (al *AgentLoop) logEvent(evt Event) { fields["messages"] = payload.MessagesCount fields["tools"] = payload.ToolsCount fields["max_tokens"] = payload.MaxTokens + case LLMDeltaPayload: + fields["content_delta_len"] = payload.ContentDeltaLen + fields["reasoning_delta_len"] = payload.ReasoningDeltaLen case LLMResponsePayload: fields["content_len"] = payload.ContentLen fields["tool_calls"] = payload.ToolCalls fields["has_reasoning"] = payload.HasReasoning + case LLMRetryPayload: + fields["attempt"] = payload.Attempt + fields["max_retries"] = payload.MaxRetries + fields["reason"] = payload.Reason + fields["error"] = payload.Error + fields["backoff_ms"] = payload.Backoff.Milliseconds() + case ContextCompressPayload: + fields["reason"] = payload.Reason + fields["dropped_messages"] = payload.DroppedMessages + fields["remaining_messages"] = payload.RemainingMessages + case SessionSummarizePayload: + fields["summarized_messages"] = payload.SummarizedMessages + fields["kept_messages"] = payload.KeptMessages + fields["summary_len"] = payload.SummaryLen + fields["omitted_oversized"] = payload.OmittedOversized case ToolExecStartPayload: fields["tool"] = payload.Tool fields["args_count"] = len(payload.Arguments) @@ -513,6 +531,31 @@ func (al *AgentLoop) logEvent(evt Event) { fields["for_user_len"] = payload.ForUserLen fields["is_error"] = payload.IsError fields["async"] = payload.Async + case ToolExecSkippedPayload: + fields["tool"] = payload.Tool + fields["reason"] = payload.Reason + case SteeringInjectedPayload: + fields["count"] = payload.Count + fields["total_content_len"] = payload.TotalContentLen + case FollowUpQueuedPayload: + fields["source_tool"] = payload.SourceTool + fields["channel"] = payload.Channel + fields["chat_id"] = payload.ChatID + fields["content_len"] = payload.ContentLen + case InterruptReceivedPayload: + fields["role"] = payload.Role + fields["content_len"] = payload.ContentLen + fields["queue_depth"] = payload.QueueDepth + case SubTurnSpawnPayload: + fields["child_agent_id"] = payload.AgentID + fields["label"] = payload.Label + case SubTurnEndPayload: + fields["child_agent_id"] = payload.AgentID + fields["status"] = payload.Status + case SubTurnResultDeliveredPayload: + fields["target_channel"] = payload.TargetChannel + fields["target_chat_id"] = payload.TargetChatID + fields["content_len"] = payload.ContentLen case ErrorPayload: fields["stage"] = payload.Stage fields["error"] = payload.Message @@ -1105,7 +1148,17 @@ func (al *AgentLoop) runAgentLoop( if isOverContextBudget(agent.ContextWindow, messages, toolDefs, agent.MaxTokens) { logger.WarnCF("agent", "Proactive compression: context budget exceeded before LLM call", map[string]any{"session_key": opts.SessionKey}) - al.forceCompression(agent, opts.SessionKey) + if compression, ok := al.forceCompression(agent, opts.SessionKey); ok { + al.emitEvent( + EventKindContextCompress, + turnScope.meta(0, "runAgentLoop", "turn.context.compress"), + ContextCompressPayload{ + Reason: ContextCompressReasonProactive, + DroppedMessages: compression.DroppedMessages, + RemainingMessages: compression.RemainingMessages, + }, + ) + } newHistory := agent.Sessions.GetHistory(opts.SessionKey) newSummary := agent.Sessions.GetSummary(opts.SessionKey) messages = agent.ContextBuilder.BuildMessages( @@ -1142,7 +1195,7 @@ func (al *AgentLoop) runAgentLoop( // 6. Optional: summarization if opts.EnableSummary { - al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID) + al.maybeSummarize(agent, opts.SessionKey, turnScope) } // 7. Optional: send response via bus @@ -1256,9 +1309,11 @@ func (al *AgentLoop) runLLMIteration( // Inject pending steering messages into the conversation context // before the next LLM call. if len(pendingMessages) > 0 { + totalContentLen := 0 for _, pm := range pendingMessages { messages = append(messages, pm) agent.Sessions.AddMessage(opts.SessionKey, pm.Role, pm.Content) + totalContentLen += len(pm.Content) logger.InfoCF("agent", "Injected steering message into context", map[string]any{ "agent_id": agent.ID, @@ -1266,6 +1321,14 @@ func (al *AgentLoop) runLLMIteration( "content_len": len(pm.Content), }) } + al.emitEvent( + EventKindSteeringInjected, + turnScope.meta(iteration, "runLLMIteration", "turn.steering.injected"), + SteeringInjectedPayload{ + Count: len(pendingMessages), + TotalContentLen: totalContentLen, + }, + ) pendingMessages = nil } @@ -1334,6 +1397,8 @@ func (al *AgentLoop) runLLMIteration( callLLM := func() (*providers.LLMResponse, error) { al.activeRequests.Add(1) defer al.activeRequests.Done() + // TODO(eventbus): emit EventKindLLMDelta when providers expose + // streaming callbacks instead of only the final Chat response. if len(activeCandidates) > 1 && al.fallback != nil { fbResult, fbErr := al.fallback.Execute( @@ -1389,6 +1454,17 @@ func (al *AgentLoop) runLLMIteration( if isTimeoutError && retry < maxRetries { backoff := time.Duration(retry+1) * 5 * time.Second + al.emitEvent( + EventKindLLMRetry, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"), + LLMRetryPayload{ + Attempt: retry + 1, + MaxRetries: maxRetries, + Reason: "timeout", + Error: err.Error(), + Backoff: backoff, + }, + ) logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{ "error": err.Error(), "retry": retry, @@ -1399,6 +1475,16 @@ func (al *AgentLoop) runLLMIteration( } if isContextError && retry < maxRetries { + al.emitEvent( + EventKindLLMRetry, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"), + LLMRetryPayload{ + Attempt: retry + 1, + MaxRetries: maxRetries, + Reason: "context_limit", + Error: err.Error(), + }, + ) logger.WarnCF( "agent", "Context window error detected, attempting compression", @@ -1416,7 +1502,17 @@ func (al *AgentLoop) runLLMIteration( }) } - al.forceCompression(agent, opts.SessionKey) + if compression, ok := al.forceCompression(agent, opts.SessionKey); ok { + al.emitEvent( + EventKindContextCompress, + turnScope.meta(iteration, "runLLMIteration", "turn.context.compress"), + ContextCompressPayload{ + Reason: ContextCompressReasonRetry, + DroppedMessages: compression.DroppedMessages, + RemainingMessages: compression.RemainingMessages, + }, + ) + } newHistory := agent.Sessions.GetHistory(opts.SessionKey) newSummary := agent.Sessions.GetSummary(opts.SessionKey) messages = agent.ContextBuilder.BuildMessages( @@ -1587,6 +1683,16 @@ func (al *AgentLoop) runLLMIteration( "content_len": len(content), "channel": opts.Channel, }) + al.emitEvent( + EventKindFollowUpQueued, + turnScope.meta(iteration, "runLLMIteration", "turn.follow_up.queued"), + FollowUpQueuedPayload{ + SourceTool: tc.Name, + Channel: opts.Channel, + ChatID: opts.ChatID, + ContentLen: len(content), + }, + ) pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) defer pubCancel() @@ -1686,6 +1792,14 @@ func (al *AgentLoop) runLLMIteration( // Mark remaining tool calls as skipped for j := i + 1; j < len(normalizedToolCalls); j++ { skippedTC := normalizedToolCalls[j] + al.emitEvent( + EventKindToolExecSkipped, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.skipped"), + ToolExecSkippedPayload{ + Tool: skippedTC.Name, + Reason: "queued user steering message", + }, + ) toolResultMsg := providers.Message{ Role: "tool", Content: "Skipped due to queued user message.", @@ -1760,7 +1874,7 @@ func (al *AgentLoop) selectCandidates( } // maybeSummarize triggers summarization if the session history exceeds thresholds. -func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) { +func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey string, turnScope turnEventScope) { newHistory := agent.Sessions.GetHistory(sessionKey) tokenEstimate := al.estimateTokens(newHistory) threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100 @@ -1771,12 +1885,17 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c go func() { defer al.summarizing.Delete(summarizeKey) logger.Debug("Memory threshold reached. Optimizing conversation history...") - al.summarizeSession(agent, sessionKey) + al.summarizeSession(agent, sessionKey, turnScope) }() } } } +type compressionResult struct { + DroppedMessages int + RemainingMessages int +} + // forceCompression aggressively reduces context when the limit is hit. // It drops the oldest ~50% of Turns (a Turn is a complete user→LLM→response // cycle, as defined in #1316), so tool-call sequences are never split. @@ -1789,10 +1908,10 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c // prompt is built dynamically by BuildMessages and is NOT stored here. // The compression note is recorded in the session summary so that // BuildMessages can include it in the next system prompt. -func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) { +func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) (compressionResult, bool) { history := agent.Sessions.GetHistory(sessionKey) if len(history) <= 2 { - return + return compressionResult{}, false } // Split at a Turn boundary so no tool-call sequence is torn apart. @@ -1846,6 +1965,11 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) { "dropped_msgs": droppedCount, "new_count": len(keptHistory), }) + + return compressionResult{ + DroppedMessages: droppedCount, + RemainingMessages: len(keptHistory), + }, true } // GetStartupInfo returns information about loaded tools and skills for logging. @@ -1937,7 +2061,7 @@ func formatToolsForLog(toolDefs []providers.ToolDefinition) string { } // summarizeSession summarizes the conversation history for a session. -func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) { +func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string, turnScope turnEventScope) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -2022,6 +2146,16 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) { agent.Sessions.SetSummary(sessionKey, finalSummary) agent.Sessions.TruncateHistory(sessionKey, keepCount) agent.Sessions.Save(sessionKey) + al.emitEvent( + EventKindSessionSummarize, + turnScope.meta(0, "summarizeSession", "turn.session.summarize"), + SessionSummarizePayload{ + SummarizedMessages: len(validMessages), + KeptMessages: keepCount, + SummaryLen: len(finalSummary), + OmittedOversized: omitted, + }, + ) } } diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 8c7c79c16..90d1cc091 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -122,6 +122,25 @@ func (al *AgentLoop) Steer(msg providers.Message) error { "content_len": len(msg.Content), "queue_len": al.steering.len(), }) + agentID := "" + if registry := al.GetRegistry(); registry != nil { + if agent := registry.GetDefaultAgent(); agent != nil { + agentID = agent.ID + } + } + al.emitEvent( + EventKindInterruptReceived, + EventMeta{ + AgentID: agentID, + Source: "Steer", + TracePath: "turn.interrupt.received", + }, + InterruptReceivedPayload{ + Role: msg.Role, + ContentLen: len(msg.Content), + QueueDepth: al.steering.len(), + }, + ) return nil } diff --git a/pkg/tools/spawn.go b/pkg/tools/spawn.go index be40ffda2..34ccc80e4 100644 --- a/pkg/tools/spawn.go +++ b/pkg/tools/spawn.go @@ -96,6 +96,9 @@ func (t *SpawnTool) execute(ctx context.Context, args map[string]any, cb AsyncCa } // Pass callback to manager for async completion notification + // TODO(eventbus): when background subagents are migrated onto the + // agent package's runTurn/sub-turn tree, emit SubTurnSpawn here and move + // lifecycle events out of the legacy SubagentManager path. result, err := t.manager.Spawn(ctx, task, label, agentID, channel, chatID, cb) if err != nil { return ErrorResult(fmt.Sprintf("failed to spawn subagent: %v", err)) diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index e51cbaafa..9915c5900 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -111,6 +111,9 @@ func (sm *SubagentManager) Spawn( func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) { task.Status = "running" task.Created = time.Now().UnixMilli() + // TODO(eventbus): once subagents are modeled as child turns inside + // pkg/agent, emit SubTurnEnd and SubTurnResultDelivered from the parent + // AgentLoop instead of this legacy manager. // Build system prompt for subagent systemPrompt := `You are a subagent. Complete the given task independently and report the result. From a65e0e95d618bc7437d80acb529a9568cce7b44c Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 15:45:27 +0800 Subject: [PATCH 4/4] fix: lint err --- pkg/agent/eventbus_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index dadbc2f94..13f2f2282 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -357,7 +357,7 @@ func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { }, } - contextErr := errString("InvalidParameter: Total tokens of image and text exceed max message tokens") + contextErr := stringError("InvalidParameter: Total tokens of image and text exceed max message tokens") provider := &failFirstMockProvider{ failures: 1, failError: contextErr, @@ -630,9 +630,9 @@ func findEvent(events []Event, kind EventKind) (Event, bool) { return Event{}, false } -type errString string +type stringError string -func (e errString) Error() string { +func (e stringError) Error() string { return string(e) } @@ -675,5 +675,7 @@ func (t *asyncFollowUpTool) ExecuteAsync( return tools.AsyncResult("async follow-up scheduled") } -var _ tools.Tool = (*mockCustomTool)(nil) -var _ tools.AsyncExecutor = (*asyncFollowUpTool)(nil) +var ( + _ tools.Tool = (*mockCustomTool)(nil) + _ tools.AsyncExecutor = (*asyncFollowUpTool)(nil) +)