diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 2bec6897f..7a9384a23 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -38,8 +38,7 @@ type AgentLoop struct { registry *AgentRegistry state *state.Manager - // Event system (from Incoming) - eventBus *EventBus + // Runtime event system runtimeEvents runtimeevents.Bus ownsRuntimeEvents bool hooks *HookManager @@ -286,9 +285,6 @@ func (al *AgentLoop) Close() { if al.hooks != nil { al.hooks.Close() } - if al.eventBus != nil { - al.eventBus.Close() - } if al.runtimeEvents != nil && al.ownsRuntimeEvents { if err := al.runtimeEvents.Close(); err != nil { logger.ErrorCF("agent", "Failed to close runtime event bus", @@ -303,12 +299,6 @@ func (al *AgentLoop) Close() { // UnmountHook removes a previously registered in-process hook. -// SubscribeEvents registers a subscriber for agent-loop events. - -// UnsubscribeEvents removes a previously registered event subscriber. - -// EventDrops returns the number of dropped events for the given kind. - type turnEventScope struct { agentID string sessionKey string diff --git a/pkg/agent/agent_event.go b/pkg/agent/agent_event.go index 6bbd275e5..5b9acfdb0 100644 --- a/pkg/agent/agent_event.go +++ b/pkg/agent/agent_event.go @@ -46,9 +46,6 @@ func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { al.logEvent(evt) - if al.eventBus != nil { - al.eventBus.Emit(evt) - } al.publishRuntimeEvent(evt) } @@ -165,38 +162,6 @@ func (al *AgentLoop) UnmountHook(name string) { al.hooks.Unmount(name) } -// SubscribeEvents registers a subscriber for agent-loop events. -// -// Deprecated: use RuntimeEvents for new event observation code. -func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { - if al == nil || al.eventBus == nil { - ch := make(chan Event) - close(ch) - return EventSubscription{C: ch} - } - return al.eventBus.Subscribe(buffer) -} - -// UnsubscribeEvents removes a previously registered event subscriber. -// -// Deprecated: use the Subscription returned by RuntimeEvents instead. -func (al *AgentLoop) UnsubscribeEvents(id uint64) { - if al == nil || al.eventBus == nil { - return - } - al.eventBus.Unsubscribe(id) -} - -// EventDrops returns the number of dropped events for the given kind. -// -// Deprecated: use RuntimeEventStats for runtime event drop counters. -func (al *AgentLoop) EventDrops(kind EventKind) int64 { - if al == nil || al.eventBus == nil { - return 0 - } - return al.eventBus.Dropped(kind) -} - // RuntimeEvents returns the root runtime event channel. func (al *AgentLoop) RuntimeEvents() runtimeevents.EventChannel { if al == nil || al.runtimeEvents == nil { diff --git a/pkg/agent/agent_init.go b/pkg/agent/agent_init.go index 7ae844656..7ad884cfe 100644 --- a/pkg/agent/agent_init.go +++ b/pkg/agent/agent_init.go @@ -49,8 +49,6 @@ func NewAgentLoop( stateManager = state.NewManager(defaultAgent.Workspace) } - eventBus := NewEventBus() - // Determine worker pool size from config (default: 1 = sequential) workerPoolSize := cfg.Agents.Defaults.MaxParallelTurns if workerPoolSize <= 0 { @@ -62,7 +60,6 @@ func NewAgentLoop( cfg: cfg, registry: registry, state: stateManager, - eventBus: eventBus, fallback: fallbackChain, cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), @@ -79,7 +76,7 @@ func NewAgentLoop( al.ownsRuntimeEvents = true } al.providerFactory = providers.CreateProviderFromConfig - al.hooks = NewHookManagerWithRuntimeEvents(eventBus, al.runtimeEvents.Channel()) + al.hooks = NewHookManager(al.runtimeEvents.Channel()) configureHookManagerFromConfig(al.hooks, cfg) al.contextManager = al.resolveContextManager() diff --git a/pkg/agent/eventbus.go b/pkg/agent/eventbus.go deleted file mode 100644 index 54ada62fb..000000000 --- a/pkg/agent/eventbus.go +++ /dev/null @@ -1,128 +0,0 @@ -package agent - -import ( - "sync" - "sync/atomic" - "time" -) - -const defaultEventSubscriberBuffer = 16 - -// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe. -// -// Deprecated: use pkg/events.Subscription from RuntimeEvents for new code. -type EventSubscription struct { - ID uint64 - C <-chan Event -} - -type eventSubscriber struct { - ch chan Event -} - -// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events. -// -// Deprecated: use pkg/events.EventBus for new code. This legacy bus remains -// only while existing agent event consumers migrate. -type EventBus struct { - mu sync.RWMutex - subs map[uint64]eventSubscriber - nextID uint64 - closed bool - dropped [eventKindCount]atomic.Int64 -} - -// NewEventBus creates a new in-process event broadcaster. -// -// Deprecated: use events.NewBus for new runtime event publishers. -func NewEventBus() *EventBus { - return &EventBus{ - subs: make(map[uint64]eventSubscriber), - } -} - -// Subscribe registers a new subscriber with the requested channel buffer size. -// A non-positive buffer uses the default size. -func (b *EventBus) Subscribe(buffer int) EventSubscription { - if buffer <= 0 { - buffer = defaultEventSubscriberBuffer - } - - b.mu.Lock() - defer b.mu.Unlock() - - if b.closed { - ch := make(chan Event) - close(ch) - return EventSubscription{C: ch} - } - - b.nextID++ - id := b.nextID - ch := make(chan Event, buffer) - b.subs[id] = eventSubscriber{ch: ch} - return EventSubscription{ID: id, C: ch} -} - -// Unsubscribe removes a subscriber and closes its channel. -func (b *EventBus) Unsubscribe(id uint64) { - b.mu.Lock() - defer b.mu.Unlock() - - sub, ok := b.subs[id] - if !ok { - return - } - - delete(b.subs, id) - close(sub.ch) -} - -// Emit broadcasts an event to all current subscribers without blocking. -// When a subscriber channel is full, the event is dropped for that subscriber. -func (b *EventBus) Emit(evt Event) { - if evt.Time.IsZero() { - evt.Time = time.Now() - } - - b.mu.RLock() - defer b.mu.RUnlock() - - if b.closed { - return - } - - for _, sub := range b.subs { - select { - case sub.ch <- evt: - default: - if evt.Kind < eventKindCount { - b.dropped[evt.Kind].Add(1) - } - } - } -} - -// Dropped returns the number of dropped events for a given kind. -func (b *EventBus) Dropped(kind EventKind) int64 { - if kind >= eventKindCount { - return 0 - } - return b.dropped[kind].Load() -} - -// Close closes all subscriber channels and stops future broadcasts. -func (b *EventBus) Close() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.closed { - return - } - - b.closed = true - for id, sub := range b.subs { - close(sub.ch) - delete(b.subs, id) - } -} diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 87b1c68b1..b7678e87b 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -16,74 +16,17 @@ import ( "github.com/sipeed/picoclaw/pkg/tools" ) -func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) { - eventBus := NewEventBus() - sub := eventBus.Subscribe(1) - - eventBus.Emit(Event{ - Kind: EventKindTurnStart, - Meta: EventMeta{TurnID: "turn-1"}, - }) - - select { - case evt := <-sub.C: - if evt.Kind != EventKindTurnStart { - t.Fatalf("expected %v, got %v", EventKindTurnStart, evt.Kind) - } - if evt.Meta.TurnID != "turn-1" { - t.Fatalf("expected turn id turn-1, got %q", evt.Meta.TurnID) - } - case <-time.After(time.Second): - t.Fatal("timed out waiting for event") - } - - eventBus.Unsubscribe(sub.ID) - if _, ok := <-sub.C; ok { - t.Fatal("expected subscriber channel to be closed after unsubscribe") - } - - eventBus.Close() - closedSub := eventBus.Subscribe(1) - if _, ok := <-closedSub.C; ok { - t.Fatal("expected closed bus to return a closed subscriber channel") - } -} - -func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) { - eventBus := NewEventBus() - sub := eventBus.Subscribe(1) - defer eventBus.Unsubscribe(sub.ID) - - start := time.Now() - for i := 0; i < 1000; i++ { - eventBus.Emit(Event{Kind: EventKindLLMRequest}) - } - - if elapsed := time.Since(start); elapsed > 100*time.Millisecond { - t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed) - } - - if got := eventBus.Dropped(EventKindLLMRequest); got != 999 { - t.Fatalf("expected 999 dropped events, got %d", got) - } -} - -func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) { +func TestAgentLoop_PublishesRuntimeEvents(t *testing.T) { runtimeBus := runtimeevents.NewBus() al := &AgentLoop{ - eventBus: NewEventBus(), runtimeEvents: runtimeBus, } - defer al.eventBus.Close() defer func() { if err := runtimeBus.Close(); err != nil { t.Errorf("runtime bus close failed: %v", err) } }() - legacySub := al.SubscribeEvents(1) - defer al.UnsubscribeEvents(legacySub.ID) - runtimeSub, runtimeCh, err := al.RuntimeEvents().OfKind(runtimeevents.KindAgentToolExecStart).SubscribeChan( context.Background(), runtimeevents.SubscribeOptions{Name: "runtime", Buffer: 1}, @@ -122,11 +65,6 @@ func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) { ToolExecStartPayload{Tool: "mock_custom", Arguments: map[string]any{"task": "ping"}}, ) - legacyEvt := receiveLegacyEvent(t, legacySub.C) - if legacyEvt.Kind != EventKindToolExecStart { - t.Fatalf("legacy kind = %v, want %v", legacyEvt.Kind, EventKindToolExecStart) - } - runtimeEvt := receiveRuntimeEvent(t, runtimeCh) if runtimeEvt.Kind != runtimeevents.KindAgentToolExecStart { t.Fatalf("runtime kind = %q, want %q", runtimeEvt.Kind, runtimeevents.KindAgentToolExecStart) @@ -736,21 +674,6 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) { } } -func receiveLegacyEvent(t *testing.T, ch <-chan Event) Event { - t.Helper() - - select { - case evt, ok := <-ch: - if !ok { - t.Fatal("legacy event stream closed before expected event arrived") - } - return evt - case <-time.After(time.Second): - t.Fatal("timed out waiting for legacy event") - return Event{} - } -} - func receiveRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event { t.Helper() diff --git a/pkg/agent/hook_process.go b/pkg/agent/hook_process.go index 0ba5d8fc7..ce8e932d2 100644 --- a/pkg/agent/hook_process.go +++ b/pkg/agent/hook_process.go @@ -184,19 +184,6 @@ func (ph *ProcessHook) Close() error { return ph.closeErr } -func (ph *ProcessHook) OnEvent(ctx context.Context, evt Event) error { - if ph == nil || !ph.opts.Observe { - return nil - } - if len(ph.observeKinds) > 0 { - kind := runtimeKindForAgentEvent(evt.Kind).String() - if _, ok := ph.observeKinds[kind]; !ok { - return nil - } - } - return ph.notify(ctx, "hook.event", evt) -} - func (ph *ProcessHook) OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error { if ph == nil || !ph.opts.Observe { return nil diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index 0a6c6785a..79b72017d 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -72,11 +72,6 @@ func NamedHook(name string, hook any) HookRegistration { } } -type EventObserver interface { - // Deprecated: implement RuntimeEventObserver for new observation code. - OnEvent(ctx context.Context, evt Event) error -} - type RuntimeEventObserver interface { OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error } @@ -198,7 +193,6 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse { } type HookManager struct { - eventBus *EventBus runtimeEvents runtimeevents.EventChannel observerTimeout time.Duration interceptorTimeout time.Duration @@ -208,37 +202,21 @@ type HookManager struct { hooks map[string]HookRegistration ordered []HookRegistration - sub EventSubscription - runtimeSub runtimeevents.Subscription - done chan struct{} - runtimeDone chan struct{} - runtimeObserveEnabled bool - closeOnce sync.Once + runtimeSub runtimeevents.Subscription + runtimeDone chan struct{} + closeOnce sync.Once } -func NewHookManager(eventBus *EventBus) *HookManager { - return NewHookManagerWithRuntimeEvents(eventBus, nil) -} - -func NewHookManagerWithRuntimeEvents(eventBus *EventBus, runtimeEvents runtimeevents.EventChannel) *HookManager { +func NewHookManager(runtimeEvents runtimeevents.EventChannel) *HookManager { hm := &HookManager{ - eventBus: eventBus, runtimeEvents: runtimeEvents, observerTimeout: defaultHookObserverTimeout, interceptorTimeout: defaultHookInterceptorTimeout, approvalTimeout: defaultHookApprovalTimeout, hooks: make(map[string]HookRegistration), - done: make(chan struct{}), runtimeDone: make(chan struct{}), } - if eventBus != nil { - hm.sub = eventBus.Subscribe(hookObserverBufferSize) - go hm.dispatchEvents() - } else { - close(hm.done) - } - if runtimeEvents != nil { sub, ch, err := runtimeEvents.SubscribeChan(context.Background(), runtimeevents.SubscribeOptions{ Name: "hook-manager-observer", @@ -251,7 +229,6 @@ func NewHookManagerWithRuntimeEvents(eventBus *EventBus, runtimeEvents runtimeev close(hm.runtimeDone) } else { hm.runtimeSub = sub - hm.runtimeObserveEnabled = true go hm.dispatchRuntimeEvents(ch) } } else { @@ -267,9 +244,6 @@ func (hm *HookManager) Close() { } hm.closeOnce.Do(func() { - if hm.eventBus != nil { - hm.eventBus.Unsubscribe(hm.sub.ID) - } if hm.runtimeSub != nil { if err := hm.runtimeSub.Close(); err != nil { logger.WarnCF("hooks", "Failed to close runtime event hook subscription", map[string]any{ @@ -277,7 +251,6 @@ func (hm *HookManager) Close() { }) } } - <-hm.done <-hm.runtimeDone hm.closeAllHooks() }) @@ -335,25 +308,6 @@ func (hm *HookManager) Unmount(name string) { hm.rebuildOrdered() } -func (hm *HookManager) dispatchEvents() { - defer close(hm.done) - - for evt := range hm.sub.C { - for _, reg := range hm.snapshotHooks() { - if hm.runtimeObserveEnabled { - if _, ok := reg.Hook.(RuntimeEventObserver); ok { - continue - } - } - observer, ok := reg.Hook.(EventObserver) - if !ok { - continue - } - hm.runObserver(reg.Name, observer, evt) - } - } -} - func (hm *HookManager) dispatchRuntimeEvents(ch <-chan runtimeevents.Event) { defer close(hm.runtimeDone) @@ -643,33 +597,6 @@ func (hm *HookManager) closeAllHooks() { hm.ordered = nil } -func (hm *HookManager) runObserver(name string, observer EventObserver, evt Event) { - ctx, cancel := context.WithTimeout(context.Background(), hm.observerTimeout) - defer cancel() - - done := make(chan error, 1) - go func() { - done <- observer.OnEvent(ctx, evt) - }() - - select { - case err := <-done: - if err != nil { - logger.WarnCF("hooks", "Event observer failed", map[string]any{ - "hook": name, - "event": evt.Kind.String(), - "error": err.Error(), - }) - } - case <-ctx.Done(): - logger.WarnCF("hooks", "Event observer timed out", map[string]any{ - "hook": name, - "event": evt.Kind.String(), - "timeout_ms": hm.observerTimeout.Milliseconds(), - }) - } -} - func (hm *HookManager) runRuntimeObserver( name string, observer RuntimeEventObserver, diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index 592c32c1e..4deef38c7 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -152,20 +152,9 @@ func (h *llmObserverHook) AfterLLM( } type dualRuntimeObserverHook struct { - legacyCh chan Event runtimeCh chan runtimeevents.Event } -func (h *dualRuntimeObserverHook) OnEvent(ctx context.Context, evt Event) error { - if evt.Kind == EventKindTurnEnd { - select { - case h.legacyCh <- evt: - default: - } - } - return nil -} - func (h *dualRuntimeObserverHook) OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error { if evt.Kind == runtimeevents.KindAgentTurnEnd { select { @@ -522,13 +511,12 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { } } -func TestAgentLoop_Hooks_RuntimeObserverPreferredOverLegacyObserver(t *testing.T) { +func TestAgentLoop_Hooks_RuntimeObserverReceivesEvents(t *testing.T) { provider := &llmHookTestProvider{} al, agent, cleanup := newHookTestLoop(t, provider) defer cleanup() hook := &dualRuntimeObserverHook{ - legacyCh: make(chan Event, 1), runtimeCh: make(chan runtimeevents.Event, 1), } if err := al.MountHook(NamedHook("runtime-observer", hook)); err != nil { @@ -573,12 +561,6 @@ func TestAgentLoop_Hooks_RuntimeObserverPreferredOverLegacyObserver(t *testing.T case <-time.After(2 * time.Second): t.Fatal("timed out waiting for runtime observer event") } - - select { - case evt := <-hook.legacyCh: - t.Fatalf("legacy observer unexpectedly received %v", evt.Kind) - case <-time.After(100 * time.Millisecond): - } } func TestAgentLoop_BtwCommand_UsesLLMHooks(t *testing.T) {