feat(agent): add event bus foundation

This commit is contained in:
Hoshina
2026-03-20 14:53:22 +08:00
parent 899558bbfa
commit af61d0bca7
4 changed files with 650 additions and 1 deletions
+121
View File
@@ -0,0 +1,121 @@
package agent
import (
"sync"
"sync/atomic"
"time"
)
// defaultEventSubscriberBuffer is the channel capacity Subscribe falls back to
// when the caller passes a non-positive buffer size.
const defaultEventSubscriberBuffer = 16
// EventSubscription is the handle handed back by EventBus.Subscribe.
//
// ID is the key to pass to EventBus.Unsubscribe, and C is the receive-only
// side of the subscriber's channel. A subscription obtained from a bus that
// is already closed carries the zero ID and a channel that is already closed.
type EventSubscription struct {
	ID uint64
	C  <-chan Event
}
// eventSubscriber is the bus-side record for one subscription. It keeps the
// bidirectional channel so the bus can both send on it and close it.
type eventSubscriber struct {
	ch chan Event
}
// EventBus fans agent-loop events out to any number of in-process
// subscribers. Delivery never blocks the emitter: a subscriber whose channel
// is full simply misses the event, and the miss is counted per event kind.
type EventBus struct {
	// mu guards subs, nextID and closed.
	mu sync.RWMutex
	// subs maps subscription IDs to their delivery channels.
	subs map[uint64]eventSubscriber
	// nextID is the most recently issued subscription ID.
	nextID uint64
	// closed is set once by Close; afterwards Emit is a no-op and Subscribe
	// hands out already-closed channels.
	closed bool
	// dropped counts events that could not be delivered, indexed by kind.
	dropped [eventKindCount]atomic.Int64
}
// NewEventBus returns an open, empty EventBus ready to accept subscribers.
func NewEventBus() *EventBus {
	eb := new(EventBus)
	eb.subs = make(map[uint64]eventSubscriber)
	return eb
}
// Subscribe registers a new subscriber and returns its handle.
//
// buffer is the capacity of the subscriber's channel; a non-positive value
// selects defaultEventSubscriberBuffer. If the bus has already been closed,
// the returned subscription has the zero ID and a channel that is already
// closed, so a receive on it returns immediately.
//
// Subscribe is safe to call on a zero-value EventBus: the subscriber map is
// allocated on first use instead of panicking on a nil-map write.
func (b *EventBus) Subscribe(buffer int) EventSubscription {
	if buffer <= 0 {
		buffer = defaultEventSubscriberBuffer
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		ch := make(chan Event)
		close(ch)
		return EventSubscription{C: ch}
	}

	// A bus that was not built with NewEventBus has a nil map; writing to it
	// would panic, so create it lazily while holding the write lock.
	if b.subs == nil {
		b.subs = make(map[uint64]eventSubscriber)
	}

	b.nextID++
	id := b.nextID
	ch := make(chan Event, buffer)
	b.subs[id] = eventSubscriber{ch: ch}
	return EventSubscription{ID: id, C: ch}
}
// Unsubscribe drops the subscriber registered under id and closes its
// channel. Unknown IDs (including ones already removed) are ignored, so the
// call is safe to repeat.
func (b *EventBus) Unsubscribe(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if sub, found := b.subs[id]; found {
		delete(b.subs, id)
		close(sub.ch)
	}
}
// Emit delivers evt to every current subscriber without ever blocking.
//
// An event with a zero Time is stamped with the current time first. A
// subscriber whose channel is full does not receive the event; each such miss
// increments the drop counter for evt.Kind when the kind is a known one.
// Emitting on a closed bus does nothing.
func (b *EventBus) Emit(evt Event) {
	if evt.Time.IsZero() {
		evt.Time = time.Now()
	}

	// The read lock is held across the sends so that Unsubscribe and Close,
	// which close channels under the write lock, cannot run mid-broadcast.
	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.closed {
		return
	}

	countable := evt.Kind < eventKindCount
	for _, sub := range b.subs {
		select {
		case sub.ch <- evt:
			continue
		default:
		}
		// Reaching here means the subscriber's buffer was full.
		if countable {
			b.dropped[evt.Kind].Add(1)
		}
	}
}
// Dropped reports how many events of the given kind were discarded because a
// subscriber's channel was full. Kinds outside the known range report 0.
func (b *EventBus) Dropped(kind EventKind) int64 {
	if kind < eventKindCount {
		return b.dropped[kind].Load()
	}
	return 0
}
// Close shuts the bus down: it marks the bus closed, closes every subscriber
// channel and forgets all subscribers. Calling Close again is a no-op.
func (b *EventBus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if !b.closed {
		b.closed = true
		for id := range b.subs {
			close(b.subs[id].ch)
			delete(b.subs, id)
		}
	}
}
+235
View File
@@ -0,0 +1,235 @@
package agent
import (
"context"
"os"
"slices"
"testing"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/tools"
)
// TestEventBus_SubscribeEmitUnsubscribeClose walks one subscriber through the
// full lifecycle: it receives an emitted event, sees its channel closed after
// Unsubscribe, and a later Subscribe on a closed bus yields a closed channel.
func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) {
	b := NewEventBus()
	first := b.Subscribe(1)

	b.Emit(Event{
		Kind: EventKindTurnStart,
		Meta: EventMeta{TurnID: "turn-1"},
	})

	timeout := time.After(time.Second)
	select {
	case got := <-first.C:
		if got.Kind != EventKindTurnStart {
			t.Fatalf("expected %v, got %v", EventKindTurnStart, got.Kind)
		}
		if got.Meta.TurnID != "turn-1" {
			t.Fatalf("expected turn id turn-1, got %q", got.Meta.TurnID)
		}
	case <-timeout:
		t.Fatal("timed out waiting for event")
	}

	// The only event was consumed above, so a receive succeeds only if the
	// channel is still open.
	b.Unsubscribe(first.ID)
	_, open := <-first.C
	if open {
		t.Fatal("expected subscriber channel to be closed after unsubscribe")
	}

	b.Close()
	late := b.Subscribe(1)
	_, open = <-late.C
	if open {
		t.Fatal("expected closed bus to return a closed subscriber channel")
	}
}
// TestEventBus_DropsWhenSubscriberIsFull emits 1000 events at a subscriber
// with a one-slot buffer that never reads. Emit must stay fast, and every
// event after the first must be counted as dropped.
func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) {
	b := NewEventBus()
	sub := b.Subscribe(1)
	defer b.Unsubscribe(sub.ID)

	const emits = 1000
	began := time.Now()
	for n := 0; n < emits; n++ {
		b.Emit(Event{Kind: EventKindLLMRequest})
	}
	elapsed := time.Since(began)
	if elapsed > 100*time.Millisecond {
		t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed)
	}

	// The first event fills the single buffer slot; the rest are dropped.
	got := b.Dropped(EventKindLLMRequest)
	if got != 999 {
		t.Fatalf("expected 999 dropped events, got %d", got)
	}
}
// scriptedToolProvider is a test double for an LLM provider whose replies
// depend only on how many times Chat has been called.
type scriptedToolProvider struct {
	// calls is the number of Chat invocations so far.
	calls int
}
// Chat returns a scripted reply and ignores all of its arguments. The first
// call asks for a single "mock_custom" tool invocation; every later call
// answers with the plain content "done".
func (m *scriptedToolProvider) Chat(
	ctx context.Context,
	messages []providers.Message,
	toolDefs []providers.ToolDefinition,
	model string,
	opts map[string]any,
) (*providers.LLMResponse, error) {
	m.calls++
	if m.calls != 1 {
		return &providers.LLMResponse{Content: "done"}, nil
	}

	call := providers.ToolCall{
		ID:        "call-1",
		Name:      "mock_custom",
		Arguments: map[string]any{"task": "ping"},
	}
	return &providers.LLMResponse{
		ToolCalls: []providers.ToolCall{call},
	}, nil
}
// GetDefaultModel reports the fixed model name used by this test provider.
func (m *scriptedToolProvider) GetDefaultModel() string {
	const model = "scripted-tool-model"
	return model
}
// TestAgentLoop_EmitsMinimalTurnEvents runs one turn through runAgentLoop
// with a scripted provider (one tool call, then the answer "done") and checks
// the exact sequence, correlation metadata and payloads of the events that
// reach a subscriber.
func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
// Use a throwaway directory as the agent workspace.
tmpDir, err := os.MkdirTemp("", "agent-eventbus-*")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}
msgBus := bus.NewMessageBus()
provider := &scriptedToolProvider{}
al := NewAgentLoop(cfg, msgBus, provider)
// mockCustomTool is declared outside this view; the provider's first reply
// calls a tool named "mock_custom", presumably this one.
al.RegisterTool(&mockCustomTool{})
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent == nil {
t.Fatal("expected default agent")
}
// A 16-slot buffer is larger than the 8 events expected below, so none of
// them should be dropped even though nothing reads until the turn is over.
sub := al.SubscribeEvents(16)
defer al.UnsubscribeEvents(sub.ID)
response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
SessionKey: "session-1",
Channel: "cli",
ChatID: "direct",
UserMessage: "run tool",
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
})
if err != nil {
t.Fatalf("runAgentLoop failed: %v", err)
}
if response != "done" {
t.Fatalf("expected final response 'done', got %q", response)
}
// Drain whatever is buffered without blocking. The turn-end event is emitted
// from a defer inside runAgentLoop, so it is already queued at this point.
events := collectEventStream(sub.C)
if len(events) != 8 {
t.Fatalf("expected 8 events, got %d", len(events))
}
kinds := make([]EventKind, 0, len(events))
for _, evt := range events {
kinds = append(kinds, evt.Kind)
}
// Two LLM round trips: the first yields the tool call, the second the answer.
expectedKinds := []EventKind{
EventKindTurnStart,
EventKindLLMRequest,
EventKindLLMResponse,
EventKindToolExecStart,
EventKindToolExecEnd,
EventKindLLMRequest,
EventKindLLMResponse,
EventKindTurnEnd,
}
if !slices.Equal(kinds, expectedKinds) {
t.Fatalf("unexpected event sequence: got %v want %v", kinds, expectedKinds)
}
// Every event of the turn must share one turn ID and the session key.
turnID := events[0].Meta.TurnID
for i, evt := range events {
if evt.Meta.TurnID != turnID {
t.Fatalf("event %d has mismatched turn id %q, want %q", i, evt.Meta.TurnID, turnID)
}
if evt.Meta.SessionKey != "session-1" {
t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey)
}
}
// Payload checks, indexed by position in expectedKinds.
startPayload, ok := events[0].Payload.(TurnStartPayload)
if !ok {
t.Fatalf("expected TurnStartPayload, got %T", events[0].Payload)
}
if startPayload.UserMessage != "run tool" {
t.Fatalf("expected user message 'run tool', got %q", startPayload.UserMessage)
}
toolStartPayload, ok := events[3].Payload.(ToolExecStartPayload)
if !ok {
t.Fatalf("expected ToolExecStartPayload, got %T", events[3].Payload)
}
if toolStartPayload.Tool != "mock_custom" {
t.Fatalf("expected tool name mock_custom, got %q", toolStartPayload.Tool)
}
toolEndPayload, ok := events[4].Payload.(ToolExecEndPayload)
if !ok {
t.Fatalf("expected ToolExecEndPayload, got %T", events[4].Payload)
}
if toolEndPayload.Tool != "mock_custom" {
t.Fatalf("expected tool end payload for mock_custom, got %q", toolEndPayload.Tool)
}
if toolEndPayload.IsError {
t.Fatal("expected mock_custom tool to succeed")
}
turnEndPayload, ok := events[len(events)-1].Payload.(TurnEndPayload)
if !ok {
t.Fatalf("expected TurnEndPayload, got %T", events[len(events)-1].Payload)
}
if turnEndPayload.Status != TurnEndStatusCompleted {
t.Fatalf("expected completed turn, got %q", turnEndPayload.Status)
}
if turnEndPayload.Iterations != 2 {
t.Fatalf("expected 2 iterations, got %d", turnEndPayload.Iterations)
}
}
// collectEventStream drains every event that is immediately available on ch
// and returns them in arrival order. It never blocks: it stops as soon as the
// channel has nothing buffered or has been closed.
func collectEventStream(ch <-chan Event) []Event {
	var collected []Event
	for {
		select {
		case evt, open := <-ch:
			if open {
				collected = append(collected, evt)
				continue
			}
		default:
		}
		// Either the channel is closed or nothing is ready right now.
		return collected
	}
}
// Compile-time check that *mockCustomTool satisfies tools.Tool.
var _ tools.Tool = (*mockCustomTool)(nil)
+129
View File
@@ -0,0 +1,129 @@
package agent
import (
"fmt"
"time"
)
// EventKind names the category of a structured agent-loop event.
type EventKind uint8

// The known event kinds. eventKindCount is not a kind itself; it is the
// number of kinds and sizes the per-kind tables.
const (
	// EventKindTurnStart is emitted when a turn begins processing.
	EventKindTurnStart EventKind = iota
	// EventKindTurnEnd is emitted when a turn finishes, successfully or with an error.
	EventKindTurnEnd
	// EventKindLLMRequest is emitted before a provider chat request is made.
	EventKindLLMRequest
	// EventKindLLMResponse is emitted after a provider chat response is received.
	EventKindLLMResponse
	// EventKindToolExecStart is emitted immediately before a tool executes.
	EventKindToolExecStart
	// EventKindToolExecEnd is emitted immediately after a tool finishes executing.
	EventKindToolExecEnd
	// EventKindError is emitted when a turn encounters an execution error.
	EventKindError

	eventKindCount
)

// eventKindNames holds the stable string form of every kind, keyed by the
// kind itself so each name stays attached to its constant.
var eventKindNames = [eventKindCount]string{
	EventKindTurnStart:     "turn_start",
	EventKindTurnEnd:       "turn_end",
	EventKindLLMRequest:    "llm_request",
	EventKindLLMResponse:   "llm_response",
	EventKindToolExecStart: "tool_exec_start",
	EventKindToolExecEnd:   "tool_exec_end",
	EventKindError:         "error",
}

// String returns the stable name of k, or "event_kind(N)" for a value that is
// not one of the known kinds.
func (k EventKind) String() string {
	if k < eventKindCount {
		return eventKindNames[k]
	}
	return fmt.Sprintf("event_kind(%d)", k)
}
// Event is the envelope that EventBus broadcasts to subscribers.
type Event struct {
	// Kind says which category of event this is.
	Kind EventKind
	// Time is when the event happened; EventBus.Emit fills it in with the
	// current time when it is left zero.
	Time time.Time
	// Meta carries the correlation fields common to all events.
	Meta EventMeta
	// Payload holds the kind-specific details, e.g. a TurnStartPayload.
	Payload any
}
// EventMeta groups the correlation fields attached to every agent-loop event
// so that subscribers can tie events of the same turn together.
type EventMeta struct {
	// AgentID identifies the agent that produced the event.
	AgentID string
	// TurnID identifies the turn the event belongs to.
	TurnID string
	// ParentTurnID is presumably the turn that spawned this one; it is not
	// set by the emitting code visible alongside this type (TODO confirm).
	ParentTurnID string
	// SessionKey is the session the turn runs in.
	SessionKey string
	// Iteration is the LLM iteration counter at the time of the event.
	Iteration int
	// TracePath is a dotted label for the event's position in the turn,
	// such as "turn.start" or "turn.llm.request".
	TracePath string
	// Source names the function that emitted the event.
	Source string
}
// TurnEndStatus is the final outcome reported for a turn.
type TurnEndStatus string

// Possible TurnEndStatus values.
const (
	// TurnEndStatusCompleted marks a turn that ran to completion.
	TurnEndStatusCompleted = TurnEndStatus("completed")
	// TurnEndStatusError marks a turn that stopped because of an error.
	TurnEndStatusError = TurnEndStatus("error")
)
// TurnStartPayload is the payload of an EventKindTurnStart event.
type TurnStartPayload struct {
	// Channel is the channel the turn's message came in on.
	Channel string
	// ChatID is the chat the turn belongs to within that channel.
	ChatID string
	// UserMessage is the user message text that started the turn.
	UserMessage string
	// MediaCount is the number of media items supplied with the message.
	MediaCount int
}
// TurnEndPayload is the payload of an EventKindTurnEnd event.
type TurnEndPayload struct {
	// Status says whether the turn completed or ended with an error.
	Status TurnEndStatus
	// Iterations is the number of LLM iterations the turn went through.
	Iterations int
	// Duration is the time elapsed between the start and end of the turn.
	Duration time.Duration
	// FinalContentLen is the length in bytes of the final response text.
	FinalContentLen int
}
// LLMRequestPayload is the payload of an EventKindLLMRequest event and
// summarizes the request about to be sent to the provider.
type LLMRequestPayload struct {
	// Model is the model the request targets.
	Model string
	// MessagesCount is the number of messages in the request.
	MessagesCount int
	// ToolsCount is the number of tool definitions offered to the model.
	ToolsCount int
	// MaxTokens is the agent's configured token limit.
	MaxTokens int
	// Temperature is the agent's configured sampling temperature.
	Temperature float64
}
// LLMResponsePayload is the payload of an EventKindLLMResponse event and
// summarizes what the provider sent back.
type LLMResponsePayload struct {
	// ContentLen is the length in bytes of the response content.
	ContentLen int
	// ToolCalls is the number of tool calls the response requested.
	ToolCalls int
	// HasReasoning reports whether the response carried any reasoning text.
	HasReasoning bool
}
// ToolExecStartPayload is the payload of an EventKindToolExecStart event.
type ToolExecStartPayload struct {
	// Tool is the name of the tool about to run.
	Tool string
	// Arguments is a copy of the arguments the tool is being called with.
	Arguments map[string]any
}
// ToolExecEndPayload is the payload of an EventKindToolExecEnd event.
type ToolExecEndPayload struct {
	// Tool is the name of the tool that ran.
	Tool string
	// Duration is how long the tool execution call took.
	Duration time.Duration
	// ForLLMLen is the length in bytes of the result text fed back to the LLM.
	ForLLMLen int
	// ForUserLen is the length in bytes of the result text meant for the user.
	ForUserLen int
	// IsError reports whether the tool result was flagged as an error.
	IsError bool
	// Async reports whether the tool result was flagged as asynchronous.
	Async bool
}
// ErrorPayload is the payload of an EventKindError event.
type ErrorPayload struct {
	// Stage names the part of the loop that failed, for example "llm".
	Stage string
	// Message is the text of the error.
	Message string
}
+165 -1
View File
@@ -39,6 +39,7 @@ type AgentLoop struct {
cfg *config.Config
registry *AgentRegistry
state *state.Manager
eventBus *EventBus
running atomic.Bool
summarizing sync.Map
fallback *providers.FallbackChain
@@ -49,6 +50,7 @@ type AgentLoop struct {
mcp mcpRuntime
steering *steeringQueue
mu sync.RWMutex
turnSeq atomic.Uint64
// Track active requests for safe provider cleanup
activeRequests sync.WaitGroup
}
@@ -103,6 +105,7 @@ func NewAgentLoop(
cfg: cfg,
registry: registry,
state: stateManager,
eventBus: NewEventBus(),
summarizing: sync.Map{},
fallback: fallbackChain,
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
@@ -380,6 +383,84 @@ func (al *AgentLoop) Close() {
}
al.GetRegistry().Close()
if al.eventBus != nil {
al.eventBus.Close()
}
}
// SubscribeEvents subscribes to the loop's event bus with the given channel
// buffer size. When the loop or its bus is nil, the returned subscription has
// the zero ID and an already-closed channel, so receivers do not hang.
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
	if al != nil && al.eventBus != nil {
		return al.eventBus.Subscribe(buffer)
	}

	closed := make(chan Event)
	close(closed)
	return EventSubscription{C: closed}
}
// UnsubscribeEvents cancels the subscription with the given ID. It does
// nothing when the loop or its bus is nil.
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
	if al != nil && al.eventBus != nil {
		al.eventBus.Unsubscribe(id)
	}
}
// EventDrops reports how many events of the given kind the loop's bus has
// dropped. It reports 0 when the loop or its bus is nil.
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
	if al != nil && al.eventBus != nil {
		return al.eventBus.Dropped(kind)
	}
	return 0
}
// turnEventScope holds the identifiers that stay constant for one turn, so
// each event emitted during that turn can be stamped with the same values.
type turnEventScope struct {
	agentID    string
	sessionKey string
	turnID     string
}
// newTurnEventScope starts a scope for a new turn. The turn ID has the form
// "<agentID>-turn-<n>", where n comes from the loop's atomic turn counter.
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope {
	var scope turnEventScope
	scope.agentID = agentID
	scope.sessionKey = sessionKey
	scope.turnID = fmt.Sprintf("%s-turn-%d", agentID, al.turnSeq.Add(1))
	return scope
}
// meta builds the EventMeta for one event of this turn, combining the scope's
// fixed identifiers with the per-event iteration, source and trace path.
// ParentTurnID is left empty.
func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta {
	var m EventMeta
	m.AgentID = ts.agentID
	m.TurnID = ts.turnID
	m.SessionKey = ts.sessionKey
	m.Iteration = iteration
	m.TracePath = tracePath
	m.Source = source
	return m
}
// emitEvent wraps kind, meta and payload in an Event and broadcasts it on the
// loop's bus. It is a no-op when the loop or its bus is nil. Time is left
// zero here; EventBus.Emit stamps it.
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
	if al == nil {
		return
	}
	eb := al.eventBus
	if eb == nil {
		return
	}

	evt := Event{Kind: kind, Meta: meta, Payload: payload}
	eb.Emit(evt)
}
// cloneEventArguments returns a copy of args that is safe to hand to event
// subscribers. It returns nil for a nil or empty map.
//
// The copy is deep for the container shapes produced by JSON decoding:
// nested map[string]any and []any values are copied recursively, so a later
// change to the original arguments does not alter an already-emitted event,
// and the reverse. Values of any other type are shared as-is.
func cloneEventArguments(args map[string]any) map[string]any {
	if len(args) == 0 {
		return nil
	}
	cloned := make(map[string]any, len(args))
	for k, v := range args {
		cloned[k] = cloneEventValue(v)
	}
	return cloned
}

// cloneEventValue copies v recursively when it is a map[string]any or []any
// and returns every other value unchanged. Nil containers stay nil and empty
// ones stay empty. It assumes acyclic, JSON-like data; a self-referencing
// container would recurse without end.
func cloneEventValue(v any) any {
	switch val := v.(type) {
	case map[string]any:
		if val == nil {
			return val
		}
		out := make(map[string]any, len(val))
		for k, item := range val {
			out[k] = cloneEventValue(item)
		}
		return out
	case []any:
		if val == nil {
			return val
		}
		out := make([]any, len(val))
		for i, item := range val {
			out[i] = cloneEventValue(item)
		}
		return out
	default:
		return v
	}
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
@@ -895,6 +976,35 @@ func (al *AgentLoop) runAgentLoop(
agent *AgentInstance,
opts processOptions,
) (string, error) {
turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey)
turnStartedAt := time.Now()
turnIterations := 0
turnFinalContentLen := 0
turnStatus := TurnEndStatusCompleted
defer func() {
al.emitEvent(
EventKindTurnEnd,
turnScope.meta(turnIterations, "runAgentLoop", "turn.end"),
TurnEndPayload{
Status: turnStatus,
Iterations: turnIterations,
Duration: time.Since(turnStartedAt),
FinalContentLen: turnFinalContentLen,
},
)
}()
al.emitEvent(
EventKindTurnStart,
turnScope.meta(0, "runAgentLoop", "turn.start"),
TurnStartPayload{
Channel: opts.Channel,
ChatID: opts.ChatID,
UserMessage: opts.UserMessage,
MediaCount: len(opts.Media),
},
)
// 0. Record last channel for heartbeat notifications (skip internal channels and cli)
if opts.Channel != "" && opts.ChatID != "" {
if !constants.IsInternalChannel(opts.Channel) {
@@ -952,8 +1062,10 @@ func (al *AgentLoop) runAgentLoop(
agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
// 3. Run LLM iteration loop
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts)
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts, turnScope)
turnIterations = iteration
if err != nil {
turnStatus = TurnEndStatusError
return "", err
}
@@ -964,6 +1076,7 @@ func (al *AgentLoop) runAgentLoop(
if finalContent == "" {
finalContent = opts.DefaultResponse
}
turnFinalContentLen = len(finalContent)
// 5. Save final assistant message to session
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
@@ -1058,6 +1171,7 @@ func (al *AgentLoop) runLLMIteration(
agent *AgentInstance,
messages []providers.Message,
opts processOptions,
turnScope turnEventScope,
) (string, int, error) {
iteration := 0
var finalContent string
@@ -1106,6 +1220,17 @@ func (al *AgentLoop) runLLMIteration(
// Build tool definitions
providerToolDefs := agent.Tools.ToProviderDefs()
al.emitEvent(
EventKindLLMRequest,
turnScope.meta(iteration, "runLLMIteration", "turn.llm.request"),
LLMRequestPayload{
Model: activeModel,
MessagesCount: len(messages),
ToolsCount: len(providerToolDefs),
MaxTokens: agent.MaxTokens,
Temperature: agent.Temperature,
},
)
// Log LLM request details
logger.DebugCF("agent", "LLM request",
@@ -1246,6 +1371,14 @@ func (al *AgentLoop) runLLMIteration(
}
if err != nil {
al.emitEvent(
EventKindError,
turnScope.meta(iteration, "runLLMIteration", "turn.error"),
ErrorPayload{
Stage: "llm",
Message: err.Error(),
},
)
logger.ErrorCF("agent", "LLM call failed",
map[string]any{
"agent_id": agent.ID,
@@ -1262,6 +1395,15 @@ func (al *AgentLoop) runLLMIteration(
opts.Channel,
al.targetReasoningChannelID(opts.Channel),
)
al.emitEvent(
EventKindLLMResponse,
turnScope.meta(iteration, "runLLMIteration", "turn.llm.response"),
LLMResponsePayload{
ContentLen: len(response.Content),
ToolCalls: len(response.ToolCalls),
HasReasoning: response.Reasoning != "" || response.ReasoningContent != "",
},
)
logger.DebugCF("agent", "LLM response",
map[string]any{
@@ -1352,6 +1494,14 @@ func (al *AgentLoop) runLLMIteration(
"tool": tc.Name,
"iteration": iteration,
})
al.emitEvent(
EventKindToolExecStart,
turnScope.meta(iteration, "runLLMIteration", "turn.tool.start"),
ToolExecStartPayload{
Tool: tc.Name,
Arguments: cloneEventArguments(tc.Arguments),
},
)
// Create async callback for tools that implement AsyncExecutor.
asyncCallback := func(_ context.Context, result *tools.ToolResult) {
@@ -1390,6 +1540,7 @@ func (al *AgentLoop) runLLMIteration(
})
}
toolStart := time.Now()
toolResult := agent.Tools.ExecuteWithContext(
ctx,
tc.Name,
@@ -1398,6 +1549,7 @@ func (al *AgentLoop) runLLMIteration(
opts.ChatID,
asyncCallback,
)
toolDuration := time.Since(toolStart)
// Process tool result
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
@@ -1443,6 +1595,18 @@ func (al *AgentLoop) runLLMIteration(
Content: contentForLLM,
ToolCallID: tc.ID,
}
al.emitEvent(
EventKindToolExecEnd,
turnScope.meta(iteration, "runLLMIteration", "turn.tool.end"),
ToolExecEndPayload{
Tool: tc.Name,
Duration: toolDuration,
ForLLMLen: len(contentForLLM),
ForUserLen: len(toolResult.ForUser),
IsError: toolResult.IsError,
Async: toolResult.Async,
},
)
messages = append(messages, toolResultMsg)
agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg)