refactor(events): remove legacy agent event bus

Drop the old agent EventBus, SubscribeEvents/EventDrops public surface, legacy hook observer dispatch, and hook.event process notification path. Agent observations now flow through pkg/events runtime events.

Validation: go test ./pkg/agent; make lint
This commit is contained in:
Hoshina
2026-04-26 16:39:35 +08:00
parent fce800414d
commit b954e6b8dc
8 changed files with 8 additions and 365 deletions
+1 -11
View File
@@ -38,8 +38,7 @@ type AgentLoop struct {
registry *AgentRegistry
state *state.Manager
// Event system (from Incoming)
eventBus *EventBus
// Runtime event system
runtimeEvents runtimeevents.Bus
ownsRuntimeEvents bool
hooks *HookManager
@@ -286,9 +285,6 @@ func (al *AgentLoop) Close() {
if al.hooks != nil {
al.hooks.Close()
}
if al.eventBus != nil {
al.eventBus.Close()
}
if al.runtimeEvents != nil && al.ownsRuntimeEvents {
if err := al.runtimeEvents.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close runtime event bus",
@@ -303,12 +299,6 @@ func (al *AgentLoop) Close() {
// UnmountHook removes a previously registered in-process hook.
// SubscribeEvents registers a subscriber for agent-loop events.
// UnsubscribeEvents removes a previously registered event subscriber.
// EventDrops returns the number of dropped events for the given kind.
type turnEventScope struct {
agentID string
sessionKey string
-35
View File
@@ -46,9 +46,6 @@ func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
al.logEvent(evt)
if al.eventBus != nil {
al.eventBus.Emit(evt)
}
al.publishRuntimeEvent(evt)
}
@@ -165,38 +162,6 @@ func (al *AgentLoop) UnmountHook(name string) {
al.hooks.Unmount(name)
}
// SubscribeEvents registers a subscriber for agent-loop events.
//
// Deprecated: use RuntimeEvents for new event observation code.
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
if al == nil || al.eventBus == nil {
ch := make(chan Event)
close(ch)
return EventSubscription{C: ch}
}
return al.eventBus.Subscribe(buffer)
}
// UnsubscribeEvents removes a previously registered event subscriber.
//
// Deprecated: use the Subscription returned by RuntimeEvents instead.
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
if al == nil || al.eventBus == nil {
return
}
al.eventBus.Unsubscribe(id)
}
// EventDrops returns the number of dropped events for the given kind.
//
// Deprecated: use RuntimeEventStats for runtime event drop counters.
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
if al == nil || al.eventBus == nil {
return 0
}
return al.eventBus.Dropped(kind)
}
// RuntimeEvents returns the root runtime event channel.
func (al *AgentLoop) RuntimeEvents() runtimeevents.EventChannel {
if al == nil || al.runtimeEvents == nil {
+1 -4
View File
@@ -49,8 +49,6 @@ func NewAgentLoop(
stateManager = state.NewManager(defaultAgent.Workspace)
}
eventBus := NewEventBus()
// Determine worker pool size from config (default: 1 = sequential)
workerPoolSize := cfg.Agents.Defaults.MaxParallelTurns
if workerPoolSize <= 0 {
@@ -62,7 +60,6 @@ func NewAgentLoop(
cfg: cfg,
registry: registry,
state: stateManager,
eventBus: eventBus,
fallback: fallbackChain,
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)),
@@ -79,7 +76,7 @@ func NewAgentLoop(
al.ownsRuntimeEvents = true
}
al.providerFactory = providers.CreateProviderFromConfig
al.hooks = NewHookManagerWithRuntimeEvents(eventBus, al.runtimeEvents.Channel())
al.hooks = NewHookManager(al.runtimeEvents.Channel())
configureHookManagerFromConfig(al.hooks, cfg)
al.contextManager = al.resolveContextManager()
-128
View File
@@ -1,128 +0,0 @@
package agent
import (
"sync"
"sync/atomic"
"time"
)
const defaultEventSubscriberBuffer = 16
// EventSubscription identifies a subscriber channel returned by EventBus.Subscribe.
//
// Deprecated: use pkg/events.Subscription from RuntimeEvents for new code.
type EventSubscription struct {
ID uint64
C <-chan Event
}
type eventSubscriber struct {
ch chan Event
}
// EventBus is a lightweight multi-subscriber broadcaster for agent-loop events.
//
// Deprecated: use pkg/events.EventBus for new code. This legacy bus remains
// only while existing agent event consumers migrate.
type EventBus struct {
mu sync.RWMutex
subs map[uint64]eventSubscriber
nextID uint64
closed bool
dropped [eventKindCount]atomic.Int64
}
// NewEventBus creates a new in-process event broadcaster.
//
// Deprecated: use events.NewBus for new runtime event publishers.
func NewEventBus() *EventBus {
return &EventBus{
subs: make(map[uint64]eventSubscriber),
}
}
// Subscribe registers a new subscriber with the requested channel buffer size.
// A non-positive buffer uses the default size.
func (b *EventBus) Subscribe(buffer int) EventSubscription {
if buffer <= 0 {
buffer = defaultEventSubscriberBuffer
}
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
ch := make(chan Event)
close(ch)
return EventSubscription{C: ch}
}
b.nextID++
id := b.nextID
ch := make(chan Event, buffer)
b.subs[id] = eventSubscriber{ch: ch}
return EventSubscription{ID: id, C: ch}
}
// Unsubscribe removes a subscriber and closes its channel.
func (b *EventBus) Unsubscribe(id uint64) {
b.mu.Lock()
defer b.mu.Unlock()
sub, ok := b.subs[id]
if !ok {
return
}
delete(b.subs, id)
close(sub.ch)
}
// Emit broadcasts an event to all current subscribers without blocking.
// When a subscriber channel is full, the event is dropped for that subscriber.
func (b *EventBus) Emit(evt Event) {
if evt.Time.IsZero() {
evt.Time = time.Now()
}
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return
}
for _, sub := range b.subs {
select {
case sub.ch <- evt:
default:
if evt.Kind < eventKindCount {
b.dropped[evt.Kind].Add(1)
}
}
}
}
// Dropped returns the number of dropped events for a given kind.
func (b *EventBus) Dropped(kind EventKind) int64 {
if kind >= eventKindCount {
return 0
}
return b.dropped[kind].Load()
}
// Close closes all subscriber channels and stops future broadcasts.
func (b *EventBus) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
b.closed = true
for id, sub := range b.subs {
close(sub.ch)
delete(b.subs, id)
}
}
+1 -78
View File
@@ -16,74 +16,17 @@ import (
"github.com/sipeed/picoclaw/pkg/tools"
)
func TestEventBus_SubscribeEmitUnsubscribeClose(t *testing.T) {
eventBus := NewEventBus()
sub := eventBus.Subscribe(1)
eventBus.Emit(Event{
Kind: EventKindTurnStart,
Meta: EventMeta{TurnID: "turn-1"},
})
select {
case evt := <-sub.C:
if evt.Kind != EventKindTurnStart {
t.Fatalf("expected %v, got %v", EventKindTurnStart, evt.Kind)
}
if evt.Meta.TurnID != "turn-1" {
t.Fatalf("expected turn id turn-1, got %q", evt.Meta.TurnID)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for event")
}
eventBus.Unsubscribe(sub.ID)
if _, ok := <-sub.C; ok {
t.Fatal("expected subscriber channel to be closed after unsubscribe")
}
eventBus.Close()
closedSub := eventBus.Subscribe(1)
if _, ok := <-closedSub.C; ok {
t.Fatal("expected closed bus to return a closed subscriber channel")
}
}
func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) {
eventBus := NewEventBus()
sub := eventBus.Subscribe(1)
defer eventBus.Unsubscribe(sub.ID)
start := time.Now()
for i := 0; i < 1000; i++ {
eventBus.Emit(Event{Kind: EventKindLLMRequest})
}
if elapsed := time.Since(start); elapsed > 100*time.Millisecond {
t.Fatalf("Emit took too long with a blocked subscriber: %s", elapsed)
}
if got := eventBus.Dropped(EventKindLLMRequest); got != 999 {
t.Fatalf("expected 999 dropped events, got %d", got)
}
}
func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) {
func TestAgentLoop_PublishesRuntimeEvents(t *testing.T) {
runtimeBus := runtimeevents.NewBus()
al := &AgentLoop{
eventBus: NewEventBus(),
runtimeEvents: runtimeBus,
}
defer al.eventBus.Close()
defer func() {
if err := runtimeBus.Close(); err != nil {
t.Errorf("runtime bus close failed: %v", err)
}
}()
legacySub := al.SubscribeEvents(1)
defer al.UnsubscribeEvents(legacySub.ID)
runtimeSub, runtimeCh, err := al.RuntimeEvents().OfKind(runtimeevents.KindAgentToolExecStart).SubscribeChan(
context.Background(),
runtimeevents.SubscribeOptions{Name: "runtime", Buffer: 1},
@@ -122,11 +65,6 @@ func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) {
ToolExecStartPayload{Tool: "mock_custom", Arguments: map[string]any{"task": "ping"}},
)
legacyEvt := receiveLegacyEvent(t, legacySub.C)
if legacyEvt.Kind != EventKindToolExecStart {
t.Fatalf("legacy kind = %v, want %v", legacyEvt.Kind, EventKindToolExecStart)
}
runtimeEvt := receiveRuntimeEvent(t, runtimeCh)
if runtimeEvt.Kind != runtimeevents.KindAgentToolExecStart {
t.Fatalf("runtime kind = %q, want %q", runtimeEvt.Kind, runtimeevents.KindAgentToolExecStart)
@@ -736,21 +674,6 @@ func TestAgentLoop_EmitsFollowUpQueuedEvent(t *testing.T) {
}
}
func receiveLegacyEvent(t *testing.T, ch <-chan Event) Event {
t.Helper()
select {
case evt, ok := <-ch:
if !ok {
t.Fatal("legacy event stream closed before expected event arrived")
}
return evt
case <-time.After(time.Second):
t.Fatal("timed out waiting for legacy event")
return Event{}
}
}
func receiveRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event {
t.Helper()
-13
View File
@@ -184,19 +184,6 @@ func (ph *ProcessHook) Close() error {
return ph.closeErr
}
func (ph *ProcessHook) OnEvent(ctx context.Context, evt Event) error {
if ph == nil || !ph.opts.Observe {
return nil
}
if len(ph.observeKinds) > 0 {
kind := runtimeKindForAgentEvent(evt.Kind).String()
if _, ok := ph.observeKinds[kind]; !ok {
return nil
}
}
return ph.notify(ctx, "hook.event", evt)
}
func (ph *ProcessHook) OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error {
if ph == nil || !ph.opts.Observe {
return nil
+4 -77
View File
@@ -72,11 +72,6 @@ func NamedHook(name string, hook any) HookRegistration {
}
}
type EventObserver interface {
// Deprecated: implement RuntimeEventObserver for new observation code.
OnEvent(ctx context.Context, evt Event) error
}
type RuntimeEventObserver interface {
OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error
}
@@ -198,7 +193,6 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse {
}
type HookManager struct {
eventBus *EventBus
runtimeEvents runtimeevents.EventChannel
observerTimeout time.Duration
interceptorTimeout time.Duration
@@ -208,37 +202,21 @@ type HookManager struct {
hooks map[string]HookRegistration
ordered []HookRegistration
sub EventSubscription
runtimeSub runtimeevents.Subscription
done chan struct{}
runtimeDone chan struct{}
runtimeObserveEnabled bool
closeOnce sync.Once
runtimeSub runtimeevents.Subscription
runtimeDone chan struct{}
closeOnce sync.Once
}
func NewHookManager(eventBus *EventBus) *HookManager {
return NewHookManagerWithRuntimeEvents(eventBus, nil)
}
func NewHookManagerWithRuntimeEvents(eventBus *EventBus, runtimeEvents runtimeevents.EventChannel) *HookManager {
func NewHookManager(runtimeEvents runtimeevents.EventChannel) *HookManager {
hm := &HookManager{
eventBus: eventBus,
runtimeEvents: runtimeEvents,
observerTimeout: defaultHookObserverTimeout,
interceptorTimeout: defaultHookInterceptorTimeout,
approvalTimeout: defaultHookApprovalTimeout,
hooks: make(map[string]HookRegistration),
done: make(chan struct{}),
runtimeDone: make(chan struct{}),
}
if eventBus != nil {
hm.sub = eventBus.Subscribe(hookObserverBufferSize)
go hm.dispatchEvents()
} else {
close(hm.done)
}
if runtimeEvents != nil {
sub, ch, err := runtimeEvents.SubscribeChan(context.Background(), runtimeevents.SubscribeOptions{
Name: "hook-manager-observer",
@@ -251,7 +229,6 @@ func NewHookManagerWithRuntimeEvents(eventBus *EventBus, runtimeEvents runtimeev
close(hm.runtimeDone)
} else {
hm.runtimeSub = sub
hm.runtimeObserveEnabled = true
go hm.dispatchRuntimeEvents(ch)
}
} else {
@@ -267,9 +244,6 @@ func (hm *HookManager) Close() {
}
hm.closeOnce.Do(func() {
if hm.eventBus != nil {
hm.eventBus.Unsubscribe(hm.sub.ID)
}
if hm.runtimeSub != nil {
if err := hm.runtimeSub.Close(); err != nil {
logger.WarnCF("hooks", "Failed to close runtime event hook subscription", map[string]any{
@@ -277,7 +251,6 @@ func (hm *HookManager) Close() {
})
}
}
<-hm.done
<-hm.runtimeDone
hm.closeAllHooks()
})
@@ -335,25 +308,6 @@ func (hm *HookManager) Unmount(name string) {
hm.rebuildOrdered()
}
func (hm *HookManager) dispatchEvents() {
defer close(hm.done)
for evt := range hm.sub.C {
for _, reg := range hm.snapshotHooks() {
if hm.runtimeObserveEnabled {
if _, ok := reg.Hook.(RuntimeEventObserver); ok {
continue
}
}
observer, ok := reg.Hook.(EventObserver)
if !ok {
continue
}
hm.runObserver(reg.Name, observer, evt)
}
}
}
func (hm *HookManager) dispatchRuntimeEvents(ch <-chan runtimeevents.Event) {
defer close(hm.runtimeDone)
@@ -643,33 +597,6 @@ func (hm *HookManager) closeAllHooks() {
hm.ordered = nil
}
func (hm *HookManager) runObserver(name string, observer EventObserver, evt Event) {
ctx, cancel := context.WithTimeout(context.Background(), hm.observerTimeout)
defer cancel()
done := make(chan error, 1)
go func() {
done <- observer.OnEvent(ctx, evt)
}()
select {
case err := <-done:
if err != nil {
logger.WarnCF("hooks", "Event observer failed", map[string]any{
"hook": name,
"event": evt.Kind.String(),
"error": err.Error(),
})
}
case <-ctx.Done():
logger.WarnCF("hooks", "Event observer timed out", map[string]any{
"hook": name,
"event": evt.Kind.String(),
"timeout_ms": hm.observerTimeout.Milliseconds(),
})
}
}
func (hm *HookManager) runRuntimeObserver(
name string,
observer RuntimeEventObserver,
+1 -19
View File
@@ -152,20 +152,9 @@ func (h *llmObserverHook) AfterLLM(
}
type dualRuntimeObserverHook struct {
legacyCh chan Event
runtimeCh chan runtimeevents.Event
}
func (h *dualRuntimeObserverHook) OnEvent(ctx context.Context, evt Event) error {
if evt.Kind == EventKindTurnEnd {
select {
case h.legacyCh <- evt:
default:
}
}
return nil
}
func (h *dualRuntimeObserverHook) OnRuntimeEvent(ctx context.Context, evt runtimeevents.Event) error {
if evt.Kind == runtimeevents.KindAgentTurnEnd {
select {
@@ -522,13 +511,12 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
}
}
func TestAgentLoop_Hooks_RuntimeObserverPreferredOverLegacyObserver(t *testing.T) {
func TestAgentLoop_Hooks_RuntimeObserverReceivesEvents(t *testing.T) {
provider := &llmHookTestProvider{}
al, agent, cleanup := newHookTestLoop(t, provider)
defer cleanup()
hook := &dualRuntimeObserverHook{
legacyCh: make(chan Event, 1),
runtimeCh: make(chan runtimeevents.Event, 1),
}
if err := al.MountHook(NamedHook("runtime-observer", hook)); err != nil {
@@ -573,12 +561,6 @@ func TestAgentLoop_Hooks_RuntimeObserverPreferredOverLegacyObserver(t *testing.T
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for runtime observer event")
}
select {
case evt := <-hook.legacyCh:
t.Fatalf("legacy observer unexpectedly received %v", evt.Kind)
case <-time.After(100 * time.Millisecond):
}
}
func TestAgentLoop_BtwCommand_UsesLLMHooks(t *testing.T) {