mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
Merge pull request #1822 from alexhoshina/feat/agent-eventbus
Feat/agent eventbus
This commit is contained in:
@@ -0,0 +1,121 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const defaultEventSubscriberBuffer = 16
|
||||
|
||||
// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe.
|
||||
type EventSubscription struct {
|
||||
ID uint64
|
||||
C <-chan Event
|
||||
}
|
||||
|
||||
type eventSubscriber struct {
|
||||
ch chan Event
|
||||
}
|
||||
|
||||
// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events.
|
||||
type EventBus struct {
|
||||
mu sync.RWMutex
|
||||
subs map[uint64]eventSubscriber
|
||||
nextID uint64
|
||||
closed bool
|
||||
dropped [eventKindCount]atomic.Int64
|
||||
}
|
||||
|
||||
// NewEventBus creates a new in-process event broadcaster.
|
||||
func NewEventBus() *EventBus {
|
||||
return &EventBus{
|
||||
subs: make(map[uint64]eventSubscriber),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers a new subscriber with the requested channel buffer size.
|
||||
// A non-positive buffer uses the default size.
|
||||
func (b *EventBus) Subscribe(buffer int) EventSubscription {
|
||||
if buffer <= 0 {
|
||||
buffer = defaultEventSubscriberBuffer
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.closed {
|
||||
ch := make(chan Event)
|
||||
close(ch)
|
||||
return EventSubscription{C: ch}
|
||||
}
|
||||
|
||||
b.nextID++
|
||||
id := b.nextID
|
||||
ch := make(chan Event, buffer)
|
||||
b.subs[id] = eventSubscriber{ch: ch}
|
||||
return EventSubscription{ID: id, C: ch}
|
||||
}
|
||||
|
||||
// Unsubscribe removes a subscriber and closes its channel.
|
||||
func (b *EventBus) Unsubscribe(id uint64) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
sub, ok := b.subs[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(b.subs, id)
|
||||
close(sub.ch)
|
||||
}
|
||||
|
||||
// Emit broadcasts an event to all current subscribers without blocking.
|
||||
// When a subscriber channel is full, the event is dropped for that subscriber.
|
||||
func (b *EventBus) Emit(evt Event) {
|
||||
if evt.Time.IsZero() {
|
||||
evt.Time = time.Now()
|
||||
}
|
||||
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
if b.closed {
|
||||
return
|
||||
}
|
||||
|
||||
for _, sub := range b.subs {
|
||||
select {
|
||||
case sub.ch <- evt:
|
||||
default:
|
||||
if evt.Kind < eventKindCount {
|
||||
b.dropped[evt.Kind].Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dropped returns the number of dropped events for a given kind.
|
||||
func (b *EventBus) Dropped(kind EventKind) int64 {
|
||||
if kind >= eventKindCount {
|
||||
return 0
|
||||
}
|
||||
return b.dropped[kind].Load()
|
||||
}
|
||||
|
||||
// Close closes all subscriber channels and stops future broadcasts.
|
||||
func (b *EventBus) Close() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.closed {
|
||||
return
|
||||
}
|
||||
|
||||
b.closed = true
|
||||
for id, sub := range b.subs {
|
||||
close(sub.ch)
|
||||
delete(b.subs, id)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,681 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
"github.com/sipeed/picoclaw/pkg/tools"
|
||||
)
|
||||
|
||||
func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) {
|
||||
eventBus := NewEventBus()
|
||||
sub := eventBus.Subscribe(1)
|
||||
|
||||
eventBus.Emit(Event{
|
||||
Kind: EventKindTurnStart,
|
||||
Meta: EventMeta{TurnID: "turn-1"},
|
||||
})
|
||||
|
||||
select {
|
||||
case evt := <-sub.C:
|
||||
if evt.Kind != EventKindTurnStart {
|
||||
t.Fatalf("expected %v, got %v", EventKindTurnStart, evt.Kind)
|
||||
}
|
||||
if evt.Meta.TurnID != "turn-1" {
|
||||
t.Fatalf("expected turn id turn-1, got %q", evt.Meta.TurnID)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for event")
|
||||
}
|
||||
|
||||
eventBus.Unsubscribe(sub.ID)
|
||||
if _, ok := <-sub.C; ok {
|
||||
t.Fatal("expected subscriber channel to be closed after unsubscribe")
|
||||
}
|
||||
|
||||
eventBus.Close()
|
||||
closedSub := eventBus.Subscribe(1)
|
||||
if _, ok := <-closedSub.C; ok {
|
||||
t.Fatal("expected closed bus to return a closed subscriber channel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) {
|
||||
eventBus := NewEventBus()
|
||||
sub := eventBus.Subscribe(1)
|
||||
defer eventBus.Unsubscribe(sub.ID)
|
||||
|
||||
start := time.Now()
|
||||
for i := 0; i < 1000; i++ {
|
||||
eventBus.Emit(Event{Kind: EventKindLLMRequest})
|
||||
}
|
||||
|
||||
if elapsed := time.Since(start); elapsed > 100*time.Millisecond {
|
||||
t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed)
|
||||
}
|
||||
|
||||
if got := eventBus.Dropped(EventKindLLMRequest); got != 999 {
|
||||
t.Fatalf("expected 999 dropped events, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
type scriptedToolProvider struct {
|
||||
calls int
|
||||
}
|
||||
|
||||
func (m *scriptedToolProvider) Chat(
|
||||
ctx context.Context,
|
||||
messages []providers.Message,
|
||||
toolDefs []providers.ToolDefinition,
|
||||
model string,
|
||||
opts map[string]any,
|
||||
) (*providers.LLMResponse, error) {
|
||||
m.calls++
|
||||
if m.calls == 1 {
|
||||
return &providers.LLMResponse{
|
||||
ToolCalls: []providers.ToolCall{
|
||||
{
|
||||
ID: "call-1",
|
||||
Name: "mock_custom",
|
||||
Arguments: map[string]any{"task": "ping"},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &providers.LLMResponse{
|
||||
Content: "done",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *scriptedToolProvider) GetDefaultModel() string {
|
||||
return "scripted-tool-model"
|
||||
}
|
||||
|
||||
func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-eventbus-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
msgBus := bus.NewMessageBus()
|
||||
provider := &scriptedToolProvider{}
|
||||
al := NewAgentLoop(cfg, msgBus, provider)
|
||||
al.RegisterTool(&mockCustomTool{})
|
||||
defaultAgent := al.registry.GetDefaultAgent()
|
||||
if defaultAgent == nil {
|
||||
t.Fatal("expected default agent")
|
||||
}
|
||||
|
||||
sub := al.SubscribeEvents(16)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
|
||||
SessionKey: "session-1",
|
||||
Channel: "cli",
|
||||
ChatID: "direct",
|
||||
UserMessage: "run tool",
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
}
|
||||
if response != "done" {
|
||||
t.Fatalf("expected final response 'done', got %q", response)
|
||||
}
|
||||
|
||||
events := collectEventStream(sub.C)
|
||||
if len(events) != 8 {
|
||||
t.Fatalf("expected 8 events, got %d", len(events))
|
||||
}
|
||||
|
||||
kinds := make([]EventKind, 0, len(events))
|
||||
for _, evt := range events {
|
||||
kinds = append(kinds, evt.Kind)
|
||||
}
|
||||
|
||||
expectedKinds := []EventKind{
|
||||
EventKindTurnStart,
|
||||
EventKindLLMRequest,
|
||||
EventKindLLMResponse,
|
||||
EventKindToolExecStart,
|
||||
EventKindToolExecEnd,
|
||||
EventKindLLMRequest,
|
||||
EventKindLLMResponse,
|
||||
EventKindTurnEnd,
|
||||
}
|
||||
if !slices.Equal(kinds, expectedKinds) {
|
||||
t.Fatalf("unexpected event sequence: got %v want %v", kinds, expectedKinds)
|
||||
}
|
||||
|
||||
turnID := events[0].Meta.TurnID
|
||||
for i, evt := range events {
|
||||
if evt.Meta.TurnID != turnID {
|
||||
t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Meta.TurnID, turnID)
|
||||
}
|
||||
if evt.Meta.SessionKey != "session-1" {
|
||||
t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
startPayload, ok := events[0].Payload.(TurnStartPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected TurnStartPayload, got %T", events[0].Payload)
|
||||
}
|
||||
if startPayload.UserMessage != "run tool" {
|
||||
t.Fatalf("expected user message 'run tool', got %q", startPayload.UserMessage)
|
||||
}
|
||||
|
||||
toolStartPayload, ok := events[3].Payload.(ToolExecStartPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected ToolExecStartPayload, got %T", events[3].Payload)
|
||||
}
|
||||
if toolStartPayload.Tool != "mock_custom" {
|
||||
t.Fatalf("expected tool name mock_custom, got %q", toolStartPayload.Tool)
|
||||
}
|
||||
|
||||
toolEndPayload, ok := events[4].Payload.(ToolExecEndPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected ToolExecEndPayload, got %T", events[4].Payload)
|
||||
}
|
||||
if toolEndPayload.Tool != "mock_custom" {
|
||||
t.Fatalf("expected tool end payload for mock_custom, got %q", toolEndPayload.Tool)
|
||||
}
|
||||
if toolEndPayload.IsError {
|
||||
t.Fatal("expected mock_custom tool to succeed")
|
||||
}
|
||||
|
||||
turnEndPayload, ok := events[len(events)-1].Payload.(TurnEndPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected TurnEndPayload, got %T", events[len(events)-1].Payload)
|
||||
}
|
||||
if turnEndPayload.Status != TurnEndStatusCompleted {
|
||||
t.Fatalf("expected completed turn, got %q", turnEndPayload.Status)
|
||||
}
|
||||
if turnEndPayload.Iterations != 2 {
|
||||
t.Fatalf("expected 2 iterations, got %d", turnEndPayload.Iterations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentLoop_EmitsSteeringAndSkippedToolEvents(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-eventbus-steering-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tool1ExecCh := make(chan struct{})
|
||||
tool1 := &slowTool{name: "tool_one", duration: 50 * time.Millisecond, execCh: tool1ExecCh}
|
||||
tool2 := &slowTool{name: "tool_two", duration: 50 * time.Millisecond}
|
||||
|
||||
provider := &toolCallProvider{
|
||||
toolCalls: []providers.ToolCall{
|
||||
{
|
||||
ID: "call_1",
|
||||
Type: "function",
|
||||
Name: "tool_one",
|
||||
Function: &providers.FunctionCall{
|
||||
Name: "tool_one",
|
||||
Arguments: "{}",
|
||||
},
|
||||
Arguments: map[string]any{},
|
||||
},
|
||||
{
|
||||
ID: "call_2",
|
||||
Type: "function",
|
||||
Name: "tool_two",
|
||||
Function: &providers.FunctionCall{
|
||||
Name: "tool_two",
|
||||
Arguments: "{}",
|
||||
},
|
||||
Arguments: map[string]any{},
|
||||
},
|
||||
},
|
||||
finalResp: "steered response",
|
||||
}
|
||||
|
||||
msgBus := bus.NewMessageBus()
|
||||
al := NewAgentLoop(cfg, msgBus, provider)
|
||||
al.RegisterTool(tool1)
|
||||
al.RegisterTool(tool2)
|
||||
|
||||
sub := al.SubscribeEvents(32)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
resultCh := make(chan string, 1)
|
||||
go func() {
|
||||
resp, _ := al.ProcessDirectWithChannel(context.Background(), "do something", "test-session", "test", "chat1")
|
||||
resultCh <- resp
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-tool1ExecCh:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timeout waiting for tool_one to start")
|
||||
}
|
||||
|
||||
if err := al.Steer(providers.Message{Role: "user", Content: "change course"}); err != nil {
|
||||
t.Fatalf("Steer failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case resp := <-resultCh:
|
||||
if resp != "steered response" {
|
||||
t.Fatalf("expected steered response, got %q", resp)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for steered response")
|
||||
}
|
||||
|
||||
events := collectEventStream(sub.C)
|
||||
steeringEvt, ok := findEvent(events, EventKindSteeringInjected)
|
||||
if !ok {
|
||||
t.Fatal("expected steering injected event")
|
||||
}
|
||||
steeringPayload, ok := steeringEvt.Payload.(SteeringInjectedPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected SteeringInjectedPayload, got %T", steeringEvt.Payload)
|
||||
}
|
||||
if steeringPayload.Count != 1 {
|
||||
t.Fatalf("expected 1 steering message, got %d", steeringPayload.Count)
|
||||
}
|
||||
|
||||
skippedEvt, ok := findEvent(events, EventKindToolExecSkipped)
|
||||
if !ok {
|
||||
t.Fatal("expected skipped tool event")
|
||||
}
|
||||
skippedPayload, ok := skippedEvt.Payload.(ToolExecSkippedPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected ToolExecSkippedPayload, got %T", skippedEvt.Payload)
|
||||
}
|
||||
if skippedPayload.Tool != "tool_two" {
|
||||
t.Fatalf("expected skipped tool_two, got %q", skippedPayload.Tool)
|
||||
}
|
||||
|
||||
interruptEvt, ok := findEvent(events, EventKindInterruptReceived)
|
||||
if !ok {
|
||||
t.Fatal("expected interrupt received event")
|
||||
}
|
||||
interruptPayload, ok := interruptEvt.Payload.(InterruptReceivedPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected InterruptReceivedPayload, got %T", interruptEvt.Payload)
|
||||
}
|
||||
if interruptPayload.Role != "user" {
|
||||
t.Fatalf("expected interrupt role user, got %q", interruptPayload.Role)
|
||||
}
|
||||
if interruptPayload.ContentLen != len("change course") {
|
||||
t.Fatalf("expected interrupt content len %d, got %d", len("change course"), interruptPayload.ContentLen)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentLoop_EmitsContextCompressEventOnRetry(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-eventbus-compress-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
contextErr := stringError("InvalidParameter: Total tokens of image and text exceed max message tokens")
|
||||
provider := &failFirstMockProvider{
|
||||
failures: 1,
|
||||
failError: contextErr,
|
||||
successResp: "Recovered from context error",
|
||||
}
|
||||
msgBus := bus.NewMessageBus()
|
||||
al := NewAgentLoop(cfg, msgBus, provider)
|
||||
defaultAgent := al.registry.GetDefaultAgent()
|
||||
if defaultAgent == nil {
|
||||
t.Fatal("expected default agent")
|
||||
}
|
||||
|
||||
defaultAgent.Sessions.SetHistory("session-1", []providers.Message{
|
||||
{Role: "user", Content: "Old message 1"},
|
||||
{Role: "assistant", Content: "Old response 1"},
|
||||
{Role: "user", Content: "Old message 2"},
|
||||
{Role: "assistant", Content: "Old response 2"},
|
||||
{Role: "user", Content: "Trigger message"},
|
||||
})
|
||||
|
||||
sub := al.SubscribeEvents(16)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
|
||||
SessionKey: "session-1",
|
||||
Channel: "cli",
|
||||
ChatID: "direct",
|
||||
UserMessage: "Trigger message",
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
}
|
||||
if resp != "Recovered from context error" {
|
||||
t.Fatalf("expected retry success, got %q", resp)
|
||||
}
|
||||
|
||||
events := collectEventStream(sub.C)
|
||||
retryEvt, ok := findEvent(events, EventKindLLMRetry)
|
||||
if !ok {
|
||||
t.Fatal("expected llm retry event")
|
||||
}
|
||||
retryPayload, ok := retryEvt.Payload.(LLMRetryPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected LLMRetryPayload, got %T", retryEvt.Payload)
|
||||
}
|
||||
if retryPayload.Reason != "context_limit" {
|
||||
t.Fatalf("expected context_limit retry reason, got %q", retryPayload.Reason)
|
||||
}
|
||||
if retryPayload.Attempt != 1 {
|
||||
t.Fatalf("expected retry attempt 1, got %d", retryPayload.Attempt)
|
||||
}
|
||||
|
||||
compressEvt, ok := findEvent(events, EventKindContextCompress)
|
||||
if !ok {
|
||||
t.Fatal("expected context compress event")
|
||||
}
|
||||
payload, ok := compressEvt.Payload.(ContextCompressPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected ContextCompressPayload, got %T", compressEvt.Payload)
|
||||
}
|
||||
if payload.Reason != ContextCompressReasonRetry {
|
||||
t.Fatalf("expected retry compress reason, got %q", payload.Reason)
|
||||
}
|
||||
if payload.DroppedMessages == 0 {
|
||||
t.Fatal("expected dropped messages to be recorded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-eventbus-summary-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
ContextWindow: 8000,
|
||||
SummarizeMessageThreshold: 2,
|
||||
SummarizeTokenPercent: 75,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
msgBus := bus.NewMessageBus()
|
||||
al := NewAgentLoop(cfg, msgBus, &simpleMockProvider{response: "summary text"})
|
||||
defaultAgent := al.registry.GetDefaultAgent()
|
||||
if defaultAgent == nil {
|
||||
t.Fatal("expected default agent")
|
||||
}
|
||||
|
||||
defaultAgent.Sessions.SetHistory("session-1", []providers.Message{
|
||||
{Role: "user", Content: "Question one"},
|
||||
{Role: "assistant", Content: "Answer one"},
|
||||
{Role: "user", Content: "Question two"},
|
||||
{Role: "assistant", Content: "Answer two"},
|
||||
{Role: "user", Content: "Question three"},
|
||||
{Role: "assistant", Content: "Answer three"},
|
||||
})
|
||||
|
||||
sub := al.SubscribeEvents(16)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1")
|
||||
al.summarizeSession(defaultAgent, "session-1", turnScope)
|
||||
|
||||
events := collectEventStream(sub.C)
|
||||
summaryEvt, ok := findEvent(events, EventKindSessionSummarize)
|
||||
if !ok {
|
||||
t.Fatal("expected session summarize event")
|
||||
}
|
||||
payload, ok := summaryEvt.Payload.(SessionSummarizePayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected SessionSummarizePayload, got %T", summaryEvt.Payload)
|
||||
}
|
||||
if payload.SummaryLen == 0 {
|
||||
t.Fatal("expected non-empty summary length")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-eventbus-followup-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
provider := &toolCallProvider{
|
||||
toolCalls: []providers.ToolCall{
|
||||
{
|
||||
ID: "call_async_1",
|
||||
Type: "function",
|
||||
Name: "async_followup",
|
||||
Function: &providers.FunctionCall{
|
||||
Name: "async_followup",
|
||||
Arguments: "{}",
|
||||
},
|
||||
Arguments: map[string]any{},
|
||||
},
|
||||
},
|
||||
finalResp: "async launched",
|
||||
}
|
||||
|
||||
msgBus := bus.NewMessageBus()
|
||||
al := NewAgentLoop(cfg, msgBus, provider)
|
||||
doneCh := make(chan struct{})
|
||||
al.RegisterTool(&asyncFollowUpTool{
|
||||
name: "async_followup",
|
||||
followUpText: "background result",
|
||||
completionSig: doneCh,
|
||||
})
|
||||
defaultAgent := al.registry.GetDefaultAgent()
|
||||
if defaultAgent == nil {
|
||||
t.Fatal("expected default agent")
|
||||
}
|
||||
|
||||
sub := al.SubscribeEvents(32)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
resp, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
|
||||
SessionKey: "session-1",
|
||||
Channel: "cli",
|
||||
ChatID: "direct",
|
||||
UserMessage: "run async tool",
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
}
|
||||
if resp != "async launched" {
|
||||
t.Fatalf("expected final response 'async launched', got %q", resp)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timeout waiting for async tool completion")
|
||||
}
|
||||
|
||||
followUpEvt := waitForEvent(t, sub.C, 2*time.Second, func(evt Event) bool {
|
||||
return evt.Kind == EventKindFollowUpQueued
|
||||
})
|
||||
payload, ok := followUpEvt.Payload.(FollowUpQueuedPayload)
|
||||
if !ok {
|
||||
t.Fatalf("expected FollowUpQueuedPayload, got %T", followUpEvt.Payload)
|
||||
}
|
||||
if payload.SourceTool != "async_followup" {
|
||||
t.Fatalf("expected source tool async_followup, got %q", payload.SourceTool)
|
||||
}
|
||||
if payload.Channel != "cli" {
|
||||
t.Fatalf("expected channel cli, got %q", payload.Channel)
|
||||
}
|
||||
if payload.ChatID != "direct" {
|
||||
t.Fatalf("expected chat id direct, got %q", payload.ChatID)
|
||||
}
|
||||
if payload.ContentLen != len("background result") {
|
||||
t.Fatalf("expected content len %d, got %d", len("background result"), payload.ContentLen)
|
||||
}
|
||||
if followUpEvt.Meta.SessionKey != "session-1" {
|
||||
t.Fatalf("expected session key session-1, got %q", followUpEvt.Meta.SessionKey)
|
||||
}
|
||||
if followUpEvt.Meta.TurnID == "" {
|
||||
t.Fatal("expected follow-up event to include turn id")
|
||||
}
|
||||
}
|
||||
|
||||
func collectEventStream(ch <-chan Event) []Event {
|
||||
var events []Event
|
||||
for {
|
||||
select {
|
||||
case evt, ok := <-ch:
|
||||
if !ok {
|
||||
return events
|
||||
}
|
||||
events = append(events, evt)
|
||||
default:
|
||||
return events
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event {
|
||||
t.Helper()
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, ok := <-ch:
|
||||
if !ok {
|
||||
t.Fatal("event stream closed before expected event arrived")
|
||||
}
|
||||
if match(evt) {
|
||||
return evt
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Fatal("timed out waiting for expected event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func findEvent(events []Event, kind EventKind) (Event, bool) {
|
||||
for _, evt := range events {
|
||||
if evt.Kind == kind {
|
||||
return evt, true
|
||||
}
|
||||
}
|
||||
return Event{}, false
|
||||
}
|
||||
|
||||
type stringError string
|
||||
|
||||
func (e stringError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
type asyncFollowUpTool struct {
|
||||
name string
|
||||
followUpText string
|
||||
completionSig chan struct{}
|
||||
}
|
||||
|
||||
func (t *asyncFollowUpTool) Name() string {
|
||||
return t.name
|
||||
}
|
||||
|
||||
func (t *asyncFollowUpTool) Description() string {
|
||||
return "async follow-up tool for testing"
|
||||
}
|
||||
|
||||
func (t *asyncFollowUpTool) Parameters() map[string]any {
|
||||
return map[string]any{
|
||||
"type": "object",
|
||||
"properties": map[string]any{},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *asyncFollowUpTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult {
|
||||
return tools.AsyncResult("async follow-up scheduled")
|
||||
}
|
||||
|
||||
func (t *asyncFollowUpTool) ExecuteAsync(
|
||||
ctx context.Context,
|
||||
args map[string]any,
|
||||
cb tools.AsyncCallback,
|
||||
) *tools.ToolResult {
|
||||
go func() {
|
||||
cb(ctx, &tools.ToolResult{ForLLM: t.followUpText})
|
||||
if t.completionSig != nil {
|
||||
close(t.completionSig)
|
||||
}
|
||||
}()
|
||||
return tools.AsyncResult("async follow-up scheduled")
|
||||
}
|
||||
|
||||
var (
|
||||
_ tools.Tool = (*mockCustomTool)(nil)
|
||||
_ tools.AsyncExecutor = (*asyncFollowUpTool)(nil)
|
||||
)
|
||||
@@ -0,0 +1,248 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EventKind identifies a structured agent-loop event.
|
||||
type EventKind uint8
|
||||
|
||||
const (
|
||||
// EventKindTurnStart is emitted when a turn begins processing.
|
||||
EventKindTurnStart EventKind = iota
|
||||
// EventKindTurnEnd is emitted when a turn finishes, successfully or with an error.
|
||||
EventKindTurnEnd
|
||||
// EventKindLLMRequest is emitted before a provider chat request is made.
|
||||
EventKindLLMRequest
|
||||
// EventKindLLMDelta is emitted when a streaming provider yields a partial delta.
|
||||
EventKindLLMDelta
|
||||
// EventKindLLMResponse is emitted after a provider chat response is received.
|
||||
EventKindLLMResponse
|
||||
// EventKindLLMRetry is emitted when an LLM request is retried.
|
||||
EventKindLLMRetry
|
||||
// EventKindContextCompress is emitted when session history is forcibly compressed.
|
||||
EventKindContextCompress
|
||||
// EventKindSessionSummarize is emitted when asynchronous summarization completes.
|
||||
EventKindSessionSummarize
|
||||
// EventKindToolExecStart is emitted immediately before a tool executes.
|
||||
EventKindToolExecStart
|
||||
// EventKindToolExecEnd is emitted immediately after a tool finishes executing.
|
||||
EventKindToolExecEnd
|
||||
// EventKindToolExecSkipped is emitted when a queued tool call is skipped.
|
||||
EventKindToolExecSkipped
|
||||
// EventKindSteeringInjected is emitted when queued steering is injected into context.
|
||||
EventKindSteeringInjected
|
||||
// EventKindFollowUpQueued is emitted when an async tool queues a follow-up system message.
|
||||
EventKindFollowUpQueued
|
||||
// EventKindInterruptReceived is emitted when a soft interrupt message is accepted.
|
||||
EventKindInterruptReceived
|
||||
// EventKindSubTurnSpawn is emitted when a sub-turn is spawned.
|
||||
EventKindSubTurnSpawn
|
||||
// EventKindSubTurnEnd is emitted when a sub-turn finishes.
|
||||
EventKindSubTurnEnd
|
||||
// EventKindSubTurnResultDelivered is emitted when a sub-turn result is delivered.
|
||||
EventKindSubTurnResultDelivered
|
||||
// EventKindError is emitted when a turn encounters an execution error.
|
||||
EventKindError
|
||||
|
||||
eventKindCount
|
||||
)
|
||||
|
||||
var eventKindNames = [...]string{
|
||||
"turn_start",
|
||||
"turn_end",
|
||||
"llm_request",
|
||||
"llm_delta",
|
||||
"llm_response",
|
||||
"llm_retry",
|
||||
"context_compress",
|
||||
"session_summarize",
|
||||
"tool_exec_start",
|
||||
"tool_exec_end",
|
||||
"tool_exec_skipped",
|
||||
"steering_injected",
|
||||
"follow_up_queued",
|
||||
"interrupt_received",
|
||||
"subturn_spawn",
|
||||
"subturn_end",
|
||||
"subturn_result_delivered",
|
||||
"error",
|
||||
}
|
||||
|
||||
// String returns the stable string form of an EventKind.
|
||||
func (k EventKind) String() string {
|
||||
if k >= eventKindCount {
|
||||
return fmt.Sprintf("event_kind(%d)", k)
|
||||
}
|
||||
return eventKindNames[k]
|
||||
}
|
||||
|
||||
// Event is the structured envelope broadcast by the agent EventBus.
|
||||
type Event struct {
|
||||
Kind EventKind
|
||||
Time time.Time
|
||||
Meta EventMeta
|
||||
Payload any
|
||||
}
|
||||
|
||||
// EventMeta contains correlation fields shared by all agent-loop events.
|
||||
type EventMeta struct {
|
||||
AgentID string
|
||||
TurnID string
|
||||
ParentTurnID string
|
||||
SessionKey string
|
||||
Iteration int
|
||||
TracePath string
|
||||
Source string
|
||||
}
|
||||
|
||||
// TurnEndStatus describes the terminal state of a turn.
|
||||
type TurnEndStatus string
|
||||
|
||||
const (
|
||||
// TurnEndStatusCompleted indicates the turn finished normally.
|
||||
TurnEndStatusCompleted TurnEndStatus = "completed"
|
||||
// TurnEndStatusError indicates the turn ended because of an error.
|
||||
TurnEndStatusError TurnEndStatus = "error"
|
||||
)
|
||||
|
||||
// TurnStartPayload describes the start of a turn.
|
||||
type TurnStartPayload struct {
|
||||
Channel string
|
||||
ChatID string
|
||||
UserMessage string
|
||||
MediaCount int
|
||||
}
|
||||
|
||||
// TurnEndPayload describes the completion of a turn.
|
||||
type TurnEndPayload struct {
|
||||
Status TurnEndStatus
|
||||
Iterations int
|
||||
Duration time.Duration
|
||||
FinalContentLen int
|
||||
}
|
||||
|
||||
// LLMRequestPayload describes an outbound LLM request.
|
||||
type LLMRequestPayload struct {
|
||||
Model string
|
||||
MessagesCount int
|
||||
ToolsCount int
|
||||
MaxTokens int
|
||||
Temperature float64
|
||||
}
|
||||
|
||||
// LLMResponsePayload describes an inbound LLM response.
|
||||
type LLMResponsePayload struct {
|
||||
ContentLen int
|
||||
ToolCalls int
|
||||
HasReasoning bool
|
||||
}
|
||||
|
||||
// LLMDeltaPayload describes a streamed LLM delta.
|
||||
type LLMDeltaPayload struct {
|
||||
ContentDeltaLen int
|
||||
ReasoningDeltaLen int
|
||||
}
|
||||
|
||||
// LLMRetryPayload describes a retry of an LLM request.
|
||||
type LLMRetryPayload struct {
|
||||
Attempt int
|
||||
MaxRetries int
|
||||
Reason string
|
||||
Error string
|
||||
Backoff time.Duration
|
||||
}
|
||||
|
||||
// ContextCompressReason identifies why emergency compression ran.
|
||||
type ContextCompressReason string
|
||||
|
||||
const (
|
||||
// ContextCompressReasonProactive indicates compression before the first LLM call.
|
||||
ContextCompressReasonProactive ContextCompressReason = "proactive_budget"
|
||||
// ContextCompressReasonRetry indicates compression during context-error retry handling.
|
||||
ContextCompressReasonRetry ContextCompressReason = "llm_retry"
|
||||
)
|
||||
|
||||
// ContextCompressPayload describes a forced history compression.
|
||||
type ContextCompressPayload struct {
|
||||
Reason ContextCompressReason
|
||||
DroppedMessages int
|
||||
RemainingMessages int
|
||||
}
|
||||
|
||||
// SessionSummarizePayload describes a completed async session summarization.
|
||||
type SessionSummarizePayload struct {
|
||||
SummarizedMessages int
|
||||
KeptMessages int
|
||||
SummaryLen int
|
||||
OmittedOversized bool
|
||||
}
|
||||
|
||||
// ToolExecStartPayload describes a tool execution request.
|
||||
type ToolExecStartPayload struct {
|
||||
Tool string
|
||||
Arguments map[string]any
|
||||
}
|
||||
|
||||
// ToolExecEndPayload describes the outcome of a tool execution.
|
||||
type ToolExecEndPayload struct {
|
||||
Tool string
|
||||
Duration time.Duration
|
||||
ForLLMLen int
|
||||
ForUserLen int
|
||||
IsError bool
|
||||
Async bool
|
||||
}
|
||||
|
||||
// ToolExecSkippedPayload describes a skipped tool call.
|
||||
type ToolExecSkippedPayload struct {
|
||||
Tool string
|
||||
Reason string
|
||||
}
|
||||
|
||||
// SteeringInjectedPayload describes steering messages appended before the next LLM call.
|
||||
type SteeringInjectedPayload struct {
|
||||
Count int
|
||||
TotalContentLen int
|
||||
}
|
||||
|
||||
// FollowUpQueuedPayload describes an async follow-up queued back into the inbound bus.
|
||||
type FollowUpQueuedPayload struct {
|
||||
SourceTool string
|
||||
Channel string
|
||||
ChatID string
|
||||
ContentLen int
|
||||
}
|
||||
|
||||
// InterruptReceivedPayload describes a queued soft interrupt.
|
||||
type InterruptReceivedPayload struct {
|
||||
Role string
|
||||
ContentLen int
|
||||
QueueDepth int
|
||||
}
|
||||
|
||||
// SubTurnSpawnPayload describes the creation of a child turn.
|
||||
type SubTurnSpawnPayload struct {
|
||||
AgentID string
|
||||
Label string
|
||||
}
|
||||
|
||||
// SubTurnEndPayload describes the completion of a child turn.
|
||||
type SubTurnEndPayload struct {
|
||||
AgentID string
|
||||
Status string
|
||||
}
|
||||
|
||||
// SubTurnResultDeliveredPayload describes delivery of a sub-turn result.
|
||||
type SubTurnResultDeliveredPayload struct {
|
||||
TargetChannel string
|
||||
TargetChatID string
|
||||
ContentLen int
|
||||
}
|
||||
|
||||
// ErrorPayload describes an execution error inside the agent loop.
|
||||
type ErrorPayload struct {
|
||||
Stage string
|
||||
Message string
|
||||
}
|
||||
+365
-9
@@ -39,6 +39,7 @@ type AgentLoop struct {
|
||||
cfg *config.Config
|
||||
registry *AgentRegistry
|
||||
state *state.Manager
|
||||
eventBus *EventBus
|
||||
running atomic.Bool
|
||||
summarizing sync.Map
|
||||
fallback *providers.FallbackChain
|
||||
@@ -49,6 +50,7 @@ type AgentLoop struct {
|
||||
mcp mcpRuntime
|
||||
steering *steeringQueue
|
||||
mu sync.RWMutex
|
||||
turnSeq atomic.Uint64
|
||||
// Track active requests for safe provider cleanup
|
||||
activeRequests sync.WaitGroup
|
||||
}
|
||||
@@ -103,6 +105,7 @@ func NewAgentLoop(
|
||||
cfg: cfg,
|
||||
registry: registry,
|
||||
state: stateManager,
|
||||
eventBus: NewEventBus(),
|
||||
summarizing: sync.Map{},
|
||||
fallback: fallbackChain,
|
||||
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
|
||||
@@ -380,6 +383,185 @@ func (al *AgentLoop) Close() {
|
||||
}
|
||||
|
||||
al.GetRegistry().Close()
|
||||
if al.eventBus != nil {
|
||||
al.eventBus.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeEvents registers a subscriber for agent-loop events.
|
||||
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
|
||||
if al == nil || al.eventBus == nil {
|
||||
ch := make(chan Event)
|
||||
close(ch)
|
||||
return EventSubscription{C: ch}
|
||||
}
|
||||
return al.eventBus.Subscribe(buffer)
|
||||
}
|
||||
|
||||
// UnsubscribeEvents removes a previously registered event subscriber.
|
||||
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
|
||||
if al == nil || al.eventBus == nil {
|
||||
return
|
||||
}
|
||||
al.eventBus.Unsubscribe(id)
|
||||
}
|
||||
|
||||
// EventDrops returns the number of dropped events for the given kind.
|
||||
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
|
||||
if al == nil || al.eventBus == nil {
|
||||
return 0
|
||||
}
|
||||
return al.eventBus.Dropped(kind)
|
||||
}
|
||||
|
||||
type turnEventScope struct {
|
||||
agentID string
|
||||
sessionKey string
|
||||
turnID string
|
||||
}
|
||||
|
||||
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope {
|
||||
seq := al.turnSeq.Add(1)
|
||||
return turnEventScope{
|
||||
agentID: agentID,
|
||||
sessionKey: sessionKey,
|
||||
turnID: fmt.Sprintf("%s-turn-%d", agentID, seq),
|
||||
}
|
||||
}
|
||||
|
||||
func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta {
|
||||
return EventMeta{
|
||||
AgentID: ts.agentID,
|
||||
TurnID: ts.turnID,
|
||||
SessionKey: ts.sessionKey,
|
||||
Iteration: iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
}
|
||||
}
|
||||
|
||||
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
|
||||
evt := Event{
|
||||
Kind: kind,
|
||||
Meta: meta,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
al.logEvent(evt)
|
||||
|
||||
if al == nil || al.eventBus == nil {
|
||||
return
|
||||
}
|
||||
al.eventBus.Emit(evt)
|
||||
}
|
||||
|
||||
func cloneEventArguments(args map[string]any) map[string]any {
|
||||
if len(args) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
cloned := make(map[string]any, len(args))
|
||||
for k, v := range args {
|
||||
cloned[k] = v
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (al *AgentLoop) logEvent(evt Event) {
|
||||
fields := map[string]any{
|
||||
"event_kind": evt.Kind.String(),
|
||||
"agent_id": evt.Meta.AgentID,
|
||||
"turn_id": evt.Meta.TurnID,
|
||||
"session_key": evt.Meta.SessionKey,
|
||||
"iteration": evt.Meta.Iteration,
|
||||
}
|
||||
|
||||
if evt.Meta.TracePath != "" {
|
||||
fields["trace"] = evt.Meta.TracePath
|
||||
}
|
||||
if evt.Meta.Source != "" {
|
||||
fields["source"] = evt.Meta.Source
|
||||
}
|
||||
|
||||
switch payload := evt.Payload.(type) {
|
||||
case TurnStartPayload:
|
||||
fields["channel"] = payload.Channel
|
||||
fields["chat_id"] = payload.ChatID
|
||||
fields["user_len"] = len(payload.UserMessage)
|
||||
fields["media_count"] = payload.MediaCount
|
||||
case TurnEndPayload:
|
||||
fields["status"] = payload.Status
|
||||
fields["iterations_total"] = payload.Iterations
|
||||
fields["duration_ms"] = payload.Duration.Milliseconds()
|
||||
fields["final_len"] = payload.FinalContentLen
|
||||
case LLMRequestPayload:
|
||||
fields["model"] = payload.Model
|
||||
fields["messages"] = payload.MessagesCount
|
||||
fields["tools"] = payload.ToolsCount
|
||||
fields["max_tokens"] = payload.MaxTokens
|
||||
case LLMDeltaPayload:
|
||||
fields["content_delta_len"] = payload.ContentDeltaLen
|
||||
fields["reasoning_delta_len"] = payload.ReasoningDeltaLen
|
||||
case LLMResponsePayload:
|
||||
fields["content_len"] = payload.ContentLen
|
||||
fields["tool_calls"] = payload.ToolCalls
|
||||
fields["has_reasoning"] = payload.HasReasoning
|
||||
case LLMRetryPayload:
|
||||
fields["attempt"] = payload.Attempt
|
||||
fields["max_retries"] = payload.MaxRetries
|
||||
fields["reason"] = payload.Reason
|
||||
fields["error"] = payload.Error
|
||||
fields["backoff_ms"] = payload.Backoff.Milliseconds()
|
||||
case ContextCompressPayload:
|
||||
fields["reason"] = payload.Reason
|
||||
fields["dropped_messages"] = payload.DroppedMessages
|
||||
fields["remaining_messages"] = payload.RemainingMessages
|
||||
case SessionSummarizePayload:
|
||||
fields["summarized_messages"] = payload.SummarizedMessages
|
||||
fields["kept_messages"] = payload.KeptMessages
|
||||
fields["summary_len"] = payload.SummaryLen
|
||||
fields["omitted_oversized"] = payload.OmittedOversized
|
||||
case ToolExecStartPayload:
|
||||
fields["tool"] = payload.Tool
|
||||
fields["args_count"] = len(payload.Arguments)
|
||||
case ToolExecEndPayload:
|
||||
fields["tool"] = payload.Tool
|
||||
fields["duration_ms"] = payload.Duration.Milliseconds()
|
||||
fields["for_llm_len"] = payload.ForLLMLen
|
||||
fields["for_user_len"] = payload.ForUserLen
|
||||
fields["is_error"] = payload.IsError
|
||||
fields["async"] = payload.Async
|
||||
case ToolExecSkippedPayload:
|
||||
fields["tool"] = payload.Tool
|
||||
fields["reason"] = payload.Reason
|
||||
case SteeringInjectedPayload:
|
||||
fields["count"] = payload.Count
|
||||
fields["total_content_len"] = payload.TotalContentLen
|
||||
case FollowUpQueuedPayload:
|
||||
fields["source_tool"] = payload.SourceTool
|
||||
fields["channel"] = payload.Channel
|
||||
fields["chat_id"] = payload.ChatID
|
||||
fields["content_len"] = payload.ContentLen
|
||||
case InterruptReceivedPayload:
|
||||
fields["role"] = payload.Role
|
||||
fields["content_len"] = payload.ContentLen
|
||||
fields["queue_depth"] = payload.QueueDepth
|
||||
case SubTurnSpawnPayload:
|
||||
fields["child_agent_id"] = payload.AgentID
|
||||
fields["label"] = payload.Label
|
||||
case SubTurnEndPayload:
|
||||
fields["child_agent_id"] = payload.AgentID
|
||||
fields["status"] = payload.Status
|
||||
case SubTurnResultDeliveredPayload:
|
||||
fields["target_channel"] = payload.TargetChannel
|
||||
fields["target_chat_id"] = payload.TargetChatID
|
||||
fields["content_len"] = payload.ContentLen
|
||||
case ErrorPayload:
|
||||
fields["stage"] = payload.Stage
|
||||
fields["error"] = payload.Message
|
||||
}
|
||||
|
||||
logger.InfoCF("eventbus", fmt.Sprintf("Agent event: %s", evt.Kind.String()), fields)
|
||||
}
|
||||
|
||||
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
|
||||
@@ -895,6 +1077,35 @@ func (al *AgentLoop) runAgentLoop(
|
||||
agent *AgentInstance,
|
||||
opts processOptions,
|
||||
) (string, error) {
|
||||
turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey)
|
||||
turnStartedAt := time.Now()
|
||||
turnIterations := 0
|
||||
turnFinalContentLen := 0
|
||||
turnStatus := TurnEndStatusCompleted
|
||||
defer func() {
|
||||
al.emitEvent(
|
||||
EventKindTurnEnd,
|
||||
turnScope.meta(turnIterations, "runAgentLoop", "turn.end"),
|
||||
TurnEndPayload{
|
||||
Status: turnStatus,
|
||||
Iterations: turnIterations,
|
||||
Duration: time.Since(turnStartedAt),
|
||||
FinalContentLen: turnFinalContentLen,
|
||||
},
|
||||
)
|
||||
}()
|
||||
|
||||
al.emitEvent(
|
||||
EventKindTurnStart,
|
||||
turnScope.meta(0, "runAgentLoop", "turn.start"),
|
||||
TurnStartPayload{
|
||||
Channel: opts.Channel,
|
||||
ChatID: opts.ChatID,
|
||||
UserMessage: opts.UserMessage,
|
||||
MediaCount: len(opts.Media),
|
||||
},
|
||||
)
|
||||
|
||||
// 0. Record last channel for heartbeat notifications (skip internal channels and cli)
|
||||
if opts.Channel != "" && opts.ChatID != "" {
|
||||
if !constants.IsInternalChannel(opts.Channel) {
|
||||
@@ -937,7 +1148,17 @@ func (al *AgentLoop) runAgentLoop(
|
||||
if isOverContextBudget(agent.ContextWindow, messages, toolDefs, agent.MaxTokens) {
|
||||
logger.WarnCF("agent", "Proactive compression: context budget exceeded before LLM call",
|
||||
map[string]any{"session_key": opts.SessionKey})
|
||||
al.forceCompression(agent, opts.SessionKey)
|
||||
if compression, ok := al.forceCompression(agent, opts.SessionKey); ok {
|
||||
al.emitEvent(
|
||||
EventKindContextCompress,
|
||||
turnScope.meta(0, "runAgentLoop", "turn.context.compress"),
|
||||
ContextCompressPayload{
|
||||
Reason: ContextCompressReasonProactive,
|
||||
DroppedMessages: compression.DroppedMessages,
|
||||
RemainingMessages: compression.RemainingMessages,
|
||||
},
|
||||
)
|
||||
}
|
||||
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
|
||||
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
|
||||
messages = agent.ContextBuilder.BuildMessages(
|
||||
@@ -952,8 +1173,10 @@ func (al *AgentLoop) runAgentLoop(
|
||||
agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
|
||||
|
||||
// 3. Run LLM iteration loop
|
||||
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts)
|
||||
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts, turnScope)
|
||||
turnIterations = iteration
|
||||
if err != nil {
|
||||
turnStatus = TurnEndStatusError
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -964,6 +1187,7 @@ func (al *AgentLoop) runAgentLoop(
|
||||
if finalContent == "" {
|
||||
finalContent = opts.DefaultResponse
|
||||
}
|
||||
turnFinalContentLen = len(finalContent)
|
||||
|
||||
// 5. Save final assistant message to session
|
||||
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
|
||||
@@ -971,7 +1195,7 @@ func (al *AgentLoop) runAgentLoop(
|
||||
|
||||
// 6. Optional: summarization
|
||||
if opts.EnableSummary {
|
||||
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
|
||||
al.maybeSummarize(agent, opts.SessionKey, turnScope)
|
||||
}
|
||||
|
||||
// 7. Optional: send response via bus
|
||||
@@ -1058,6 +1282,7 @@ func (al *AgentLoop) runLLMIteration(
|
||||
agent *AgentInstance,
|
||||
messages []providers.Message,
|
||||
opts processOptions,
|
||||
turnScope turnEventScope,
|
||||
) (string, int, error) {
|
||||
iteration := 0
|
||||
var finalContent string
|
||||
@@ -1084,9 +1309,11 @@ func (al *AgentLoop) runLLMIteration(
|
||||
// Inject pending steering messages into the conversation context
|
||||
// before the next LLM call.
|
||||
if len(pendingMessages) > 0 {
|
||||
totalContentLen := 0
|
||||
for _, pm := range pendingMessages {
|
||||
messages = append(messages, pm)
|
||||
agent.Sessions.AddMessage(opts.SessionKey, pm.Role, pm.Content)
|
||||
totalContentLen += len(pm.Content)
|
||||
logger.InfoCF("agent", "Injected steering message into context",
|
||||
map[string]any{
|
||||
"agent_id": agent.ID,
|
||||
@@ -1094,6 +1321,14 @@ func (al *AgentLoop) runLLMIteration(
|
||||
"content_len": len(pm.Content),
|
||||
})
|
||||
}
|
||||
al.emitEvent(
|
||||
EventKindSteeringInjected,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.steering.injected"),
|
||||
SteeringInjectedPayload{
|
||||
Count: len(pendingMessages),
|
||||
TotalContentLen: totalContentLen,
|
||||
},
|
||||
)
|
||||
pendingMessages = nil
|
||||
}
|
||||
|
||||
@@ -1106,6 +1341,17 @@ func (al *AgentLoop) runLLMIteration(
|
||||
|
||||
// Build tool definitions
|
||||
providerToolDefs := agent.Tools.ToProviderDefs()
|
||||
al.emitEvent(
|
||||
EventKindLLMRequest,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.llm.request"),
|
||||
LLMRequestPayload{
|
||||
Model: activeModel,
|
||||
MessagesCount: len(messages),
|
||||
ToolsCount: len(providerToolDefs),
|
||||
MaxTokens: agent.MaxTokens,
|
||||
Temperature: agent.Temperature,
|
||||
},
|
||||
)
|
||||
|
||||
// Log LLM request details
|
||||
logger.DebugCF("agent", "LLM request",
|
||||
@@ -1151,6 +1397,8 @@ func (al *AgentLoop) runLLMIteration(
|
||||
callLLM := func() (*providers.LLMResponse, error) {
|
||||
al.activeRequests.Add(1)
|
||||
defer al.activeRequests.Done()
|
||||
// TODO(eventbus): emit EventKindLLMDelta when providers expose
|
||||
// streaming callbacks instead of only the final Chat response.
|
||||
|
||||
if len(activeCandidates) > 1 && al.fallback != nil {
|
||||
fbResult, fbErr := al.fallback.Execute(
|
||||
@@ -1206,6 +1454,17 @@ func (al *AgentLoop) runLLMIteration(
|
||||
|
||||
if isTimeoutError && retry < maxRetries {
|
||||
backoff := time.Duration(retry+1) * 5 * time.Second
|
||||
al.emitEvent(
|
||||
EventKindLLMRetry,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"),
|
||||
LLMRetryPayload{
|
||||
Attempt: retry + 1,
|
||||
MaxRetries: maxRetries,
|
||||
Reason: "timeout",
|
||||
Error: err.Error(),
|
||||
Backoff: backoff,
|
||||
},
|
||||
)
|
||||
logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{
|
||||
"error": err.Error(),
|
||||
"retry": retry,
|
||||
@@ -1216,6 +1475,16 @@ func (al *AgentLoop) runLLMIteration(
|
||||
}
|
||||
|
||||
if isContextError && retry < maxRetries {
|
||||
al.emitEvent(
|
||||
EventKindLLMRetry,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.llm.retry"),
|
||||
LLMRetryPayload{
|
||||
Attempt: retry + 1,
|
||||
MaxRetries: maxRetries,
|
||||
Reason: "context_limit",
|
||||
Error: err.Error(),
|
||||
},
|
||||
)
|
||||
logger.WarnCF(
|
||||
"agent",
|
||||
"Context window error detected, attempting compression",
|
||||
@@ -1233,7 +1502,17 @@ func (al *AgentLoop) runLLMIteration(
|
||||
})
|
||||
}
|
||||
|
||||
al.forceCompression(agent, opts.SessionKey)
|
||||
if compression, ok := al.forceCompression(agent, opts.SessionKey); ok {
|
||||
al.emitEvent(
|
||||
EventKindContextCompress,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.context.compress"),
|
||||
ContextCompressPayload{
|
||||
Reason: ContextCompressReasonRetry,
|
||||
DroppedMessages: compression.DroppedMessages,
|
||||
RemainingMessages: compression.RemainingMessages,
|
||||
},
|
||||
)
|
||||
}
|
||||
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
|
||||
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
|
||||
messages = agent.ContextBuilder.BuildMessages(
|
||||
@@ -1246,6 +1525,14 @@ func (al *AgentLoop) runLLMIteration(
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
al.emitEvent(
|
||||
EventKindError,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.error"),
|
||||
ErrorPayload{
|
||||
Stage: "llm",
|
||||
Message: err.Error(),
|
||||
},
|
||||
)
|
||||
logger.ErrorCF("agent", "LLM call failed",
|
||||
map[string]any{
|
||||
"agent_id": agent.ID,
|
||||
@@ -1262,6 +1549,15 @@ func (al *AgentLoop) runLLMIteration(
|
||||
opts.Channel,
|
||||
al.targetReasoningChannelID(opts.Channel),
|
||||
)
|
||||
al.emitEvent(
|
||||
EventKindLLMResponse,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.llm.response"),
|
||||
LLMResponsePayload{
|
||||
ContentLen: len(response.Content),
|
||||
ToolCalls: len(response.ToolCalls),
|
||||
HasReasoning: response.Reasoning != "" || response.ReasoningContent != "",
|
||||
},
|
||||
)
|
||||
|
||||
logger.DebugCF("agent", "LLM response",
|
||||
map[string]any{
|
||||
@@ -1352,6 +1648,14 @@ func (al *AgentLoop) runLLMIteration(
|
||||
"tool": tc.Name,
|
||||
"iteration": iteration,
|
||||
})
|
||||
al.emitEvent(
|
||||
EventKindToolExecStart,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.tool.start"),
|
||||
ToolExecStartPayload{
|
||||
Tool: tc.Name,
|
||||
Arguments: cloneEventArguments(tc.Arguments),
|
||||
},
|
||||
)
|
||||
|
||||
// Create async callback for tools that implement AsyncExecutor.
|
||||
asyncCallback := func(_ context.Context, result *tools.ToolResult) {
|
||||
@@ -1379,6 +1683,16 @@ func (al *AgentLoop) runLLMIteration(
|
||||
"content_len": len(content),
|
||||
"channel": opts.Channel,
|
||||
})
|
||||
al.emitEvent(
|
||||
EventKindFollowUpQueued,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.follow_up.queued"),
|
||||
FollowUpQueuedPayload{
|
||||
SourceTool: tc.Name,
|
||||
Channel: opts.Channel,
|
||||
ChatID: opts.ChatID,
|
||||
ContentLen: len(content),
|
||||
},
|
||||
)
|
||||
|
||||
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer pubCancel()
|
||||
@@ -1390,6 +1704,7 @@ func (al *AgentLoop) runLLMIteration(
|
||||
})
|
||||
}
|
||||
|
||||
toolStart := time.Now()
|
||||
toolResult := agent.Tools.ExecuteWithContext(
|
||||
ctx,
|
||||
tc.Name,
|
||||
@@ -1398,6 +1713,7 @@ func (al *AgentLoop) runLLMIteration(
|
||||
opts.ChatID,
|
||||
asyncCallback,
|
||||
)
|
||||
toolDuration := time.Since(toolStart)
|
||||
|
||||
// Process tool result
|
||||
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
|
||||
@@ -1443,6 +1759,18 @@ func (al *AgentLoop) runLLMIteration(
|
||||
Content: contentForLLM,
|
||||
ToolCallID: tc.ID,
|
||||
}
|
||||
al.emitEvent(
|
||||
EventKindToolExecEnd,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.tool.end"),
|
||||
ToolExecEndPayload{
|
||||
Tool: tc.Name,
|
||||
Duration: toolDuration,
|
||||
ForLLMLen: len(contentForLLM),
|
||||
ForUserLen: len(toolResult.ForUser),
|
||||
IsError: toolResult.IsError,
|
||||
Async: toolResult.Async,
|
||||
},
|
||||
)
|
||||
messages = append(messages, toolResultMsg)
|
||||
agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
|
||||
|
||||
@@ -1464,6 +1792,14 @@ func (al *AgentLoop) runLLMIteration(
|
||||
// Mark remaining tool calls as skipped
|
||||
for j := i + 1; j < len(normalizedToolCalls); j++ {
|
||||
skippedTC := normalizedToolCalls[j]
|
||||
al.emitEvent(
|
||||
EventKindToolExecSkipped,
|
||||
turnScope.meta(iteration, "runLLMIteration", "turn.tool.skipped"),
|
||||
ToolExecSkippedPayload{
|
||||
Tool: skippedTC.Name,
|
||||
Reason: "queued user steering message",
|
||||
},
|
||||
)
|
||||
toolResultMsg := providers.Message{
|
||||
Role: "tool",
|
||||
Content: "Skipped due to queued user message.",
|
||||
@@ -1538,7 +1874,7 @@ func (al *AgentLoop) selectCandidates(
|
||||
}
|
||||
|
||||
// maybeSummarize triggers summarization if the session history exceeds thresholds.
|
||||
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) {
|
||||
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
|
||||
newHistory := agent.Sessions.GetHistory(sessionKey)
|
||||
tokenEstimate := al.estimateTokens(newHistory)
|
||||
threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100
|
||||
@@ -1549,12 +1885,17 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c
|
||||
go func() {
|
||||
defer al.summarizing.Delete(summarizeKey)
|
||||
logger.Debug("Memory threshold reached. Optimizing conversation history...")
|
||||
al.summarizeSession(agent, sessionKey)
|
||||
al.summarizeSession(agent, sessionKey, turnScope)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type compressionResult struct {
|
||||
DroppedMessages int
|
||||
RemainingMessages int
|
||||
}
|
||||
|
||||
// forceCompression aggressively reduces context when the limit is hit.
|
||||
// It drops the oldest ~50% of Turns (a Turn is a complete user→LLM→response
|
||||
// cycle, as defined in #1316), so tool-call sequences are never split.
|
||||
@@ -1567,10 +1908,10 @@ func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, c
|
||||
// prompt is built dynamically by BuildMessages and is NOT stored here.
|
||||
// The compression note is recorded in the session summary so that
|
||||
// BuildMessages can include it in the next system prompt.
|
||||
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
|
||||
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) (compressionResult, bool) {
|
||||
history := agent.Sessions.GetHistory(sessionKey)
|
||||
if len(history) <= 2 {
|
||||
return
|
||||
return compressionResult{}, false
|
||||
}
|
||||
|
||||
// Split at a Turn boundary so no tool-call sequence is torn apart.
|
||||
@@ -1624,6 +1965,11 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
|
||||
"dropped_msgs": droppedCount,
|
||||
"new_count": len(keptHistory),
|
||||
})
|
||||
|
||||
return compressionResult{
|
||||
DroppedMessages: droppedCount,
|
||||
RemainingMessages: len(keptHistory),
|
||||
}, true
|
||||
}
|
||||
|
||||
// GetStartupInfo returns information about loaded tools and skills for logging.
|
||||
@@ -1715,7 +2061,7 @@ func formatToolsForLog(toolDefs []providers.ToolDefinition) string {
|
||||
}
|
||||
|
||||
// summarizeSession summarizes the conversation history for a session.
|
||||
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
|
||||
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string, turnScope turnEventScope) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -1800,6 +2146,16 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
|
||||
agent.Sessions.SetSummary(sessionKey, finalSummary)
|
||||
agent.Sessions.TruncateHistory(sessionKey, keepCount)
|
||||
agent.Sessions.Save(sessionKey)
|
||||
al.emitEvent(
|
||||
EventKindSessionSummarize,
|
||||
turnScope.meta(0, "summarizeSession", "turn.session.summarize"),
|
||||
SessionSummarizePayload{
|
||||
SummarizedMessages: len(validMessages),
|
||||
KeptMessages: keepCount,
|
||||
SummaryLen: len(finalSummary),
|
||||
OmittedOversized: omitted,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -122,6 +122,25 @@ func (al *AgentLoop) Steer(msg providers.Message) error {
|
||||
"content_len": len(msg.Content),
|
||||
"queue_len": al.steering.len(),
|
||||
})
|
||||
agentID := ""
|
||||
if registry := al.GetRegistry(); registry != nil {
|
||||
if agent := registry.GetDefaultAgent(); agent != nil {
|
||||
agentID = agent.ID
|
||||
}
|
||||
}
|
||||
al.emitEvent(
|
||||
EventKindInterruptReceived,
|
||||
EventMeta{
|
||||
AgentID: agentID,
|
||||
Source: "Steer",
|
||||
TracePath: "turn.interrupt.received",
|
||||
},
|
||||
InterruptReceivedPayload{
|
||||
Role: msg.Role,
|
||||
ContentLen: len(msg.Content),
|
||||
QueueDepth: al.steering.len(),
|
||||
},
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -96,6 +96,9 @@ func (t *SpawnTool) execute(ctx context.Context, args map[string]any, cb AsyncCa
|
||||
}
|
||||
|
||||
// Pass callback to manager for async completion notification
|
||||
// TODO(eventbus): when background subagents are migrated onto the
|
||||
// agent package's runTurn/sub-turn tree, emit SubTurnSpawn here and move
|
||||
// lifecycle events out of the legacy SubagentManager path.
|
||||
result, err := t.manager.Spawn(ctx, task, label, agentID, channel, chatID, cb)
|
||||
if err != nil {
|
||||
return ErrorResult(fmt.Sprintf("failed to spawn subagent: %v", err))
|
||||
|
||||
@@ -111,6 +111,9 @@ func (sm *SubagentManager) Spawn(
|
||||
func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) {
|
||||
task.Status = "running"
|
||||
task.Created = time.Now().UnixMilli()
|
||||
// TODO(eventbus): once subagents are modeled as child turns inside
|
||||
// pkg/agent, emit SubTurnEnd and SubTurnResultDelivered from the parent
|
||||
// AgentLoop instead of this legacy manager.
|
||||
|
||||
// Build system prompt for subagent
|
||||
systemPrompt := `You are a subagent. Complete the given task independently and report the result.
|
||||
|
||||
Reference in New Issue
Block a user