From 57cde73b36cc27da4f7979b5526eabaad0f0bfed Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 15:29:52 +0800 Subject: [PATCH] feat(agent): expand event bus coverage --- pkg/agent/eventbus_test.go | 444 +++++++++++++++++++++++++++++++++++++ pkg/agent/events.go | 119 ++++++++++ pkg/agent/loop.go | 150 ++++++++++++- pkg/agent/steering.go | 19 ++ pkg/tools/spawn.go | 3 + pkg/tools/subagent.go | 3 + 6 files changed, 730 insertions(+), 8 deletions(-) diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index d57fac094..dadbc2f94 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -217,6 +217,374 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { } } +func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-steering-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + tool1ExecCh := make(chan struct{}) + tool1 := &slowTool{name: "tool_one", duration: 50 * time.Millisecond, execCh: tool1ExecCh} + tool2 := &slowTool{name: "tool_two", duration: 50 * time.Millisecond} + + provider := &toolCallProvider{ + toolCalls: []providers.ToolCall{ + { + ID: "call_1", + Type: "function", + Name: "tool_one", + Function: &providers.FunctionCall{ + Name: "tool_one", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + { + ID: "call_2", + Type: "function", + Name: "tool_two", + Function: &providers.FunctionCall{ + Name: "tool_two", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + }, + finalResp: "steered response", + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + al.RegisterTool(tool1) + al.RegisterTool(tool2) + + sub := al.SubscribeEvents(32) + defer al.UnsubscribeEvents(sub.ID) + + resultCh := make(chan string, 1) + go func() { + resp, _ := al.ProcessDirectWithChannel(context.Background(), "do something", "test-session", "test", "chat1") + resultCh <- resp + }() + + select { + case <-tool1ExecCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for tool_one to start") + } + + if err := al.Steer(providers.Message{Role: "user", Content: "change course"}); err != nil { + t.Fatalf("Steer failed: %v", err) + } + + select { + case resp := <-resultCh: + if resp != "steered response" { + t.Fatalf("expected steered response, got %q", resp) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for steered response") + } + + events := collectEventStream(sub.C) + steeringEvt, ok := findEvent(events, EventKindSteeringInjected) + if !ok { + t.Fatal("expected steering injected event") + } + steeringPayload, ok := steeringEvt.Payload.(SteeringInjectedPayload) + if !ok { + t.Fatalf("expected SteeringInjectedPayload, got %T", steeringEvt.Payload) + } + if steeringPayload.Count != 1 { + t.Fatalf("expected 1 steering message, got %d", steeringPayload.Count) + } + + skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + if !ok { + t.Fatal("expected skipped tool event") + } + skippedPayload, ok := skippedEvt.Payload.(ToolExecSkippedPayload) + if !ok { + t.Fatalf("expected ToolExecSkippedPayload, got %T", skippedEvt.Payload) + } + if skippedPayload.Tool != "tool_two" { + t.Fatalf("expected skipped tool_two, got %q", skippedPayload.Tool) + } + + interruptEvt, ok := findEvent(events, EventKindInterruptReceived) + if !ok { + t.Fatal("expected interrupt received event") + } + interruptPayload, ok := interruptEvt.Payload.(InterruptReceivedPayload) + if !ok { + t.Fatalf("expected InterruptReceivedPayload, got %T", interruptEvt.Payload) + } + if interruptPayload.Role != "user" { + t.Fatalf("expected interrupt role user, got %q", interruptPayload.Role) + } + if interruptPayload.ContentLen != len("change course") { + t.Fatalf("expected interrupt content len %d, got %d", len("change course"), interruptPayload.ContentLen) + } +} + +func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-compress-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + contextErr := errString("InvalidParameter: Total tokens of image and text exceed max message tokens") + provider := &failFirstMockProvider{ + failures: 1, + failError: contextErr, + successResp: "Recovered from context error", + } + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + defaultAgent.Sessions.SetHistory("session-1", []providers.Message{ + {Role: "user", Content: "Old message 1"}, + {Role: "assistant", Content: "Old response 1"}, + {Role: "user", Content: "Old message 2"}, + {Role: "assistant", Content: "Old response 2"}, + {Role: "user", Content: "Trigger message"}, + }) + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "Trigger message", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "Recovered from context error" { + t.Fatalf("expected retry success, got %q", resp) + } + + events := collectEventStream(sub.C) + retryEvt, ok := findEvent(events, EventKindLLMRetry) + if !ok { + t.Fatal("expected llm retry event") + } + retryPayload, ok := retryEvt.Payload.(LLMRetryPayload) + if !ok { + t.Fatalf("expected LLMRetryPayload, got %T", retryEvt.Payload) + } + if retryPayload.Reason != "context_limit" { + t.Fatalf("expected context_limit retry reason, got %q", retryPayload.Reason) + } + if retryPayload.Attempt != 1 { + t.Fatalf("expected retry attempt 1, got %d", retryPayload.Attempt) + } + + compressEvt, ok := findEvent(events, EventKindContextCompress) + if !ok { + t.Fatal("expected context compress event") + } + payload, ok := compressEvt.Payload.(ContextCompressPayload) + if !ok { + t.Fatalf("expected ContextCompressPayload, got %T", compressEvt.Payload) + } + if payload.Reason != ContextCompressReasonRetry { + t.Fatalf("expected retry compress reason, got %q", payload.Reason) + } + if payload.DroppedMessages == 0 { + t.Fatal("expected dropped messages to be recorded") + } +} + +func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-summary-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + ContextWindow: 8000, + SummarizeMessageThreshold: 2, + SummarizeTokenPercent: 75, + }, + }, + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, &simpleMockProvider{response: "summary text"}) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + defaultAgent.Sessions.SetHistory("session-1", []providers.Message{ + {Role: "user", Content: "Question one"}, + {Role: "assistant", Content: "Answer one"}, + {Role: "user", Content: "Question two"}, + {Role: "assistant", Content: "Answer two"}, + {Role: "user", Content: "Question three"}, + {Role: "assistant", Content: "Answer three"}, + }) + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1") + al.summarizeSession(defaultAgent, "session-1", turnScope) + + events := collectEventStream(sub.C) + summaryEvt, ok := findEvent(events, EventKindSessionSummarize) + if !ok { + t.Fatal("expected session summarize event") + } + payload, ok := summaryEvt.Payload.(SessionSummarizePayload) + if !ok { + t.Fatalf("expected SessionSummarizePayload, got %T", summaryEvt.Payload) + } + if payload.SummaryLen == 0 { + t.Fatal("expected non-empty summary length") + } +} + +func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-followup-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + provider := &toolCallProvider{ + toolCalls: []providers.ToolCall{ + { + ID: "call_async_1", + Type: "function", + Name: "async_followup", + Function: &providers.FunctionCall{ + Name: "async_followup", + Arguments: "{}", + }, + Arguments: map[string]any{}, + }, + }, + finalResp: "async launched", + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, provider) + doneCh := make(chan struct{}) + al.RegisterTool(&asyncFollowUpTool{ + name: "async_followup", + followUpText: "background result", + completionSig: doneCh, + }) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + sub := al.SubscribeEvents(32) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run async tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "async launched" { + t.Fatalf("expected final response 'async launched', got %q", resp) + } + + select { + case <-doneCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for async tool completion") + } + + followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool { + return evt.Kind == EventKindFollowUpQueued + }) + payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload) + if !ok { + t.Fatalf("expected FollowUpQueuedPayload, got %T", followUpEvt.Payload) + } + if payload.SourceTool != "async_followup" { + t.Fatalf("expected source tool async_followup, got %q", payload.SourceTool) + } + if payload.Channel != "cli" { + t.Fatalf("expected channel cli, got %q", payload.Channel) + } + if payload.ChatID != "direct" { + t.Fatalf("expected chat id direct, got %q", payload.ChatID) + } + if payload.ContentLen != len("background result") { + t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen) + } + if followUpEvt.Meta.SessionKey != "session-1" { + t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey) + } + if followUpEvt.Meta.TurnID == "" { + t.Fatal("expected follow-up event to include turn id") + } +} + func collectEventStream(ch <-chan Event) []Event { var events []Event for { @@ -232,4 +600,80 @@ func collectEventStream(ch <-chan Event) []Event { } } +func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event { + t.Helper() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case evt, ok := <-ch: + if !ok { + t.Fatal("event stream closed before expected event arrived") + } + if match(evt) { + return evt + } + case <-timer.C: + t.Fatal("timed out waiting for expected event") + } + } +} + +func findEvent(events []Event, kind EventKind) (Event, bool) { + for _, evt := range events { + if evt.Kind == kind { + return evt, true + } + } + return Event{}, false +} + +type errString string + +func (e errString) Error() string { + return string(e) +} + +type asyncFollowUpTool struct { + name string + followUpText string + completionSig chan struct{} +} + +func (t *asyncFollowUpTool) Name() string { + return t.name +} + +func (t *asyncFollowUpTool) Description() string { + return "async follow-up tool for testing" +} + +func (t *asyncFollowUpTool) Parameters() map[string]any { + return map[string]any{ + "type": "object", + "properties": map[string]any{}, + } +} + +func (t *asyncFollowUpTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult { + return tools.AsyncResult("async follow-up scheduled") +} + +func (t *asyncFollowUpTool) ExecuteAsync( + ctx context.Context, + args map[string]any, + cb tools.AsyncCallback, +) *tools.ToolResult { + go func() { + cb(ctx, &tools.ToolResult{ForLLM: t.followUpText}) + if t.completionSig != nil { + close(t.completionSig) + } + }() + return tools.AsyncResult("async follow-up scheduled") +} + var _ tools.Tool = (*mockCustomTool)(nil) +var _ tools.AsyncExecutor = (*asyncFollowUpTool)(nil) diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 92aec7436..fae5033a3 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -15,12 +15,34 @@ const ( EventKindTurnEnd // EventKindLLMRequest is emitted before a provider chat request is made. EventKindLLMRequest + // EventKindLLMDelta is emitted when a streaming provider yields a partial delta. + EventKindLLMDelta // EventKindLLMResponse is emitted after a provider chat response is received. EventKindLLMResponse + // EventKindLLMRetry is emitted when an LLM request is retried. + EventKindLLMRetry + // EventKindContextCompress is emitted when session history is forcibly compressed. + EventKindContextCompress + // EventKindSessionSummarize is emitted when asynchronous summarization completes. + EventKindSessionSummarize // EventKindToolExecStart is emitted immediately before a tool executes. EventKindToolExecStart // EventKindToolExecEnd is emitted immediately after a tool finishes executing. EventKindToolExecEnd + // EventKindToolExecSkipped is emitted when a queued tool call is skipped. + EventKindToolExecSkipped + // EventKindSteeringInjected is emitted when queued steering is injected into context. + EventKindSteeringInjected + // EventKindFollowUpQueued is emitted when an async tool queues a follow-up system message. + EventKindFollowUpQueued + // EventKindInterruptReceived is emitted when a soft interrupt message is accepted. + EventKindInterruptReceived + // EventKindSubTurnSpawn is emitted when a sub-turn is spawned. + EventKindSubTurnSpawn + // EventKindSubTurnEnd is emitted when a sub-turn finishes. + EventKindSubTurnEnd + // EventKindSubTurnResultDelivered is emitted when a sub-turn result is delivered. + EventKindSubTurnResultDelivered // EventKindError is emitted when a turn encounters an execution error. EventKindError @@ -31,9 +53,20 @@ var eventKindNames = [...]string{ "turn_start", "turn_end", "llm_request", + "llm_delta", "llm_response", + "llm_retry", + "context_compress", + "session_summarize", "tool_exec_start", "tool_exec_end", + "tool_exec_skipped", + "steering_injected", + "follow_up_queued", + "interrupt_received", + "subturn_spawn", + "subturn_end", + "subturn_result_delivered", "error", } @@ -106,6 +139,46 @@ type LLMResponsePayload struct { HasReasoning bool } +// LLMDeltaPayload describes a streamed LLM delta. +type LLMDeltaPayload struct { + ContentDeltaLen int + ReasoningDeltaLen int +} + +// LLMRetryPayload describes a retry of an LLM request. +type LLMRetryPayload struct { + Attempt int + MaxRetries int + Reason string + Error string + Backoff time.Duration +} + +// ContextCompressReason identifies why emergency compression ran. +type ContextCompressReason string + +const ( + // ContextCompressReasonProactive indicates compression before the first LLM call. + ContextCompressReasonProactive ContextCompressReason = "proactive_budget" + // ContextCompressReasonRetry indicates compression during context-error retry handling. + ContextCompressReasonRetry ContextCompressReason = "llm_retry" +) + +// ContextCompressPayload describes a forced history compression. +type ContextCompressPayload struct { + Reason ContextCompressReason + DroppedMessages int + RemainingMessages int +} + +// SessionSummarizePayload describes a completed async session summarization. +type SessionSummarizePayload struct { + SummarizedMessages int + KeptMessages int + SummaryLen int + OmittedOversized bool +} + // ToolExecStartPayload describes a tool execution request. type ToolExecStartPayload struct { Tool string @@ -122,6 +195,52 @@ type ToolExecEndPayload struct { Async bool } +// ToolExecSkippedPayload describes a skipped tool call. +type ToolExecSkippedPayload struct { + Tool string + Reason string +} + +// SteeringInjectedPayload describes steering messages appended before the next LLM call. +type SteeringInjectedPayload struct { + Count int + TotalContentLen int +} + +// FollowUpQueuedPayload describes an async follow-up queued back into the inbound bus. +type FollowUpQueuedPayload struct { + SourceTool string + Channel string + ChatID string + ContentLen int +} + +// InterruptReceivedPayload describes a queued soft interrupt. +type InterruptReceivedPayload struct { + Role string + ContentLen int + QueueDepth int +} + +// SubTurnSpawnPayload describes the creation of a child turn. +type SubTurnSpawnPayload struct { + AgentID string + Label string +} + +// SubTurnEndPayload describes the completion of a child turn. +type SubTurnEndPayload struct { + AgentID string + Status string +} + +// SubTurnResultDeliveredPayload describes delivery of a sub-turn result. +type SubTurnResultDeliveredPayload struct { + TargetChannel string + TargetChatID string + ContentLen int +} + // ErrorPayload describes an execution error inside the agent loop. type ErrorPayload struct { Stage string diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ac97104b1..877dbbd94 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -499,10 +499,28 @@ func (al *AgentLoop) logEvent(evt Event) { fields["messages"] = payload.MessagesCount fields["tools"] = payload.ToolsCount fields["max_tokens"] = payload.MaxTokens + case LLMDeltaPayload: + fields["content_delta_len"] = payload.ContentDeltaLen + fields["reasoning_delta_len"] = payload.ReasoningDeltaLen case LLMResponsePayload: fields["content_len"] = payload.ContentLen fields["tool_calls"] = payload.ToolCalls fields["has_reasoning"] = payload.HasReasoning + case LLMRetryPayload: + fields["attempt"] = payload.Attempt + fields["max_retries"] = payload.MaxRetries + fields["reason"] = payload.Reason + fields["error"] = payload.Error + fields["backoff_ms"] = payload.Backoff.Milliseconds() + case ContextCompressPayload: + fields["reason"] = payload.Reason + fields["dropped_messages"] = payload.DroppedMessages + fields["remaining_messages"] = payload.RemainingMessages + case SessionSummarizePayload: + fields["summarized_messages"] = payload.SummarizedMessages + fields["kept_messages"] = payload.KeptMessages + fields["summary_len"] = payload.SummaryLen + fields["omitted_oversized"] = payload.OmittedOversized case ToolExecStartPayload: fields["tool"] = payload.Tool fields["args_count"] = len(payload.Arguments) @@ -513,6 +531,31 @@ func (al *AgentLoop) logEvent(evt Event) { fields["for_user_len"] = payload.ForUserLen fields["is_error"] = payload.IsError fields["async"] = payload.Async + case ToolExecSkippedPayload: + fields["tool"] = payload.Tool + fields["reason"] = payload.Reason + case SteeringInjectedPayload: + fields["count"] = payload.Count + fields["total_content_len"] = payload.TotalContentLen + case FollowUpQueuedPayload: + fields["source_tool"] = payload.SourceTool + fields["channel"] = payload.Channel + fields["chat_id"] = payload.ChatID + fields["content_len"] = payload.ContentLen + case InterruptReceivedPayload: + fields["role"] = payload.Role + fields["content_len"] = payload.ContentLen + fields["queue_depth"] = payload.QueueDepth + case SubTurnSpawnPayload: + fields["child_agent_id"] = payload.AgentID + fields["label"] = payload.Label + case SubTurnEndPayload: + fields["child_agent_id"] = payload.AgentID + fields["status"] = payload.Status + case SubTurnResultDeliveredPayload: + fields["target_channel"] = payload.TargetChannel + fields["target_chat_id"] = payload.TargetChatID + fields["content_len"] = payload.ContentLen case ErrorPayload: fields["stage"] = payload.Stage fields["error"] = payload.Message @@ -1105,7 +1148,17 @@ func (al *AgentLoop) runAgentLoop( if isOverContextBudget(agent.ContextWindow, messages, toolDefs, agent.MaxTokens) { logger.WarnCF("agent", "Proactive compression: context budget exceeded before LLM call", map[string]any{"session_key": opts.SessionKey}) - al.forceCompression(agent, opts.SessionKey) + if compression, ok := al.forceCompression(agent, opts.SessionKey); ok { + al.emitEvent( + EventKindContextCompress, + turnScope.meta(0, "runAgentLoop", "turn.context.compress"), + ContextCompressPayload{ + Reason: ContextCompressReasonProactive, + DroppedMessages: compression.DroppedMessages, + RemainingMessages: compression.RemainingMessages, + }, + ) + } newHistory := agent.Sessions.GetHistory(opts.SessionKey) newSummary := agent.Sessions.GetSummary(opts.SessionKey) messages = agent.ContextBuilder.BuildMessages( @@ -1142,7 +1195,7 @@ func (al *AgentLoop) runAgentLoop( // 6. Optional: summarization if opts.EnableSummary { - al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID) + al.maybeSummarize(agent, opts.SessionKey, turnScope) } // 7. Optional: send response via bus @@ -1256,9 +1309,11 @@ func (al *AgentLoop) runLLMIteration( // Inject pending steering messages into the conversation context // before the next LLM call. if len(pendingMessages) > 0 { + totalContentLen := 0 for _, pm := range pendingMessages { messages = append(messages, pm) agent.Sessions.AddMessage(opts.SessionKey, pm.Role, pm.Content) + totalContentLen += len(pm.Content) logger.InfoCF("agent", "Injected steering message into context", map[string]any{ "agent_id": agent.ID, @@ -1266,6 +1321,14 @@ func (al *AgentLoop) runLLMIteration( "content_len": len(pm.Content), }) } + al.emitEvent( + EventKindSteeringInjected, + turnScope.meta(iteration, "runLLMIteration", "turn.steering.injected"), + SteeringInjectedPayload{ + Count: len(pendingMessages), + TotalContentLen: totalContentLen, + }, + ) pendingMessages = nil } @@ -1334,6 +1397,8 @@ func (al *AgentLoop) runLLMIteration( callLLM := func() (*providers.LLMResponse, error) { al.activeRequests.Add(1) defer al.activeRequests.Done() + // TODO(eventbus): emit EventKindLLMDelta when providers expose + // streaming callbacks instead of only the final Chat response. if len(activeCandidates) > 1 && al.fallback != nil { fbResult, fbErr := al.fallback.Execute( @@ -1389,6 +1454,17 @@ func (al *AgentLoop) runLLMIteration( if isTimeoutError && retry < maxRetries { backoff := time.Duration(retry+1) * 5 * time.Second + al.emitEvent( + EventKindLLMRetry, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"), + LLMRetryPayload{ + Attempt: retry + 1, + MaxRetries: maxRetries, + Reason: "timeout", + Error: err.Error(), + Backoff: backoff, + }, + ) logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{ "error": err.Error(), "retry": retry, @@ -1399,6 +1475,16 @@ func (al *AgentLoop) runLLMIteration( } if isContextError && retry < maxRetries { + al.emitEvent( + EventKindLLMRetry, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"), + LLMRetryPayload{ + Attempt: retry + 1, + MaxRetries: maxRetries, + Reason: "context_limit", + Error: err.Error(), + }, + ) logger.WarnCF( "agent", "Context window error detected, attempting compression", @@ -1416,7 +1502,17 @@ func (al *AgentLoop) runLLMIteration( }) } - al.forceCompression(agent, opts.SessionKey) + if compression, ok := al.forceCompression(agent, opts.SessionKey); ok { + al.emitEvent( + EventKindContextCompress, + turnScope.meta(iteration, "runLLMIteration", "turn.context.compress"), + ContextCompressPayload{ + Reason: ContextCompressReasonRetry, + DroppedMessages: compression.DroppedMessages, + RemainingMessages: compression.RemainingMessages, + }, + ) + } newHistory := agent.Sessions.GetHistory(opts.SessionKey) newSummary := agent.Sessions.GetSummary(opts.SessionKey) messages = agent.ContextBuilder.BuildMessages( @@ -1587,6 +1683,16 @@ func (al *AgentLoop) runLLMIteration( "content_len": len(content), "channel": opts.Channel, }) + al.emitEvent( + EventKindFollowUpQueued, + turnScope.meta(iteration, "runLLMIteration", "turn.follow_up.queued"), + FollowUpQueuedPayload{ + SourceTool: tc.Name, + Channel: opts.Channel, + ChatID: opts.ChatID, + ContentLen: len(content), + }, + ) pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) defer pubCancel() @@ -1686,6 +1792,14 @@ func (al *AgentLoop) runLLMIteration( // Mark remaining tool calls as skipped for j := i + 1; j < len(normalizedToolCalls); j++ { skippedTC := normalizedToolCalls[j] + al.emitEvent( + EventKindToolExecSkipped, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.skipped"), + ToolExecSkippedPayload{ + Tool: skippedTC.Name, + Reason: "queued user steering message", + }, + ) toolResultMsg := providers.Message{ Role: "tool", Content: "Skipped due to queued user message.", @@ -1760,7 +1874,7 @@ func (al *AgentLoop) selectCandidates( } // maybeSummarize triggers summarization if the session history exceeds thresholds. -func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) { +func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey string, turnScope turnEventScope) { newHistory := agent.Sessions.GetHistory(sessionKey) tokenEstimate := al.estimateTokens(newHistory) threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100 @@ -1771,12 +1885,17 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c go func() { defer al.summarizing.Delete(summarizeKey) logger.Debug("Memory threshold reached. Optimizing conversation history...") - al.summarizeSession(agent, sessionKey) + al.summarizeSession(agent, sessionKey, turnScope) }() } } } +type compressionResult struct { + DroppedMessages int + RemainingMessages int +} + // forceCompression aggressively reduces context when the limit is hit. // It drops the oldest ~50% of Turns (a Turn is a complete user→LLM→response // cycle, as defined in #1316), so tool-call sequences are never split. @@ -1789,10 +1908,10 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c // prompt is built dynamically by BuildMessages and is NOT stored here. // The compression note is recorded in the session summary so that // BuildMessages can include it in the next system prompt. -func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) { +func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) (compressionResult, bool) { history := agent.Sessions.GetHistory(sessionKey) if len(history) <= 2 { - return + return compressionResult{}, false } // Split at a Turn boundary so no tool-call sequence is torn apart. @@ -1846,6 +1965,11 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) { "dropped_msgs": droppedCount, "new_count": len(keptHistory), }) + + return compressionResult{ + DroppedMessages: droppedCount, + RemainingMessages: len(keptHistory), + }, true } // GetStartupInfo returns information about loaded tools and skills for logging. @@ -1937,7 +2061,7 @@ func formatToolsForLog(toolDefs []providers.ToolDefinition) string { } // summarizeSession summarizes the conversation history for a session. -func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) { +func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string, turnScope turnEventScope) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -2022,6 +2146,16 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) { agent.Sessions.SetSummary(sessionKey, finalSummary) agent.Sessions.TruncateHistory(sessionKey, keepCount) agent.Sessions.Save(sessionKey) + al.emitEvent( + EventKindSessionSummarize, + turnScope.meta(0, "summarizeSession", "turn.session.summarize"), + SessionSummarizePayload{ + SummarizedMessages: len(validMessages), + KeptMessages: keepCount, + SummaryLen: len(finalSummary), + OmittedOversized: omitted, + }, + ) } } diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 8c7c79c16..90d1cc091 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -122,6 +122,25 @@ func (al *AgentLoop) Steer(msg providers.Message) error { "content_len": len(msg.Content), "queue_len": al.steering.len(), }) + agentID := "" + if registry := al.GetRegistry(); registry != nil { + if agent := registry.GetDefaultAgent(); agent != nil { + agentID = agent.ID + } + } + al.emitEvent( + EventKindInterruptReceived, + EventMeta{ + AgentID: agentID, + Source: "Steer", + TracePath: "turn.interrupt.received", + }, + InterruptReceivedPayload{ + Role: msg.Role, + ContentLen: len(msg.Content), + QueueDepth: al.steering.len(), + }, + ) return nil } diff --git a/pkg/tools/spawn.go b/pkg/tools/spawn.go index be40ffda2..34ccc80e4 100644 --- a/pkg/tools/spawn.go +++ b/pkg/tools/spawn.go @@ -96,6 +96,9 @@ func (t *SpawnTool) execute(ctx context.Context, args map[string]any, cb AsyncCa } // Pass callback to manager for async completion notification + // TODO(eventbus): when background subagents are migrated onto the + // agent package's runTurn/sub-turn tree, emit SubTurnSpawn here and move + // lifecycle events out of the legacy SubagentManager path. result, err := t.manager.Spawn(ctx, task, label, agentID, channel, chatID, cb) if err != nil { return ErrorResult(fmt.Sprintf("failed to spawn subagent: %v", err)) diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index e51cbaafa..9915c5900 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -111,6 +111,9 @@ func (sm *SubagentManager) Spawn( func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) { task.Status = "running" task.Created = time.Now().UnixMilli() + // TODO(eventbus): once subagents are modeled as child turns inside + // pkg/agent, emit SubTurnEnd and SubTurnResultDelivered from the parent + // AgentLoop instead of this legacy manager. // Build system prompt for subagent systemPrompt := `You are a subagent. Complete the given task independently and report the result.