From af61d0bca720340030fdc2afe2d858e57ff9a583 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 20 Mar 2026 14:53:22 +0800 Subject: [PATCH] feat(agent): add event bus foundation --- pkg/agent/eventbus.go | 121 +++++++++++++++++++ pkg/agent/eventbus_test.go | 235 +++++++++++++++++++++++++++++++++++++ pkg/agent/events.go | 129 ++++++++++++++++++++ pkg/agent/loop.go | 166 +++++++++++++++++++++++++- 4 files changed, 650 insertions(+), 1 deletion(-) create mode 100644 pkg/agent/eventbus.go create mode 100644 pkg/agent/eventbus_test.go create mode 100644 pkg/agent/events.go diff --git a/pkg/agent/eventbus.go b/pkg/agent/eventbus.go new file mode 100644 index 000000000..546d8436d --- /dev/null +++ b/pkg/agent/eventbus.go @@ -0,0 +1,121 @@ +package agent + +import ( + "sync" + "sync/atomic" + "time" +) + +const defaultEventSubscriberBuffer = 16 + +// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe. +type EventSubscription struct { + ID uint64 + C <-chan Event +} + +type eventSubscriber struct { + ch chan Event +} + +// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events. +type EventBus struct { + mu sync.RWMutex + subs map[uint64]eventSubscriber + nextID uint64 + closed bool + dropped [eventKindCount]atomic.Int64 +} + +// NewEventBus creates a new in-process event broadcaster. +func NewEventBus() *EventBus { + return &EventBus{ + subs: make(map[uint64]eventSubscriber), + } +} + +// Subscribe registers a new subscriber with the requested channel buffer size. +// A non-positive buffer uses the default size. 
+func (b *EventBus) Subscribe(buffer int) EventSubscription { + if buffer <= 0 { + buffer = defaultEventSubscriberBuffer + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + ch := make(chan Event) + close(ch) + return EventSubscription{C: ch} + } + + b.nextID++ + id := b.nextID + ch := make(chan Event, buffer) + b.subs[id] = eventSubscriber{ch: ch} + return EventSubscription{ID: id, C: ch} +} + +// Unsubscribe removes a subscriber and closes its channel. +func (b *EventBus) Unsubscribe(id uint64) { + b.mu.Lock() + defer b.mu.Unlock() + + sub, ok := b.subs[id] + if !ok { + return + } + + delete(b.subs, id) + close(sub.ch) +} + +// Emit broadcasts an event to all current subscribers without blocking. +// When a subscriber channel is full, the event is dropped for that subscriber. +func (b *EventBus) Emit(evt Event) { + if evt.Time.IsZero() { + evt.Time = time.Now() + } + + b.mu.RLock() + defer b.mu.RUnlock() + + if b.closed { + return + } + + for _, sub := range b.subs { + select { + case sub.ch <- evt: + default: + if evt.Kind < eventKindCount { + b.dropped[evt.Kind].Add(1) + } + } + } +} + +// Dropped returns the number of dropped events for a given kind. +func (b *EventBus) Dropped(kind EventKind) int64 { + if kind >= eventKindCount { + return 0 + } + return b.dropped[kind].Load() +} + +// Close closes all subscriber channels and stops future broadcasts. 
+func (b *EventBus) Close() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + return + } + + b.closed = true + for id, sub := range b.subs { + close(sub.ch) + delete(b.subs, id) + } +} diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go new file mode 100644 index 000000000..d57fac094 --- /dev/null +++ b/pkg/agent/eventbus_test.go @@ -0,0 +1,235 @@ +package agent + +import ( + "context" + "os" + "slices" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/tools" +) + +func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) { + eventBus := NewEventBus() + sub := eventBus.Subscribe(1) + + eventBus.Emit(Event{ + Kind: EventKindTurnStart, + Meta: EventMeta{TurnID: "turn-1"}, + }) + + select { + case evt := <-sub.C: + if evt.Kind != EventKindTurnStart { + t.Fatalf("expected %v, got %v", EventKindTurnStart, evt.Kind) + } + if evt.Meta.TurnID != "turn-1" { + t.Fatalf("expected turn id turn-1, got %q", evt.Meta.TurnID) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } + + eventBus.Unsubscribe(sub.ID) + if _, ok := <-sub.C; ok { + t.Fatal("expected subscriber channel to be closed after unsubscribe") + } + + eventBus.Close() + closedSub := eventBus.Subscribe(1) + if _, ok := <-closedSub.C; ok { + t.Fatal("expected closed bus to return a closed subscriber channel") + } +} + +func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) { + eventBus := NewEventBus() + sub := eventBus.Subscribe(1) + defer eventBus.Unsubscribe(sub.ID) + + start := time.Now() + for i := 0; i < 1000; i++ { + eventBus.Emit(Event{Kind: EventKindLLMRequest}) + } + + if elapsed := time.Since(start); elapsed > 100*time.Millisecond { + t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed) + } + + if got := eventBus.Dropped(EventKindLLMRequest); got != 999 { + t.Fatalf("expected 999 dropped events, got 
%d", got) + } +} + +type scriptedToolProvider struct { + calls int +} + +func (m *scriptedToolProvider) Chat( + ctx context.Context, + messages []providers.Message, + toolDefs []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + m.calls++ + if m.calls == 1 { + return &providers.LLMResponse{ + ToolCalls: []providers.ToolCall{ + { + ID: "call-1", + Name: "mock_custom", + Arguments: map[string]any{"task": "ping"}, + }, + }, + }, nil + } + + return &providers.LLMResponse{ + Content: "done", + }, nil +} + +func (m *scriptedToolProvider) GetDefaultModel() string { + return "scripted-tool-model" +} + +func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-eventbus-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + msgBus := bus.NewMessageBus() + provider := &scriptedToolProvider{} + al := NewAgentLoop(cfg, msgBus, provider) + al.RegisterTool(&mockCustomTool{}) + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if response != "done" { + t.Fatalf("expected final response 'done', got %q", response) + } + + events := collectEventStream(sub.C) + if len(events) != 8 { + t.Fatalf("expected 8 events, got %d", len(events)) + } + + kinds := make([]EventKind, 0, len(events)) + 
for _, evt := range events { + kinds = append(kinds, evt.Kind) + } + + expectedKinds := []EventKind{ + EventKindTurnStart, + EventKindLLMRequest, + EventKindLLMResponse, + EventKindToolExecStart, + EventKindToolExecEnd, + EventKindLLMRequest, + EventKindLLMResponse, + EventKindTurnEnd, + } + if !slices.Equal(kinds, expectedKinds) { + t.Fatalf("unexpected event sequence: got %v want %v", kinds, expectedKinds) + } + + turnID := events[0].Meta.TurnID + for i, evt := range events { + if evt.Meta.TurnID != turnID { + t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Meta.TurnID, turnID) + } + if evt.Meta.SessionKey != "session-1" { + t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey) + } + } + + startPayload, ok := events[0].Payload.(TurnStartPayload) + if !ok { + t.Fatalf("expected TurnStartPayload, got %T", events[0].Payload) + } + if startPayload.UserMessage != "run tool" { + t.Fatalf("expected user message 'run tool', got %q", startPayload.UserMessage) + } + + toolStartPayload, ok := events[3].Payload.(ToolExecStartPayload) + if !ok { + t.Fatalf("expected ToolExecStartPayload, got %T", events[3].Payload) + } + if toolStartPayload.Tool != "mock_custom" { + t.Fatalf("expected tool name mock_custom, got %q", toolStartPayload.Tool) + } + + toolEndPayload, ok := events[4].Payload.(ToolExecEndPayload) + if !ok { + t.Fatalf("expected ToolExecEndPayload, got %T", events[4].Payload) + } + if toolEndPayload.Tool != "mock_custom" { + t.Fatalf("expected tool end payload for mock_custom, got %q", toolEndPayload.Tool) + } + if toolEndPayload.IsError { + t.Fatal("expected mock_custom tool to succeed") + } + + turnEndPayload, ok := events[len(events)-1].Payload.(TurnEndPayload) + if !ok { + t.Fatalf("expected TurnEndPayload, got %T", events[len(events)-1].Payload) + } + if turnEndPayload.Status != TurnEndStatusCompleted { + t.Fatalf("expected completed turn, got %q", turnEndPayload.Status) + } + if turnEndPayload.Iterations != 2 { + 
t.Fatalf("expected 2 iterations, got %d", turnEndPayload.Iterations) + } +} + +func collectEventStream(ch <-chan Event) []Event { + var events []Event + for { + select { + case evt, ok := <-ch: + if !ok { + return events + } + events = append(events, evt) + default: + return events + } + } +} + +var _ tools.Tool = (*mockCustomTool)(nil) diff --git a/pkg/agent/events.go b/pkg/agent/events.go new file mode 100644 index 000000000..92aec7436 --- /dev/null +++ b/pkg/agent/events.go @@ -0,0 +1,129 @@ +package agent + +import ( + "fmt" + "time" +) + +// EventKind identifies a structured agent-loop event. +type EventKind uint8 + +const ( + // EventKindTurnStart is emitted when a turn begins processing. + EventKindTurnStart EventKind = iota + // EventKindTurnEnd is emitted when a turn finishes, successfully or with an error. + EventKindTurnEnd + // EventKindLLMRequest is emitted before a provider chat request is made. + EventKindLLMRequest + // EventKindLLMResponse is emitted after a provider chat response is received. + EventKindLLMResponse + // EventKindToolExecStart is emitted immediately before a tool executes. + EventKindToolExecStart + // EventKindToolExecEnd is emitted immediately after a tool finishes executing. + EventKindToolExecEnd + // EventKindError is emitted when a turn encounters an execution error. + EventKindError + + eventKindCount +) + +var eventKindNames = [...]string{ + "turn_start", + "turn_end", + "llm_request", + "llm_response", + "tool_exec_start", + "tool_exec_end", + "error", +} + +// String returns the stable string form of an EventKind. +func (k EventKind) String() string { + if k >= eventKindCount { + return fmt.Sprintf("event_kind(%d)", k) + } + return eventKindNames[k] +} + +// Event is the structured envelope broadcast by the agent EventBus. +type Event struct { + Kind EventKind + Time time.Time + Meta EventMeta + Payload any +} + +// EventMeta contains correlation fields shared by all agent-loop events. 
type EventMeta struct {
	// AgentID is the ID of the agent running the turn (agent.ID in runAgentLoop).
	AgentID string

	// TurnID identifies the turn. newTurnEventScope builds it as
	// "<agentID>-turn-<seq>", with seq taken from AgentLoop's turn counter.
	TurnID string

	// ParentTurnID is not set by turnEventScope.meta, the only constructor of
	// EventMeta visible in this change.
	// NOTE(review): presumably reserved for nested turns; confirm with callers.
	ParentTurnID string

	// SessionKey is the session key the turn was started with (opts.SessionKey).
	SessionKey string

	// Iteration is the LLM iteration counter at the time of emission; the
	// turn-start event uses 0.
	Iteration int

	// TracePath is a dotted label for the emit site, e.g. "turn.start" or
	// "turn.llm.request".
	TracePath string

	// Source is the name of the emitting function, e.g. "runAgentLoop".
	Source string
}

// TurnEndStatus describes the terminal state of a turn.
// It is carried in TurnEndPayload.Status.
type TurnEndStatus string

const (
	// TurnEndStatusCompleted indicates the turn finished normally.
	// runAgentLoop initialises every turn with this status.
	TurnEndStatusCompleted TurnEndStatus = "completed"
	// TurnEndStatusError indicates the turn ended because of an error.
	// runAgentLoop switches to it when runLLMIteration returns an error.
	TurnEndStatusError TurnEndStatus = "error"
)

// TurnStartPayload describes the start of a turn.
//
// runAgentLoop fills it from the turn's processOptions: Channel, ChatID and
// UserMessage are copied as-is, and MediaCount is len(opts.Media).
type TurnStartPayload struct {
	Channel     string
	ChatID      string
	UserMessage string
	MediaCount  int
}

// TurnEndPayload describes the completion of a turn.
//
// It is emitted from a deferred call in runAgentLoop. Iterations is the count
// returned by runLLMIteration, Duration is the time elapsed since the turn
// started, and FinalContentLen is len() of the final response text (it stays 0
// when the turn returns before a final response is chosen).
type TurnEndPayload struct {
	Status          TurnEndStatus
	Iterations      int
	Duration        time.Duration
	FinalContentLen int
}

// LLMRequestPayload describes an outbound LLM request.
//
// MessagesCount and ToolsCount are the lengths of the message list and of the
// provider tool definitions at the time of the request; MaxTokens and
// Temperature are copied from the agent instance.
type LLMRequestPayload struct {
	Model         string
	MessagesCount int
	ToolsCount    int
	MaxTokens     int
	Temperature   float64
}

// LLMResponsePayload describes an inbound LLM response.
//
// ContentLen is len(response.Content), ToolCalls is the number of tool calls
// in the response, and HasReasoning is true when either the response's
// Reasoning or ReasoningContent field is non-empty.
type LLMResponsePayload struct {
	ContentLen   int
	ToolCalls    int
	HasReasoning bool
}

// ToolExecStartPayload describes a tool execution request.
//
// Arguments is a shallow copy of the tool call's arguments made by
// cloneEventArguments (nil when the call has none); nested values are shared
// with the original map.
type ToolExecStartPayload struct {
	Tool      string
	Arguments map[string]any
}

// ToolExecEndPayload describes the outcome of a tool execution.
//
// Duration is the wall-clock time spent in the tool's ExecuteWithContext call.
// ForLLMLen and ForUserLen are len() of the content passed back to the LLM and
// of the tool result's ForUser text; IsError and Async mirror the same-named
// fields of the tool result.
type ToolExecEndPayload struct {
	Tool       string
	Duration   time.Duration
	ForLLMLen  int
	ForUserLen int
	IsError    bool
	Async      bool
}

// ErrorPayload describes an execution error inside the agent loop.
+type ErrorPayload struct { + Stage string + Message string +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index c583f5ca5..2c9c86cf9 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -39,6 +39,7 @@ type AgentLoop struct { cfg *config.Config registry *AgentRegistry state *state.Manager + eventBus *EventBus running atomic.Bool summarizing sync.Map fallback *providers.FallbackChain @@ -49,6 +50,7 @@ type AgentLoop struct { mcp mcpRuntime steering *steeringQueue mu sync.RWMutex + turnSeq atomic.Uint64 // Track active requests for safe provider cleanup activeRequests sync.WaitGroup } @@ -103,6 +105,7 @@ func NewAgentLoop( cfg: cfg, registry: registry, state: stateManager, + eventBus: NewEventBus(), summarizing: sync.Map{}, fallback: fallbackChain, cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), @@ -380,6 +383,84 @@ func (al *AgentLoop) Close() { } al.GetRegistry().Close() + if al.eventBus != nil { + al.eventBus.Close() + } +} + +// SubscribeEvents registers a subscriber for agent-loop events. +func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { + if al == nil || al.eventBus == nil { + ch := make(chan Event) + close(ch) + return EventSubscription{C: ch} + } + return al.eventBus.Subscribe(buffer) +} + +// UnsubscribeEvents removes a previously registered event subscriber. +func (al *AgentLoop) UnsubscribeEvents(id uint64) { + if al == nil || al.eventBus == nil { + return + } + al.eventBus.Unsubscribe(id) +} + +// EventDrops returns the number of dropped events for the given kind. 
+func (al *AgentLoop) EventDrops(kind EventKind) int64 { + if al == nil || al.eventBus == nil { + return 0 + } + return al.eventBus.Dropped(kind) +} + +type turnEventScope struct { + agentID string + sessionKey string + turnID string +} + +func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope { + seq := al.turnSeq.Add(1) + return turnEventScope{ + agentID: agentID, + sessionKey: sessionKey, + turnID: fmt.Sprintf("%s-turn-%d", agentID, seq), + } +} + +func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta { + return EventMeta{ + AgentID: ts.agentID, + TurnID: ts.turnID, + SessionKey: ts.sessionKey, + Iteration: iteration, + Source: source, + TracePath: tracePath, + } +} + +func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { + if al == nil || al.eventBus == nil { + return + } + al.eventBus.Emit(Event{ + Kind: kind, + Meta: meta, + Payload: payload, + }) +} + +func cloneEventArguments(args map[string]any) map[string]any { + if len(args) == 0 { + return nil + } + + cloned := make(map[string]any, len(args)) + for k, v := range args { + cloned[k] = v + } + return cloned } func (al *AgentLoop) RegisterTool(tool tools.Tool) { @@ -895,6 +976,35 @@ func (al *AgentLoop) runAgentLoop( agent *AgentInstance, opts processOptions, ) (string, error) { + turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey) + turnStartedAt := time.Now() + turnIterations := 0 + turnFinalContentLen := 0 + turnStatus := TurnEndStatusCompleted + defer func() { + al.emitEvent( + EventKindTurnEnd, + turnScope.meta(turnIterations, "runAgentLoop", "turn.end"), + TurnEndPayload{ + Status: turnStatus, + Iterations: turnIterations, + Duration: time.Since(turnStartedAt), + FinalContentLen: turnFinalContentLen, + }, + ) + }() + + al.emitEvent( + EventKindTurnStart, + turnScope.meta(0, "runAgentLoop", "turn.start"), + TurnStartPayload{ + Channel: opts.Channel, + ChatID: opts.ChatID, + UserMessage: opts.UserMessage, + 
MediaCount: len(opts.Media), + }, + ) + // 0. Record last channel for heartbeat notifications (skip internal channels and cli) if opts.Channel != "" && opts.ChatID != "" { if !constants.IsInternalChannel(opts.Channel) { @@ -952,8 +1062,10 @@ func (al *AgentLoop) runAgentLoop( agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) // 3. Run LLM iteration loop - finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts) + finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts, turnScope) + turnIterations = iteration if err != nil { + turnStatus = TurnEndStatusError return "", err } @@ -964,6 +1076,7 @@ func (al *AgentLoop) runAgentLoop( if finalContent == "" { finalContent = opts.DefaultResponse } + turnFinalContentLen = len(finalContent) // 5. Save final assistant message to session agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent) @@ -1058,6 +1171,7 @@ func (al *AgentLoop) runLLMIteration( agent *AgentInstance, messages []providers.Message, opts processOptions, + turnScope turnEventScope, ) (string, int, error) { iteration := 0 var finalContent string @@ -1106,6 +1220,17 @@ func (al *AgentLoop) runLLMIteration( // Build tool definitions providerToolDefs := agent.Tools.ToProviderDefs() + al.emitEvent( + EventKindLLMRequest, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.request"), + LLMRequestPayload{ + Model: activeModel, + MessagesCount: len(messages), + ToolsCount: len(providerToolDefs), + MaxTokens: agent.MaxTokens, + Temperature: agent.Temperature, + }, + ) // Log LLM request details logger.DebugCF("agent", "LLM request", @@ -1246,6 +1371,14 @@ func (al *AgentLoop) runLLMIteration( } if err != nil { + al.emitEvent( + EventKindError, + turnScope.meta(iteration, "runLLMIteration", "turn.error"), + ErrorPayload{ + Stage: "llm", + Message: err.Error(), + }, + ) logger.ErrorCF("agent", "LLM call failed", map[string]any{ "agent_id": agent.ID, @@ -1262,6 +1395,15 @@ func (al 
*AgentLoop) runLLMIteration( opts.Channel, al.targetReasoningChannelID(opts.Channel), ) + al.emitEvent( + EventKindLLMResponse, + turnScope.meta(iteration, "runLLMIteration", "turn.llm.response"), + LLMResponsePayload{ + ContentLen: len(response.Content), + ToolCalls: len(response.ToolCalls), + HasReasoning: response.Reasoning != "" || response.ReasoningContent != "", + }, + ) logger.DebugCF("agent", "LLM response", map[string]any{ @@ -1352,6 +1494,14 @@ func (al *AgentLoop) runLLMIteration( "tool": tc.Name, "iteration": iteration, }) + al.emitEvent( + EventKindToolExecStart, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.start"), + ToolExecStartPayload{ + Tool: tc.Name, + Arguments: cloneEventArguments(tc.Arguments), + }, + ) // Create async callback for tools that implement AsyncExecutor. asyncCallback := func(_ context.Context, result *tools.ToolResult) { @@ -1390,6 +1540,7 @@ func (al *AgentLoop) runLLMIteration( }) } + toolStart := time.Now() toolResult := agent.Tools.ExecuteWithContext( ctx, tc.Name, @@ -1398,6 +1549,7 @@ func (al *AgentLoop) runLLMIteration( opts.ChatID, asyncCallback, ) + toolDuration := time.Since(toolStart) // Process tool result if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse { @@ -1443,6 +1595,18 @@ func (al *AgentLoop) runLLMIteration( Content: contentForLLM, ToolCallID: tc.ID, } + al.emitEvent( + EventKindToolExecEnd, + turnScope.meta(iteration, "runLLMIteration", "turn.tool.end"), + ToolExecEndPayload{ + Tool: tc.Name, + Duration: toolDuration, + ForLLMLen: len(contentForLLM), + ForUserLen: len(toolResult.ForUser), + IsError: toolResult.IsError, + Async: toolResult.Async, + }, + ) messages = append(messages, toolResultMsg) agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg)