feat(agent): expand event bus coverage

This commit is contained in:
Hoshina
2026-03-20 15:29:52 +08:00
parent 50cc7100ce
commit 57cde73b36
6 changed files with 730 additions and 8 deletions
+444
View File
@@ -217,6 +217,374 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
}
}
func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-eventbus-steering-*")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}
tool1ExecCh := make(chan struct{})
tool1 := &slowTool{name: "tool_one", duration: 50 * time.Millisecond, execCh: tool1ExecCh}
tool2 := &slowTool{name: "tool_two", duration: 50 * time.Millisecond}
provider := &toolCallProvider{
toolCalls: []providers.ToolCall{
{
ID: "call_1",
Type: "function",
Name: "tool_one",
Function: &providers.FunctionCall{
Name: "tool_one",
Arguments: "{}",
},
Arguments: map[string]any{},
},
{
ID: "call_2",
Type: "function",
Name: "tool_two",
Function: &providers.FunctionCall{
Name: "tool_two",
Arguments: "{}",
},
Arguments: map[string]any{},
},
},
finalResp: "steered response",
}
msgBus := bus.NewMessageBus()
al := NewAgentLoop(cfg, msgBus, provider)
al.RegisterTool(tool1)
al.RegisterTool(tool2)
sub := al.SubscribeEvents(32)
defer al.UnsubscribeEvents(sub.ID)
resultCh := make(chan string, 1)
go func() {
resp, _ := al.ProcessDirectWithChannel(context.Background(), "do something", "test-session", "test", "chat1")
resultCh <- resp
}()
select {
case <-tool1ExecCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for tool_one to start")
}
if err := al.Steer(providers.Message{Role: "user", Content: "change course"}); err != nil {
t.Fatalf("Steer failed: %v", err)
}
select {
case resp := <-resultCh:
if resp != "steered response" {
t.Fatalf("expected steered response, got %q", resp)
}
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for steered response")
}
events := collectEventStream(sub.C)
steeringEvt, ok := findEvent(events, EventKindSteeringInjected)
if !ok {
t.Fatal("expected steering injected event")
}
steeringPayload, ok := steeringEvt.Payload.(SteeringInjectedPayload)
if !ok {
t.Fatalf("expected SteeringInjectedPayload, got %T", steeringEvt.Payload)
}
if steeringPayload.Count != 1 {
t.Fatalf("expected 1 steering message, got %d", steeringPayload.Count)
}
skippedEvt, ok := findEvent(events, EventKindToolExecSkipped)
if !ok {
t.Fatal("expected skipped tool event")
}
skippedPayload, ok := skippedEvt.Payload.(ToolExecSkippedPayload)
if !ok {
t.Fatalf("expected ToolExecSkippedPayload, got %T", skippedEvt.Payload)
}
if skippedPayload.Tool != "tool_two" {
t.Fatalf("expected skipped tool_two, got %q", skippedPayload.Tool)
}
interruptEvt, ok := findEvent(events, EventKindInterruptReceived)
if !ok {
t.Fatal("expected interrupt received event")
}
interruptPayload, ok := interruptEvt.Payload.(InterruptReceivedPayload)
if !ok {
t.Fatalf("expected InterruptReceivedPayload, got %T", interruptEvt.Payload)
}
if interruptPayload.Role != "user" {
t.Fatalf("expected interrupt role user, got %q", interruptPayload.Role)
}
if interruptPayload.ContentLen != len("change course") {
t.Fatalf("expected interrupt content len %d, got %d", len("change course"), interruptPayload.ContentLen)
}
}
func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-eventbus-compress-*")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}
contextErr := errString("InvalidParameter: Total tokens of image and text exceed max message tokens")
provider := &failFirstMockProvider{
failures: 1,
failError: contextErr,
successResp: "Recovered from context error",
}
msgBus := bus.NewMessageBus()
al := NewAgentLoop(cfg, msgBus, provider)
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent == nil {
t.Fatal("expected default agent")
}
defaultAgent.Sessions.SetHistory("session-1", []providers.Message{
{Role: "user", Content: "Old message 1"},
{Role: "assistant", Content: "Old response 1"},
{Role: "user", Content: "Old message 2"},
{Role: "assistant", Content: "Old response 2"},
{Role: "user", Content: "Trigger message"},
})
sub := al.SubscribeEvents(16)
defer al.UnsubscribeEvents(sub.ID)
resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
SessionKey: "session-1",
Channel: "cli",
ChatID: "direct",
UserMessage: "Trigger message",
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
})
if err != nil {
t.Fatalf("runAgentLoop failed: %v", err)
}
if resp != "Recovered from context error" {
t.Fatalf("expected retry success, got %q", resp)
}
events := collectEventStream(sub.C)
retryEvt, ok := findEvent(events, EventKindLLMRetry)
if !ok {
t.Fatal("expected llm retry event")
}
retryPayload, ok := retryEvt.Payload.(LLMRetryPayload)
if !ok {
t.Fatalf("expected LLMRetryPayload, got %T", retryEvt.Payload)
}
if retryPayload.Reason != "context_limit" {
t.Fatalf("expected context_limit retry reason, got %q", retryPayload.Reason)
}
if retryPayload.Attempt != 1 {
t.Fatalf("expected retry attempt 1, got %d", retryPayload.Attempt)
}
compressEvt, ok := findEvent(events, EventKindContextCompress)
if !ok {
t.Fatal("expected context compress event")
}
payload, ok := compressEvt.Payload.(ContextCompressPayload)
if !ok {
t.Fatalf("expected ContextCompressPayload, got %T", compressEvt.Payload)
}
if payload.Reason != ContextCompressReasonRetry {
t.Fatalf("expected retry compress reason, got %q", payload.Reason)
}
if payload.DroppedMessages == 0 {
t.Fatal("expected dropped messages to be recorded")
}
}
func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-eventbus-summary-*")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
ContextWindow: 8000,
SummarizeMessageThreshold: 2,
SummarizeTokenPercent: 75,
},
},
}
msgBus := bus.NewMessageBus()
al := NewAgentLoop(cfg, msgBus, &simpleMockProvider{response: "summary text"})
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent == nil {
t.Fatal("expected default agent")
}
defaultAgent.Sessions.SetHistory("session-1", []providers.Message{
{Role: "user", Content: "Question one"},
{Role: "assistant", Content: "Answer one"},
{Role: "user", Content: "Question two"},
{Role: "assistant", Content: "Answer two"},
{Role: "user", Content: "Question three"},
{Role: "assistant", Content: "Answer three"},
})
sub := al.SubscribeEvents(16)
defer al.UnsubscribeEvents(sub.ID)
turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1")
al.summarizeSession(defaultAgent, "session-1", turnScope)
events := collectEventStream(sub.C)
summaryEvt, ok := findEvent(events, EventKindSessionSummarize)
if !ok {
t.Fatal("expected session summarize event")
}
payload, ok := summaryEvt.Payload.(SessionSummarizePayload)
if !ok {
t.Fatalf("expected SessionSummarizePayload, got %T", summaryEvt.Payload)
}
if payload.SummaryLen == 0 {
t.Fatal("expected non-empty summary length")
}
}
func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-eventbus-followup-*")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}
provider := &toolCallProvider{
toolCalls: []providers.ToolCall{
{
ID: "call_async_1",
Type: "function",
Name: "async_followup",
Function: &providers.FunctionCall{
Name: "async_followup",
Arguments: "{}",
},
Arguments: map[string]any{},
},
},
finalResp: "async launched",
}
msgBus := bus.NewMessageBus()
al := NewAgentLoop(cfg, msgBus, provider)
doneCh := make(chan struct{})
al.RegisterTool(&asyncFollowUpTool{
name: "async_followup",
followUpText: "background result",
completionSig: doneCh,
})
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent == nil {
t.Fatal("expected default agent")
}
sub := al.SubscribeEvents(32)
defer al.UnsubscribeEvents(sub.ID)
resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
SessionKey: "session-1",
Channel: "cli",
ChatID: "direct",
UserMessage: "run async tool",
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
})
if err != nil {
t.Fatalf("runAgentLoop failed: %v", err)
}
if resp != "async launched" {
t.Fatalf("expected final response 'async launched', got %q", resp)
}
select {
case <-doneCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for async tool completion")
}
followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool {
return evt.Kind == EventKindFollowUpQueued
})
payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload)
if !ok {
t.Fatalf("expected FollowUpQueuedPayload, got %T", followUpEvt.Payload)
}
if payload.SourceTool != "async_followup" {
t.Fatalf("expected source tool async_followup, got %q", payload.SourceTool)
}
if payload.Channel != "cli" {
t.Fatalf("expected channel cli, got %q", payload.Channel)
}
if payload.ChatID != "direct" {
t.Fatalf("expected chat id direct, got %q", payload.ChatID)
}
if payload.ContentLen != len("background result") {
t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen)
}
if followUpEvt.Meta.SessionKey != "session-1" {
t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey)
}
if followUpEvt.Meta.TurnID == "" {
t.Fatal("expected follow-up event to include turn id")
}
}
func collectEventStream(ch <-chan Event) []Event {
var events []Event
for {
@@ -232,4 +600,80 @@ func collectEventStream(ch <-chan Event) []Event {
}
}
func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event {
t.Helper()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case evt, ok := <-ch:
if !ok {
t.Fatal("event stream closed before expected event arrived")
}
if match(evt) {
return evt
}
case <-timer.C:
t.Fatal("timed out waiting for expected event")
}
}
}
func findEvent(events []Event, kind EventKind) (Event, bool) {
for _, evt := range events {
if evt.Kind == kind {
return evt, true
}
}
return Event{}, false
}
type errString string
func (e errString) Error() string {
return string(e)
}
type asyncFollowUpTool struct {
name string
followUpText string
completionSig chan struct{}
}
func (t *asyncFollowUpTool) Name() string {
return t.name
}
func (t *asyncFollowUpTool) Description() string {
return "async follow-up tool for testing"
}
func (t *asyncFollowUpTool) Parameters() map[string]any {
return map[string]any{
"type": "object",
"properties": map[string]any{},
}
}
func (t *asyncFollowUpTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult {
return tools.AsyncResult("async follow-up scheduled")
}
func (t *asyncFollowUpTool) ExecuteAsync(
ctx context.Context,
args map[string]any,
cb tools.AsyncCallback,
) *tools.ToolResult {
go func() {
cb(ctx, &tools.ToolResult{ForLLM: t.followUpText})
if t.completionSig != nil {
close(t.completionSig)
}
}()
return tools.AsyncResult("async follow-up scheduled")
}
var _ tools.Tool = (*mockCustomTool)(nil)
var _ tools.AsyncExecutor = (*asyncFollowUpTool)(nil)
+119
View File
@@ -15,12 +15,34 @@ const (
EventKindTurnEnd
// EventKindLLMRequest is emitted before a provider chat request is made.
EventKindLLMRequest
// EventKindLLMDelta is emitted when a streaming provider yields a partial delta.
EventKindLLMDelta
// EventKindLLMResponse is emitted after a provider chat response is received.
EventKindLLMResponse
// EventKindLLMRetry is emitted when an LLM request is retried.
EventKindLLMRetry
// EventKindContextCompress is emitted when session history is forcibly compressed.
EventKindContextCompress
// EventKindSessionSummarize is emitted when asynchronous summarization completes.
EventKindSessionSummarize
// EventKindToolExecStart is emitted immediately before a tool executes.
EventKindToolExecStart
// EventKindToolExecEnd is emitted immediately after a tool finishes executing.
EventKindToolExecEnd
// EventKindToolExecSkipped is emitted when a queued tool call is skipped.
EventKindToolExecSkipped
// EventKindSteeringInjected is emitted when queued steering is injected into context.
EventKindSteeringInjected
// EventKindFollowUpQueued is emitted when an async tool queues a follow-up system message.
EventKindFollowUpQueued
// EventKindInterruptReceived is emitted when a soft interrupt message is accepted.
EventKindInterruptReceived
// EventKindSubTurnSpawn is emitted when a sub-turn is spawned.
EventKindSubTurnSpawn
// EventKindSubTurnEnd is emitted when a sub-turn finishes.
EventKindSubTurnEnd
// EventKindSubTurnResultDelivered is emitted when a sub-turn result is delivered.
EventKindSubTurnResultDelivered
// EventKindError is emitted when a turn encounters an execution error.
EventKindError
@@ -31,9 +53,20 @@ var eventKindNames = [...]string{
"turn_start",
"turn_end",
"llm_request",
"llm_delta",
"llm_response",
"llm_retry",
"context_compress",
"session_summarize",
"tool_exec_start",
"tool_exec_end",
"tool_exec_skipped",
"steering_injected",
"follow_up_queued",
"interrupt_received",
"subturn_spawn",
"subturn_end",
"subturn_result_delivered",
"error",
}
@@ -106,6 +139,46 @@ type LLMResponsePayload struct {
HasReasoning bool
}
// LLMDeltaPayload describes a streamed LLM delta.
type LLMDeltaPayload struct {
ContentDeltaLen int
ReasoningDeltaLen int
}
// LLMRetryPayload describes a retry of an LLM request.
type LLMRetryPayload struct {
Attempt int
MaxRetries int
Reason string
Error string
Backoff time.Duration
}
// ContextCompressReason identifies why emergency compression ran.
type ContextCompressReason string
const (
// ContextCompressReasonProactive indicates compression before the first LLM call.
ContextCompressReasonProactive ContextCompressReason = "proactive_budget"
// ContextCompressReasonRetry indicates compression during context-error retry handling.
ContextCompressReasonRetry ContextCompressReason = "llm_retry"
)
// ContextCompressPayload describes a forced history compression.
type ContextCompressPayload struct {
Reason ContextCompressReason
DroppedMessages int
RemainingMessages int
}
// SessionSummarizePayload describes a completed async session summarization.
type SessionSummarizePayload struct {
SummarizedMessages int
KeptMessages int
SummaryLen int
OmittedOversized bool
}
// ToolExecStartPayload describes a tool execution request.
type ToolExecStartPayload struct {
Tool string
@@ -122,6 +195,52 @@ type ToolExecEndPayload struct {
Async bool
}
// ToolExecSkippedPayload describes a skipped tool call.
type ToolExecSkippedPayload struct {
Tool string
Reason string
}
// SteeringInjectedPayload describes steering messages appended before the next LLM call.
type SteeringInjectedPayload struct {
Count int
TotalContentLen int
}
// FollowUpQueuedPayload describes an async follow-up queued back into the inbound bus.
type FollowUpQueuedPayload struct {
SourceTool string
Channel string
ChatID string
ContentLen int
}
// InterruptReceivedPayload describes a queued soft interrupt.
type InterruptReceivedPayload struct {
Role string
ContentLen int
QueueDepth int
}
// SubTurnSpawnPayload describes the creation of a child turn.
type SubTurnSpawnPayload struct {
AgentID string
Label string
}
// SubTurnEndPayload describes the completion of a child turn.
type SubTurnEndPayload struct {
AgentID string
Status string
}
// SubTurnResultDeliveredPayload describes delivery of a sub-turn result.
type SubTurnResultDeliveredPayload struct {
TargetChannel string
TargetChatID string
ContentLen int
}
// ErrorPayload describes an execution error inside the agent loop.
type ErrorPayload struct {
Stage string
+142 -8
View File
@@ -499,10 +499,28 @@ func (al *AgentLoop) logEvent(evt Event) {
fields["messages"] = payload.MessagesCount
fields["tools"] = payload.ToolsCount
fields["max_tokens"] = payload.MaxTokens
case LLMDeltaPayload:
fields["content_delta_len"] = payload.ContentDeltaLen
fields["reasoning_delta_len"] = payload.ReasoningDeltaLen
case LLMResponsePayload:
fields["content_len"] = payload.ContentLen
fields["tool_calls"] = payload.ToolCalls
fields["has_reasoning"] = payload.HasReasoning
case LLMRetryPayload:
fields["attempt"] = payload.Attempt
fields["max_retries"] = payload.MaxRetries
fields["reason"] = payload.Reason
fields["error"] = payload.Error
fields["backoff_ms"] = payload.Backoff.Milliseconds()
case ContextCompressPayload:
fields["reason"] = payload.Reason
fields["dropped_messages"] = payload.DroppedMessages
fields["remaining_messages"] = payload.RemainingMessages
case SessionSummarizePayload:
fields["summarized_messages"] = payload.SummarizedMessages
fields["kept_messages"] = payload.KeptMessages
fields["summary_len"] = payload.SummaryLen
fields["omitted_oversized"] = payload.OmittedOversized
case ToolExecStartPayload:
fields["tool"] = payload.Tool
fields["args_count"] = len(payload.Arguments)
@@ -513,6 +531,31 @@ func (al *AgentLoop) logEvent(evt Event) {
fields["for_user_len"] = payload.ForUserLen
fields["is_error"] = payload.IsError
fields["async"] = payload.Async
case ToolExecSkippedPayload:
fields["tool"] = payload.Tool
fields["reason"] = payload.Reason
case SteeringInjectedPayload:
fields["count"] = payload.Count
fields["total_content_len"] = payload.TotalContentLen
case FollowUpQueuedPayload:
fields["source_tool"] = payload.SourceTool
fields["channel"] = payload.Channel
fields["chat_id"] = payload.ChatID
fields["content_len"] = payload.ContentLen
case InterruptReceivedPayload:
fields["role"] = payload.Role
fields["content_len"] = payload.ContentLen
fields["queue_depth"] = payload.QueueDepth
case SubTurnSpawnPayload:
fields["child_agent_id"] = payload.AgentID
fields["label"] = payload.Label
case SubTurnEndPayload:
fields["child_agent_id"] = payload.AgentID
fields["status"] = payload.Status
case SubTurnResultDeliveredPayload:
fields["target_channel"] = payload.TargetChannel
fields["target_chat_id"] = payload.TargetChatID
fields["content_len"] = payload.ContentLen
case ErrorPayload:
fields["stage"] = payload.Stage
fields["error"] = payload.Message
@@ -1105,7 +1148,17 @@ func (al *AgentLoop) runAgentLoop(
if isOverContextBudget(agent.ContextWindow, messages, toolDefs, agent.MaxTokens) {
logger.WarnCF("agent", "Proactive compression: context budget exceeded before LLM call",
map[string]any{"session_key": opts.SessionKey})
al.forceCompression(agent, opts.SessionKey)
if compression, ok := al.forceCompression(agent, opts.SessionKey); ok {
al.emitEvent(
EventKindContextCompress,
turnScope.meta(0, "runAgentLoop", "turn.context.compress"),
ContextCompressPayload{
Reason: ContextCompressReasonProactive,
DroppedMessages: compression.DroppedMessages,
RemainingMessages: compression.RemainingMessages,
},
)
}
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
messages = agent.ContextBuilder.BuildMessages(
@@ -1142,7 +1195,7 @@ func (al *AgentLoop) runAgentLoop(
// 6. Optional: summarization
if opts.EnableSummary {
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
al.maybeSummarize(agent, opts.SessionKey, turnScope)
}
// 7. Optional: send response via bus
@@ -1256,9 +1309,11 @@ func (al *AgentLoop) runLLMIteration(
// Inject pending steering messages into the conversation context
// before the next LLM call.
if len(pendingMessages) > 0 {
totalContentLen := 0
for _, pm := range pendingMessages {
messages = append(messages, pm)
agent.Sessions.AddMessage(opts.SessionKey, pm.Role, pm.Content)
totalContentLen += len(pm.Content)
logger.InfoCF("agent", "Injected steering message into context",
map[string]any{
"agent_id": agent.ID,
@@ -1266,6 +1321,14 @@ func (al *AgentLoop) runLLMIteration(
"content_len": len(pm.Content),
})
}
al.emitEvent(
EventKindSteeringInjected,
turnScope.meta(iteration, "runLLMIteration", "turn.steering.injected"),
SteeringInjectedPayload{
Count: len(pendingMessages),
TotalContentLen: totalContentLen,
},
)
pendingMessages = nil
}
@@ -1334,6 +1397,8 @@ func (al *AgentLoop) runLLMIteration(
callLLM := func() (*providers.LLMResponse, error) {
al.activeRequests.Add(1)
defer al.activeRequests.Done()
// TODO(eventbus): emit EventKindLLMDelta when providers expose
// streaming callbacks instead of only the final Chat response.
if len(activeCandidates) > 1 && al.fallback != nil {
fbResult, fbErr := al.fallback.Execute(
@@ -1389,6 +1454,17 @@ func (al *AgentLoop) runLLMIteration(
if isTimeoutError && retry < maxRetries {
backoff := time.Duration(retry+1) * 5 * time.Second
al.emitEvent(
EventKindLLMRetry,
turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"),
LLMRetryPayload{
Attempt: retry + 1,
MaxRetries: maxRetries,
Reason: "timeout",
Error: err.Error(),
Backoff: backoff,
},
)
logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{
"error": err.Error(),
"retry": retry,
@@ -1399,6 +1475,16 @@ func (al *AgentLoop) runLLMIteration(
}
if isContextError && retry < maxRetries {
al.emitEvent(
EventKindLLMRetry,
turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"),
LLMRetryPayload{
Attempt: retry + 1,
MaxRetries: maxRetries,
Reason: "context_limit",
Error: err.Error(),
},
)
logger.WarnCF(
"agent",
"Context window error detected, attempting compression",
@@ -1416,7 +1502,17 @@ func (al *AgentLoop) runLLMIteration(
})
}
al.forceCompression(agent, opts.SessionKey)
if compression, ok := al.forceCompression(agent, opts.SessionKey); ok {
al.emitEvent(
EventKindContextCompress,
turnScope.meta(iteration, "runLLMIteration", "turn.context.compress"),
ContextCompressPayload{
Reason: ContextCompressReasonRetry,
DroppedMessages: compression.DroppedMessages,
RemainingMessages: compression.RemainingMessages,
},
)
}
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
messages = agent.ContextBuilder.BuildMessages(
@@ -1587,6 +1683,16 @@ func (al *AgentLoop) runLLMIteration(
"content_len": len(content),
"channel": opts.Channel,
})
al.emitEvent(
EventKindFollowUpQueued,
turnScope.meta(iteration, "runLLMIteration", "turn.follow_up.queued"),
FollowUpQueuedPayload{
SourceTool: tc.Name,
Channel: opts.Channel,
ChatID: opts.ChatID,
ContentLen: len(content),
},
)
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
@@ -1686,6 +1792,14 @@ func (al *AgentLoop) runLLMIteration(
// Mark remaining tool calls as skipped
for j := i + 1; j < len(normalizedToolCalls); j++ {
skippedTC := normalizedToolCalls[j]
al.emitEvent(
EventKindToolExecSkipped,
turnScope.meta(iteration, "runLLMIteration", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: skippedTC.Name,
Reason: "queued user steering message",
},
)
toolResultMsg := providers.Message{
Role: "tool",
Content: "Skipped due to queued user message.",
@@ -1760,7 +1874,7 @@ func (al *AgentLoop) selectCandidates(
}
// maybeSummarize triggers summarization if the session history exceeds thresholds.
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) {
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
newHistory := agent.Sessions.GetHistory(sessionKey)
tokenEstimate := al.estimateTokens(newHistory)
threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100
@@ -1771,12 +1885,17 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c
go func() {
defer al.summarizing.Delete(summarizeKey)
logger.Debug("Memory threshold reached. Optimizing conversation history...")
al.summarizeSession(agent, sessionKey)
al.summarizeSession(agent, sessionKey, turnScope)
}()
}
}
}
type compressionResult struct {
DroppedMessages int
RemainingMessages int
}
// forceCompression aggressively reduces context when the limit is hit.
// It drops the oldest ~50% of Turns (a Turn is a complete user→LLM→response
// cycle, as defined in #1316), so tool-call sequences are never split.
@@ -1789,10 +1908,10 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c
// prompt is built dynamically by BuildMessages and is NOT stored here.
// The compression note is recorded in the session summary so that
// BuildMessages can include it in the next system prompt.
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) (compressionResult, bool) {
history := agent.Sessions.GetHistory(sessionKey)
if len(history) <= 2 {
return
return compressionResult{}, false
}
// Split at a Turn boundary so no tool-call sequence is torn apart.
@@ -1846,6 +1965,11 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
"dropped_msgs": droppedCount,
"new_count": len(keptHistory),
})
return compressionResult{
DroppedMessages: droppedCount,
RemainingMessages: len(keptHistory),
}, true
}
// GetStartupInfo returns information about loaded tools and skills for logging.
@@ -1937,7 +2061,7 @@ func formatToolsForLog(toolDefs []providers.ToolDefinition) string {
}
// summarizeSession summarizes the conversation history for a session.
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
@@ -2022,6 +2146,16 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
agent.Sessions.SetSummary(sessionKey, finalSummary)
agent.Sessions.TruncateHistory(sessionKey, keepCount)
agent.Sessions.Save(sessionKey)
al.emitEvent(
EventKindSessionSummarize,
turnScope.meta(0, "summarizeSession", "turn.session.summarize"),
SessionSummarizePayload{
SummarizedMessages: len(validMessages),
KeptMessages: keepCount,
SummaryLen: len(finalSummary),
OmittedOversized: omitted,
},
)
}
}
+19
View File
@@ -122,6 +122,25 @@ func (al *AgentLoop) Steer(msg providers.Message) error {
"content_len": len(msg.Content),
"queue_len": al.steering.len(),
})
agentID := ""
if registry := al.GetRegistry(); registry != nil {
if agent := registry.GetDefaultAgent(); agent != nil {
agentID = agent.ID
}
}
al.emitEvent(
EventKindInterruptReceived,
EventMeta{
AgentID: agentID,
Source: "Steer",
TracePath: "turn.interrupt.received",
},
InterruptReceivedPayload{
Role: msg.Role,
ContentLen: len(msg.Content),
QueueDepth: al.steering.len(),
},
)
return nil
}
+3
View File
@@ -96,6 +96,9 @@ func (t *SpawnTool) execute(ctx context.Context, args map[string]any, cb AsyncCa
}
// Pass callback to manager for async completion notification
// TODO(eventbus): when background subagents are migrated onto the
// agent package's runTurn/sub-turn tree, emit SubTurnSpawn here and move
// lifecycle events out of the legacy SubagentManager path.
result, err := t.manager.Spawn(ctx, task, label, agentID, channel, chatID, cb)
if err != nil {
return ErrorResult(fmt.Sprintf("failed to spawn subagent: %v", err))
+3
View File
@@ -111,6 +111,9 @@ func (sm *SubagentManager) Spawn(
func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) {
task.Status = "running"
task.Created = time.Now().UnixMilli()
// TODO(eventbus): once subagents are modeled as child turns inside
// pkg/agent, emit SubTurnEnd and SubTurnResultDelivered from the parent
// AgentLoop instead of this legacy manager.
// Build system prompt for subagent
systemPrompt := `You are a subagent. Complete the given task independently and report the result.