From eedebabbeabfb616b919297354434baf2b8c2f6c Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 15:36:03 +0800 Subject: [PATCH] feat(events): add runtime event bus Introduce pkg/events with filtered channels, subscription policies, backpressure, and stats. Wire AgentLoop to dual-publish legacy agent events into runtime events while preserving old event APIs. Validation: go test ./pkg/events/... ./pkg/agent; go test -race ./pkg/events/...; make lint --- pkg/agent/agent.go | 15 +- pkg/agent/agent_event.go | 24 +- pkg/agent/agent_init.go | 30 ++- pkg/agent/agent_options.go | 20 ++ pkg/agent/event_kind_runtime.go | 48 ++++ pkg/agent/eventbus_test.go | 125 +++++++++++ pkg/agent/events_runtime.go | 104 +++++++++ pkg/events/bus.go | 233 +++++++++++++++++++ pkg/events/channel.go | 75 +++++++ pkg/events/doc.go | 3 + pkg/events/events_test.go | 155 +++++++++++++ pkg/events/filter.go | 131 +++++++++++ pkg/events/filter_test.go | 96 ++++++++ pkg/events/kind.go | 68 ++++++ pkg/events/stats.go | 26 +++ pkg/events/subscription.go | 384 ++++++++++++++++++++++++++++++++ pkg/events/subscription_test.go | 156 +++++++++++++ pkg/events/types.go | 77 +++++++ 18 files changed, 1757 insertions(+), 13 deletions(-) create mode 100644 pkg/agent/agent_options.go create mode 100644 pkg/agent/event_kind_runtime.go create mode 100644 pkg/agent/events_runtime.go create mode 100644 pkg/events/bus.go create mode 100644 pkg/events/channel.go create mode 100644 pkg/events/doc.go create mode 100644 pkg/events/events_test.go create mode 100644 pkg/events/filter.go create mode 100644 pkg/events/filter_test.go create mode 100644 pkg/events/kind.go create mode 100644 pkg/events/stats.go create mode 100644 pkg/events/subscription.go create mode 100644 pkg/events/subscription_test.go create mode 100644 pkg/events/types.go diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 3e9bd845e..2bec6897f 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -21,6 +21,7 @@ import ( 
"github.com/sipeed/picoclaw/pkg/commands" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/constants" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/providers" @@ -38,8 +39,10 @@ type AgentLoop struct { state *state.Manager // Event system (from Incoming) - eventBus *EventBus - hooks *HookManager + eventBus *EventBus + runtimeEvents runtimeevents.Bus + ownsRuntimeEvents bool + hooks *HookManager // Runtime state running atomic.Bool @@ -286,6 +289,14 @@ func (al *AgentLoop) Close() { if al.eventBus != nil { al.eventBus.Close() } + if al.runtimeEvents != nil && al.ownsRuntimeEvents { + if err := al.runtimeEvents.Close(); err != nil { + logger.ErrorCF("agent", "Failed to close runtime event bus", + map[string]any{ + "error": err.Error(), + }) + } + } } // MountHook registers an in-process hook on the agent loop. diff --git a/pkg/agent/agent_event.go b/pkg/agent/agent_event.go index 9b8625df1..d634d8707 100644 --- a/pkg/agent/agent_event.go +++ b/pkg/agent/agent_event.go @@ -5,6 +5,7 @@ package agent import ( "fmt" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" ) @@ -39,13 +40,16 @@ func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { Payload: payload, } - if al == nil || al.eventBus == nil { + if al == nil { return } al.logEvent(evt) - al.eventBus.Emit(evt) + if al.eventBus != nil { + al.eventBus.Emit(evt) + } + al.publishRuntimeEvent(evt) } func (al *AgentLoop) logEvent(evt Event) { @@ -186,3 +190,19 @@ func (al *AgentLoop) EventDrops(kind EventKind) int64 { } return al.eventBus.Dropped(kind) } + +// RuntimeEvents returns the root runtime event channel. 
+func (al *AgentLoop) RuntimeEvents() runtimeevents.EventChannel { + if al == nil || al.runtimeEvents == nil { + return nil + } + return al.runtimeEvents.Channel() +} + +// RuntimeEventStats returns runtime event bus counters. +func (al *AgentLoop) RuntimeEventStats() runtimeevents.Stats { + if al == nil || al.runtimeEvents == nil { + return runtimeevents.Stats{Closed: true} + } + return al.runtimeEvents.Stats() +} diff --git a/pkg/agent/agent_init.go b/pkg/agent/agent_init.go index 611d634e8..da2e79aa8 100644 --- a/pkg/agent/agent_init.go +++ b/pkg/agent/agent_init.go @@ -13,6 +13,7 @@ import ( "github.com/sipeed/picoclaw/pkg/channels" "github.com/sipeed/picoclaw/pkg/commands" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/skills" @@ -24,6 +25,7 @@ func NewAgentLoop( cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider, + opts ...AgentLoopOption, ) *AgentLoop { registry := NewAgentRegistry(cfg, provider) @@ -56,15 +58,25 @@ func NewAgentLoop( } al := &AgentLoop{ - bus: msgBus, - cfg: cfg, - registry: registry, - state: stateManager, - eventBus: eventBus, - fallback: fallbackChain, - cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), - steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), - workerSem: make(chan struct{}, workerPoolSize), + bus: msgBus, + cfg: cfg, + registry: registry, + state: stateManager, + eventBus: eventBus, + fallback: fallbackChain, + cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), + steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), + workerSem: make(chan struct{}, workerPoolSize), + ownsRuntimeEvents: true, + } + for _, opt := range opts { + if opt != nil { + opt(al) + } + } + if al.runtimeEvents == nil { + al.runtimeEvents = runtimeevents.NewBus() + 
al.ownsRuntimeEvents = true } al.providerFactory = providers.CreateProviderFromConfig al.hooks = NewHookManager(eventBus) diff --git a/pkg/agent/agent_options.go b/pkg/agent/agent_options.go new file mode 100644 index 000000000..224062a3f --- /dev/null +++ b/pkg/agent/agent_options.go @@ -0,0 +1,20 @@ +package agent + +import runtimeevents "github.com/sipeed/picoclaw/pkg/events" + +// AgentLoopOption configures an AgentLoop at construction time. +type AgentLoopOption func(*AgentLoop) + +// WithRuntimeEvents injects the runtime event bus used for new observation APIs. +// +// The injected bus is treated as externally owned and will not be closed by +// AgentLoop.Close. Passing nil leaves the default owned runtime bus enabled. +func WithRuntimeEvents(bus runtimeevents.Bus) AgentLoopOption { + return func(al *AgentLoop) { + if bus == nil { + return + } + al.runtimeEvents = bus + al.ownsRuntimeEvents = false + } +} diff --git a/pkg/agent/event_kind_runtime.go b/pkg/agent/event_kind_runtime.go new file mode 100644 index 000000000..031f3b35e --- /dev/null +++ b/pkg/agent/event_kind_runtime.go @@ -0,0 +1,48 @@ +package agent + +import runtimeevents "github.com/sipeed/picoclaw/pkg/events" + +func runtimeKindForAgentEvent(kind EventKind) runtimeevents.Kind { + switch kind { + case EventKindTurnStart: + return runtimeevents.KindAgentTurnStart + case EventKindTurnEnd: + return runtimeevents.KindAgentTurnEnd + case EventKindLLMRequest: + return runtimeevents.KindAgentLLMRequest + case EventKindLLMDelta: + return runtimeevents.KindAgentLLMDelta + case EventKindLLMResponse: + return runtimeevents.KindAgentLLMResponse + case EventKindLLMRetry: + return runtimeevents.KindAgentLLMRetry + case EventKindContextCompress: + return runtimeevents.KindAgentContextCompress + case EventKindSessionSummarize: + return runtimeevents.KindAgentSessionSummarize + case EventKindToolExecStart: + return runtimeevents.KindAgentToolExecStart + case EventKindToolExecEnd: + return 
runtimeevents.KindAgentToolExecEnd + case EventKindToolExecSkipped: + return runtimeevents.KindAgentToolExecSkipped + case EventKindSteeringInjected: + return runtimeevents.KindAgentSteeringInjected + case EventKindFollowUpQueued: + return runtimeevents.KindAgentFollowUpQueued + case EventKindInterruptReceived: + return runtimeevents.KindAgentInterruptReceived + case EventKindSubTurnSpawn: + return runtimeevents.KindAgentSubTurnSpawn + case EventKindSubTurnEnd: + return runtimeevents.KindAgentSubTurnEnd + case EventKindSubTurnResultDelivered: + return runtimeevents.KindAgentSubTurnResultDelivered + case EventKindSubTurnOrphan: + return runtimeevents.KindAgentSubTurnOrphan + case EventKindError: + return runtimeevents.KindAgentError + default: + return runtimeevents.Kind("agent." + kind.String()) + } +} diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 31b996260..4a5de46c9 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -9,6 +9,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" "github.com/sipeed/picoclaw/pkg/session" @@ -67,6 +68,100 @@ func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) { } } +func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) { + runtimeBus := runtimeevents.NewBus() + al := &AgentLoop{ + eventBus: NewEventBus(), + runtimeEvents: runtimeBus, + } + defer al.eventBus.Close() + defer func() { + if err := runtimeBus.Close(); err != nil { + t.Errorf("runtime bus close failed: %v", err) + } + }() + + legacySub := al.SubscribeEvents(1) + defer al.UnsubscribeEvents(legacySub.ID) + + runtimeSub, runtimeCh, err := al.RuntimeEvents().OfKind(runtimeevents.KindAgentToolExecStart).SubscribeChan( + context.Background(), + runtimeevents.SubscribeOptions{Name: "runtime", Buffer: 1}, + ) + if err != nil { + 
t.Fatalf("SubscribeChan failed: %v", err) + } + defer func() { + if err := runtimeSub.Close(); err != nil { + t.Errorf("runtime subscription close failed: %v", err) + } + }() + + al.emitEvent( + EventKindToolExecStart, + EventMeta{ + AgentID: "main", + TurnID: "turn-1", + ParentTurnID: "parent-turn", + SessionKey: "session-1", + Iteration: 2, + TracePath: "trace/root", + Source: "pipeline_execute", + turnContext: &TurnContext{ + Inbound: &bus.InboundContext{ + Channel: "cli", + Account: "default", + ChatID: "direct", + ChatType: "direct", + SenderID: "tester", + MessageID: "msg-1", + TopicID: "topic-1", + }, + }, + }, + ToolExecStartPayload{Tool: "mock_custom", Arguments: map[string]any{"task": "ping"}}, + ) + + legacyEvt := receiveLegacyEvent(t, legacySub.C) + if legacyEvt.Kind != EventKindToolExecStart { + t.Fatalf("legacy kind = %v, want %v", legacyEvt.Kind, EventKindToolExecStart) + } + + runtimeEvt := receiveRuntimeEvent(t, runtimeCh) + if runtimeEvt.Kind != runtimeevents.KindAgentToolExecStart { + t.Fatalf("runtime kind = %q, want %q", runtimeEvt.Kind, runtimeevents.KindAgentToolExecStart) + } + if runtimeEvt.Source != (runtimeevents.Source{Component: "agent", Name: "main"}) { + t.Fatalf("runtime source = %+v", runtimeEvt.Source) + } + if runtimeEvt.Scope.AgentID != "main" || + runtimeEvt.Scope.SessionKey != "session-1" || + runtimeEvt.Scope.TurnID != "turn-1" || + runtimeEvt.Scope.Channel != "cli" || + runtimeEvt.Scope.Account != "default" || + runtimeEvt.Scope.ChatID != "direct" || + runtimeEvt.Scope.TopicID != "topic-1" || + runtimeEvt.Scope.ChatType != "direct" || + runtimeEvt.Scope.SenderID != "tester" || + runtimeEvt.Scope.MessageID != "msg-1" { + t.Fatalf("runtime scope = %+v", runtimeEvt.Scope) + } + if runtimeEvt.Correlation.TraceID != "trace/root" || + runtimeEvt.Correlation.ParentTurnID != "parent-turn" { + t.Fatalf("runtime correlation = %+v", runtimeEvt.Correlation) + } + if runtimeEvt.Attrs["agent_source"] != "pipeline_execute" || 
runtimeEvt.Attrs["iteration"] != 2 { + t.Fatalf("runtime attrs = %+v", runtimeEvt.Attrs) + } + payload, ok := runtimeEvt.Payload.(ToolExecStartPayload) + if !ok { + t.Fatalf("runtime payload = %T, want ToolExecStartPayload", runtimeEvt.Payload) + } + if payload.Tool != "mock_custom" { + t.Fatalf("runtime payload tool = %q, want mock_custom", payload.Tool) + } +} + type scriptedToolProvider struct { calls int } @@ -636,6 +731,36 @@ func collectEventStream(ch <-chan Event) []Event { } } +func receiveLegacyEvent(t *testing.T, ch <-chan Event) Event { + t.Helper() + + select { + case evt, ok := <-ch: + if !ok { + t.Fatal("legacy event stream closed before expected event arrived") + } + return evt + case <-time.After(time.Second): + t.Fatal("timed out waiting for legacy event") + return Event{} + } +} + +func receiveRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event { + t.Helper() + + select { + case evt, ok := <-ch: + if !ok { + t.Fatal("runtime event stream closed before expected event arrived") + } + return evt + case <-time.After(time.Second): + t.Fatal("timed out waiting for runtime event") + return runtimeevents.Event{} + } +} + func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event { t.Helper() diff --git a/pkg/agent/events_runtime.go b/pkg/agent/events_runtime.go new file mode 100644 index 000000000..822c7e23a --- /dev/null +++ b/pkg/agent/events_runtime.go @@ -0,0 +1,104 @@ +package agent + +import ( + "context" + "time" + + runtimeevents "github.com/sipeed/picoclaw/pkg/events" +) + +const runtimeEventPublishTimeout = 100 * time.Millisecond + +func (al *AgentLoop) publishRuntimeEvent(evt Event) { + if al == nil || al.runtimeEvents == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), runtimeEventPublishTimeout) + defer cancel() + + al.runtimeEvents.Publish(ctx, runtimeevents.Event{ + Kind: runtimeKindForAgentEvent(evt.Kind), + Source: 
runtimeevents.Source{Component: "agent", Name: evt.Meta.AgentID}, + Scope: runtimeScopeFromAgentEvent(evt), + Correlation: runtimeCorrelationFromAgentEvent(evt), + Severity: runtimeSeverityForAgentEvent(evt), + Payload: evt.Payload, + Attrs: runtimeAttrsFromAgentEvent(evt), + }) +} + +func runtimeScopeFromAgentEvent(evt Event) runtimeevents.Scope { + scope := runtimeevents.Scope{ + AgentID: evt.Meta.AgentID, + SessionKey: evt.Meta.SessionKey, + TurnID: evt.Meta.TurnID, + } + + if evt.Context == nil || evt.Context.Inbound == nil { + return scope + } + + inbound := evt.Context.Inbound + scope.Channel = inbound.Channel + scope.Account = inbound.Account + scope.ChatID = inbound.ChatID + scope.TopicID = inbound.TopicID + scope.SpaceID = inbound.SpaceID + scope.SpaceType = inbound.SpaceType + scope.ChatType = inbound.ChatType + scope.SenderID = inbound.SenderID + scope.MessageID = inbound.MessageID + return scope +} + +func runtimeCorrelationFromAgentEvent(evt Event) runtimeevents.Correlation { + return runtimeevents.Correlation{ + TraceID: evt.Meta.TracePath, + ParentTurnID: evt.Meta.ParentTurnID, + } +} + +func runtimeSeverityForAgentEvent(evt Event) runtimeevents.Severity { + switch evt.Kind { + case EventKindError, EventKindSubTurnOrphan: + return runtimeevents.SeverityError + case EventKindLLMRetry, EventKindContextCompress, EventKindToolExecSkipped: + return runtimeevents.SeverityWarn + case EventKindTurnEnd: + payload, ok := evt.Payload.(TurnEndPayload) + if !ok { + return runtimeevents.SeverityInfo + } + switch payload.Status { + case TurnEndStatusError: + return runtimeevents.SeverityError + case TurnEndStatusAborted: + return runtimeevents.SeverityWarn + default: + return runtimeevents.SeverityInfo + } + case EventKindToolExecEnd: + payload, ok := evt.Payload.(ToolExecEndPayload) + if ok && payload.IsError { + return runtimeevents.SeverityWarn + } + return runtimeevents.SeverityInfo + default: + return runtimeevents.SeverityInfo + } +} + +func 
runtimeAttrsFromAgentEvent(evt Event) map[string]any { + attrs := make(map[string]any, 2) + if evt.Meta.Source != "" { + attrs["agent_source"] = evt.Meta.Source + } + if evt.Meta.Iteration != 0 { + attrs["iteration"] = evt.Meta.Iteration + } + if len(attrs) == 0 { + return nil + } + return attrs +} diff --git a/pkg/events/bus.go b/pkg/events/bus.go new file mode 100644 index 000000000..80ed9eb80 --- /dev/null +++ b/pkg/events/bus.go @@ -0,0 +1,233 @@ +package events + +import ( + "context" + "sort" + "strconv" + "sync" + "sync/atomic" + "time" +) + +var globalEventSeq atomic.Uint64 + +// Bus publishes runtime events and creates filtered channels. +type Bus interface { + Publish(ctx context.Context, evt Event) PublishResult + Channel() EventChannel + Close() error + Stats() Stats +} + +// PublishResult reports per-publish delivery outcomes. +type PublishResult struct { + Matched int + Delivered int + Dropped int + Blocked int + Closed bool +} + +// EventBus is an in-process runtime event broadcaster. +type EventBus struct { + mu sync.RWMutex + subs map[uint64]*eventSubscription + closed bool + + nextSubID atomic.Uint64 + published atomic.Uint64 + matched atomic.Uint64 + delivered atomic.Uint64 + dropped atomic.Uint64 + blocked atomic.Uint64 +} + +var _ Bus = (*EventBus)(nil) + +// NewBus creates an in-process runtime event bus. +func NewBus() *EventBus { + return &EventBus{ + subs: make(map[uint64]*eventSubscription), + } +} + +// Publish broadcasts evt to subscriptions whose filters match it. 
+func (b *EventBus) Publish(ctx context.Context, evt Event) PublishResult { + if b == nil { + return PublishResult{Closed: true} + } + if ctx == nil { + ctx = context.Background() + } + if evt.Time.IsZero() { + evt.Time = time.Now() + } + if evt.ID == "" { + evt.ID = nextEventID() + } + + subs, closed := b.snapshotSubscribers() + if closed { + return PublishResult{Closed: true} + } + + b.published.Add(1) + result := PublishResult{} + + for _, sub := range subs { + if !matchesFilters(sub.filters, evt) { + continue + } + + result.Matched++ + b.matched.Add(1) + + delivery := sub.enqueue(ctx, evt) + if delivery.closed { + continue + } + result.Delivered += delivery.delivered + result.Dropped += delivery.dropped + result.Blocked += delivery.blocked + b.delivered.Add(uint64(delivery.delivered)) + b.dropped.Add(uint64(delivery.dropped)) + b.blocked.Add(uint64(delivery.blocked)) + } + + return result +} + +// Channel returns the root event channel for this bus. It has no filters, so every event matches. +func (b *EventBus) Channel() EventChannel { + return eventChannel{bus: b} +} + +// Close closes the bus and all active subscriptions. Calling it on a nil or already-closed bus is a no-op; it always returns nil. +func (b *EventBus) Close() error { + if b == nil { + return nil + } + + b.mu.Lock() + if b.closed { + b.mu.Unlock() + return nil + } + b.closed = true + subs := make([]*eventSubscription, 0, len(b.subs)) + for id, sub := range b.subs { + subs = append(subs, sub) + delete(b.subs, id) + } + b.mu.Unlock() + + for _, sub := range subs { + sub.closeInput() + } + return nil +} + +// Stats returns a snapshot of bus and subscription counters. A nil bus reports Closed.
+func (b *EventBus) Stats() Stats { + if b == nil { + return Stats{Closed: true} + } + + b.mu.RLock() + closed := b.closed + subs := make([]*eventSubscription, 0, len(b.subs)) + for _, sub := range b.subs { + subs = append(subs, sub) + } + b.mu.RUnlock() + + sortSubscriptions(subs) + + stats := Stats{ + Published: b.published.Load(), + Matched: b.matched.Load(), + Delivered: b.delivered.Load(), + Dropped: b.dropped.Load(), + Blocked: b.blocked.Load(), + Closed: closed, + Subscribers: len(subs), + SubscriberStats: make([]SubscriberStats, 0, len(subs)), + } + for _, sub := range subs { + stats.SubscriberStats = append(stats.SubscriberStats, sub.Stats()) + } + return stats +} + +func (b *EventBus) subscribe( + ctx context.Context, + filters []Filter, + opts SubscribeOptions, + handler Handler, + once bool, +) (Subscription, error) { + if b == nil { + return nil, ErrBusClosed + } + + id := b.nextSubID.Add(1) + sub := newSubscription(b, id, filters, opts, handler, once) + + b.mu.Lock() + if b.closed { + b.mu.Unlock() + sub.closeInput() + return nil, ErrBusClosed + } + b.subs[id] = sub + b.mu.Unlock() + + if handler != nil { + go sub.run(ctx) + } + sub.watchContext(ctx) + return sub, nil +} + +func (b *EventBus) unsubscribe(id uint64) { + b.mu.Lock() + sub, ok := b.subs[id] + if ok { + delete(b.subs, id) + } + b.mu.Unlock() + + if ok { + sub.closeInput() + } +} + +func (b *EventBus) snapshotSubscribers() ([]*eventSubscription, bool) { + b.mu.RLock() + defer b.mu.RUnlock() + + if b.closed { + return nil, true + } + + subs := make([]*eventSubscription, 0, len(b.subs)) + for _, sub := range b.subs { + subs = append(subs, sub) + } + sortSubscriptions(subs) + return subs, false +} + +func sortSubscriptions(subs []*eventSubscription) { + sort.Slice(subs, func(i, j int) bool { + if subs[i].opts.Priority == subs[j].opts.Priority { + return subs[i].id < subs[j].id + } + return subs[i].opts.Priority > subs[j].opts.Priority + }) +} + +func nextEventID() string { + id := 
globalEventSeq.Add(1) + return "evt-" + strconv.FormatUint(id, 10) +} diff --git a/pkg/events/channel.go b/pkg/events/channel.go new file mode 100644 index 000000000..9cf6d8d8c --- /dev/null +++ b/pkg/events/channel.go @@ -0,0 +1,75 @@ +package events + +import "context" + +// EventChannel is a filtered view over an EventBus. +type EventChannel interface { + Filter(filter Filter) EventChannel + OfKind(kinds ...Kind) EventChannel + KindPrefix(prefix string) EventChannel + Source(component string, names ...string) EventChannel + Scope(scope ScopeFilter) EventChannel + + Subscribe(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error) + SubscribeChan(ctx context.Context, opts SubscribeOptions) (Subscription, <-chan Event, error) + SubscribeOnce(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error) +} + +type eventChannel struct { + bus *EventBus + filters []Filter +} + +// Filter returns a new EventChannel with filter appended. +func (c eventChannel) Filter(filter Filter) EventChannel { + filters := append([]Filter(nil), c.filters...) + if filter != nil { + filters = append(filters, filter) + } + return eventChannel{bus: c.bus, filters: filters} +} + +// OfKind returns a new EventChannel matching any of kinds. +func (c eventChannel) OfKind(kinds ...Kind) EventChannel { + return c.Filter(MatchKind(kinds...)) +} + +// KindPrefix returns a new EventChannel matching events with the kind prefix. +func (c eventChannel) KindPrefix(prefix string) EventChannel { + return c.Filter(MatchKindPrefix(prefix)) +} + +// Source returns a new EventChannel matching source component and optional names. +func (c eventChannel) Source(component string, names ...string) EventChannel { + return c.Filter(MatchSource(component, names...)) +} + +// Scope returns a new EventChannel matching non-empty scope fields. 
+func (c eventChannel) Scope(scope ScopeFilter) EventChannel { + return c.Filter(MatchScope(scope)) +} + +// Subscribe registers handler for events matching this channel. It returns ErrNilHandler when handler is nil. +func (c eventChannel) Subscribe(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error) { + if handler == nil { + return nil, ErrNilHandler + } + return c.bus.subscribe(ctx, c.filters, opts, handler, false) +} + +// SubscribeChan registers a channel subscription for events matching this channel. +func (c eventChannel) SubscribeChan(ctx context.Context, opts SubscribeOptions) (Subscription, <-chan Event, error) { + sub, err := c.bus.subscribe(ctx, c.filters, opts, nil, false) + if err != nil { + return nil, nil, err + } + return sub, sub.(*eventSubscription).ch, nil +} + +// SubscribeOnce registers handler and closes the subscription after the first event. +func (c eventChannel) SubscribeOnce(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error) { + if handler == nil { + return nil, ErrNilHandler + } + return c.bus.subscribe(ctx, c.filters, opts, handler, true) +} diff --git a/pkg/events/doc.go b/pkg/events/doc.go new file mode 100644 index 000000000..dc2f55631 --- /dev/null +++ b/pkg/events/doc.go @@ -0,0 +1,3 @@ +// Package events provides the process-local runtime event bus used to observe +// PicoClaw components without coupling them to agent-specific event envelopes.
+package events diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 000000000..19b9df96d --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,155 @@ +package events + +import ( + "context" + "testing" + "time" +) + +func TestPublishDeliversToMatchingSubscriber(t *testing.T) { + t.Parallel() + + bus := NewBus() + defer closeBus(t, bus) + + _, ch, err := bus.Channel().OfKind(KindAgentTurnStart).SubscribeChan( + context.Background(), + SubscribeOptions{Name: "turn-starts", Buffer: 1}, + ) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + + unmatched := bus.Publish(context.Background(), Event{Kind: KindAgentTurnEnd}) + if unmatched.Matched != 0 || unmatched.Delivered != 0 { + t.Fatalf("unmatched Publish = %+v, want no delivery", unmatched) + } + + result := bus.Publish(context.Background(), Event{Kind: KindAgentTurnStart}) + if result.Matched != 1 || result.Delivered != 1 || result.Dropped != 0 { + t.Fatalf("Publish = %+v, want one delivered event", result) + } + + evt := receiveEvent(t, ch) + if evt.Kind != KindAgentTurnStart { + t.Fatalf("event kind = %q, want %q", evt.Kind, KindAgentTurnStart) + } + if evt.ID == "" { + t.Fatal("event ID is empty") + } + if evt.Time.IsZero() { + t.Fatal("event Time is zero") + } +} + +func TestDropNewestIncrementsStats(t *testing.T) { + t.Parallel() + + bus := NewBus() + defer closeBus(t, bus) + + sub, _, err := bus.Channel().SubscribeChan( + context.Background(), + SubscribeOptions{Name: "drop-newest", Buffer: 1, Backpressure: DropNewest}, + ) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + + first := bus.Publish(context.Background(), Event{Kind: KindAgentTurnStart}) + if first.Delivered != 1 || first.Dropped != 0 { + t.Fatalf("first Publish = %+v, want one delivered event", first) + } + + second := bus.Publish(context.Background(), Event{Kind: KindAgentTurnEnd}) + if second.Delivered != 0 || second.Dropped != 1 { + t.Fatalf("second Publish = %+v, 
want one dropped event", second) + } + + if got := sub.Stats().Dropped; got != 1 { + t.Fatalf("subscription dropped = %d, want 1", got) + } + if got := bus.Stats().Dropped; got != 1 { + t.Fatalf("bus dropped = %d, want 1", got) + } +} + +func TestDropOldestKeepsNewestEvent(t *testing.T) { + t.Parallel() + + bus := NewBus() + defer closeBus(t, bus) + + sub, ch, err := bus.Channel().SubscribeChan( + context.Background(), + SubscribeOptions{Name: "drop-oldest", Buffer: 1, Backpressure: DropOldest}, + ) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + + bus.Publish(context.Background(), Event{Kind: Kind("test.old"), Payload: "old"}) + result := bus.Publish(context.Background(), Event{Kind: Kind("test.new"), Payload: "new"}) + if result.Delivered != 1 || result.Dropped != 1 { + t.Fatalf("Publish = %+v, want replacement delivery", result) + } + + evt := receiveEvent(t, ch) + if evt.Payload != "new" { + t.Fatalf("payload = %v, want new", evt.Payload) + } + if got := sub.Stats().Dropped; got != 1 { + t.Fatalf("subscription dropped = %d, want 1", got) + } +} + +func TestBlockRespectsContext(t *testing.T) { + t.Parallel() + + bus := NewBus() + defer closeBus(t, bus) + + _, _, err := bus.Channel().SubscribeChan( + context.Background(), + SubscribeOptions{Name: "block", Buffer: 1, Backpressure: Block}, + ) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + + first := bus.Publish(context.Background(), Event{Kind: Kind("test.first")}) + if first.Delivered != 1 { + t.Fatalf("first Publish = %+v, want one delivered event", first) + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + second := bus.Publish(ctx, Event{Kind: Kind("test.second")}) + if second.Blocked != 1 || second.Dropped != 1 || second.Delivered != 0 { + t.Fatalf("second Publish = %+v, want one blocked drop", second) + } +} + +func receiveEvent(t *testing.T, ch <-chan Event) Event { + t.Helper() + + select { + case evt, ok := 
<-ch: + if !ok { + t.Fatal("event channel closed before receive") + } + return evt + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + return Event{} + } +} + +func closeBus(t *testing.T, bus *EventBus) { + t.Helper() + + if err := bus.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } +} diff --git a/pkg/events/filter.go b/pkg/events/filter.go new file mode 100644 index 000000000..0af2c85c1 --- /dev/null +++ b/pkg/events/filter.go @@ -0,0 +1,131 @@ +package events + +import "strings" + +// Filter decides whether an event should pass through an EventChannel. +type Filter func(Event) bool + +// ScopeFilter matches selected non-empty fields against Event.Scope. +type ScopeFilter struct { + AgentID string + SessionKey string + TurnID string + Channel string + ChatID string + MessageID string +} + +// MatchKind matches events whose kind is in kinds. Empty kinds match all events. +func MatchKind(kinds ...Kind) Filter { + if len(kinds) == 0 { + return matchAll + } + + allowed := make(map[Kind]struct{}, len(kinds)) + for _, kind := range kinds { + allowed[kind] = struct{}{} + } + + return func(evt Event) bool { + _, ok := allowed[evt.Kind] + return ok + } +} + +// MatchKindPrefix matches events whose kind starts with prefix. +func MatchKindPrefix(prefix string) Filter { + if prefix == "" { + return matchAll + } + return func(evt Event) bool { + return strings.HasPrefix(evt.Kind.String(), prefix) + } +} + +// MatchSource matches events emitted by component and, optionally, one of names. 
+func MatchSource(component string, names ...string) Filter { + if component == "" && len(names) == 0 { + return matchAll + } + + allowedNames := make(map[string]struct{}, len(names)) + for _, name := range names { + allowedNames[name] = struct{}{} + } + + return func(evt Event) bool { + if component != "" && evt.Source.Component != component { + return false + } + if len(allowedNames) == 0 { + return true + } + _, ok := allowedNames[evt.Source.Name] + return ok + } +} + +// MatchScope matches events whose Scope equals every non-empty filter field; an empty filter matches all events. +func MatchScope(scope ScopeFilter) Filter { + if scope == (ScopeFilter{}) { + return matchAll + } + + return func(evt Event) bool { + return matchesString(scope.AgentID, evt.Scope.AgentID) && + matchesString(scope.SessionKey, evt.Scope.SessionKey) && + matchesString(scope.TurnID, evt.Scope.TurnID) && + matchesString(scope.Channel, evt.Scope.Channel) && + matchesString(scope.ChatID, evt.Scope.ChatID) && + matchesString(scope.MessageID, evt.Scope.MessageID) + } +} + +// And combines filters and short-circuits on the first non-match. Nil filters are skipped; no filters matches all events. +func And(filters ...Filter) Filter { + if len(filters) == 0 { + return matchAll + } + + return func(evt Event) bool { + for _, filter := range filters { + if filter != nil && !filter(evt) { + return false + } + } + return true + } +} + +// Or combines filters and short-circuits on the first match. Any nil filter, or no filters at all, matches every event.
+func Or(filters ...Filter) Filter { + if len(filters) == 0 { + return matchAll + } + + return func(evt Event) bool { + for _, filter := range filters { + if filter == nil || filter(evt) { + return true + } + } + return false + } +} + +func matchAll(Event) bool { + return true +} + +func matchesString(want, got string) bool { + return want == "" || want == got +} + +func matchesFilters(filters []Filter, evt Event) bool { + for _, filter := range filters { + if filter != nil && !filter(evt) { + return false + } + } + return true +} diff --git a/pkg/events/filter_test.go b/pkg/events/filter_test.go new file mode 100644 index 000000000..9b0112754 --- /dev/null +++ b/pkg/events/filter_test.go @@ -0,0 +1,96 @@ +package events + +import "testing" + +func TestFilterKindPrefix(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + prefix string + event Event + want bool + }{ + { + name: "matches agent prefix", + prefix: "agent.", + event: Event{Kind: KindAgentTurnStart}, + want: true, + }, + { + name: "rejects different prefix", + prefix: "channel.", + event: Event{Kind: KindAgentTurnStart}, + want: false, + }, + { + name: "empty prefix matches all", + prefix: "", + event: Event{Kind: KindAgentTurnStart}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := MatchKindPrefix(tt.prefix)(tt.event); got != tt.want { + t.Fatalf("MatchKindPrefix(%q) = %v, want %v", tt.prefix, got, tt.want) + } + }) + } +} + +func TestFilterScope(t *testing.T) { + t.Parallel() + + evt := Event{ + Scope: Scope{ + AgentID: "agent-a", + SessionKey: "session-1", + TurnID: "turn-1", + Channel: "telegram", + ChatID: "chat-1", + MessageID: "msg-1", + }, + } + + tests := []struct { + name string + scope ScopeFilter + want bool + }{ + { + name: "empty filter matches", + scope: ScopeFilter{}, + want: true, + }, + { + name: "matches selected fields", + scope: ScopeFilter{ + AgentID: "agent-a", + ChatID: "chat-1", + }, + want: 
true, + }, + { + name: "rejects mismatched field", + scope: ScopeFilter{ + AgentID: "agent-a", + MessageID: "msg-2", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := MatchScope(tt.scope)(evt); got != tt.want { + t.Fatalf("MatchScope(%+v) = %v, want %v", tt.scope, got, tt.want) + } + }) + } +} diff --git a/pkg/events/kind.go b/pkg/events/kind.go new file mode 100644 index 000000000..86abfd55a --- /dev/null +++ b/pkg/events/kind.go @@ -0,0 +1,68 @@ +package events + +const ( + // KindAgentTurnStart is emitted when an agent turn starts. + KindAgentTurnStart Kind = "agent.turn.start" + // KindAgentTurnEnd is emitted when an agent turn ends. + KindAgentTurnEnd Kind = "agent.turn.end" + + // KindAgentLLMRequest is emitted before an LLM request. + KindAgentLLMRequest Kind = "agent.llm.request" + // KindAgentLLMDelta is emitted for streaming LLM deltas. + KindAgentLLMDelta Kind = "agent.llm.delta" + // KindAgentLLMResponse is emitted after an LLM response. + KindAgentLLMResponse Kind = "agent.llm.response" + // KindAgentLLMRetry is emitted before retrying an LLM request. + KindAgentLLMRetry Kind = "agent.llm.retry" + + // KindAgentContextCompress is emitted when agent context is compressed. + KindAgentContextCompress Kind = "agent.context.compress" + // KindAgentSessionSummarize is emitted when session summarization completes. + KindAgentSessionSummarize Kind = "agent.session.summarize" + + // KindAgentToolExecStart is emitted before a tool executes. + KindAgentToolExecStart Kind = "agent.tool.exec_start" + // KindAgentToolExecEnd is emitted after a tool finishes. + KindAgentToolExecEnd Kind = "agent.tool.exec_end" + // KindAgentToolExecSkipped is emitted when a tool call is skipped. + KindAgentToolExecSkipped Kind = "agent.tool.exec_skipped" + + // KindAgentSteeringInjected is emitted when steering is injected into context. 
+ KindAgentSteeringInjected Kind = "agent.steering.injected" + // KindAgentFollowUpQueued is emitted when async follow-up input is queued. + KindAgentFollowUpQueued Kind = "agent.follow_up.queued" + // KindAgentInterruptReceived is emitted when a turn interrupt is accepted. + KindAgentInterruptReceived Kind = "agent.interrupt.received" + + // KindAgentSubTurnSpawn is emitted when a sub-turn is spawned. + KindAgentSubTurnSpawn Kind = "agent.subturn.spawn" + // KindAgentSubTurnEnd is emitted when a sub-turn ends. + KindAgentSubTurnEnd Kind = "agent.subturn.end" + // KindAgentSubTurnResultDelivered is emitted when a sub-turn result is delivered. + KindAgentSubTurnResultDelivered Kind = "agent.subturn.result_delivered" + // KindAgentSubTurnOrphan is emitted when a sub-turn result cannot be delivered. + KindAgentSubTurnOrphan Kind = "agent.subturn.orphan" + // KindAgentError is emitted when agent execution reports an error. + KindAgentError Kind = "agent.error" + + // KindChannelLifecycleStarted is emitted when a channel starts. + KindChannelLifecycleStarted Kind = "channel.lifecycle.started" + // KindChannelLifecycleStartFailed is emitted when a channel fails to start. + KindChannelLifecycleStartFailed Kind = "channel.lifecycle.start_failed" + // KindChannelMessageOutboundSent is emitted when an outbound channel message is sent. + KindChannelMessageOutboundSent Kind = "channel.message.outbound_sent" + // KindChannelMessageOutboundFailed is emitted when an outbound channel message fails. + KindChannelMessageOutboundFailed Kind = "channel.message.outbound_failed" + + // KindGatewayReloadStarted is emitted when gateway reload starts. + KindGatewayReloadStarted Kind = "gateway.reload.started" + // KindGatewayReloadCompleted is emitted when gateway reload completes. + KindGatewayReloadCompleted Kind = "gateway.reload.completed" + // KindGatewayReloadFailed is emitted when gateway reload fails. 
+ KindGatewayReloadFailed Kind = "gateway.reload.failed" + + // KindMCPServerConnected is emitted when an MCP server connects. + KindMCPServerConnected Kind = "mcp.server.connected" + // KindMCPServerFailed is emitted when an MCP server fails. + KindMCPServerFailed Kind = "mcp.server.failed" +) diff --git a/pkg/events/stats.go b/pkg/events/stats.go new file mode 100644 index 000000000..7931c5ef3 --- /dev/null +++ b/pkg/events/stats.go @@ -0,0 +1,26 @@ +package events + +// Stats reports aggregate EventBus counters. +type Stats struct { + Published uint64 + Matched uint64 + Delivered uint64 + Dropped uint64 + Blocked uint64 + Closed bool + Subscribers int + + SubscriberStats []SubscriberStats +} + +// SubscriberStats reports counters for one subscription. +type SubscriberStats struct { + ID uint64 + Name string + Received uint64 + Handled uint64 + Failed uint64 + Dropped uint64 + Panicked uint64 + TimedOut uint64 +} diff --git a/pkg/events/subscription.go b/pkg/events/subscription.go new file mode 100644 index 000000000..5ff47eddd --- /dev/null +++ b/pkg/events/subscription.go @@ -0,0 +1,384 @@ +package events + +import ( + "context" + "errors" + "log" + "sync" + "sync/atomic" + "time" +) + +const defaultSubscriberBuffer = 16 + +var ( + // ErrBusClosed is returned when subscribing to a closed event bus. + ErrBusClosed = errors.New("events: bus is closed") + // ErrNilHandler is returned when subscribing without a handler. + ErrNilHandler = errors.New("events: handler is nil") +) + +// Handler processes a runtime event delivered to a subscription. +type Handler func(context.Context, Event) error + +// SubscribeOptions controls how a subscription receives events. +type SubscribeOptions struct { + Name string + Buffer int + Priority int + Concurrency ConcurrencyKind + Backpressure BackpressurePolicy + Timeout time.Duration + PanicPolicy PanicPolicy +} + +// ConcurrencyKind controls how handler subscriptions process queued events. 
type ConcurrencyKind string

const (
	// Concurrent processes each event in its own goroutine.
	//
	// The number of in-flight goroutines is not bounded by the subscription;
	// one is started per dispatched event.
	Concurrent ConcurrencyKind = "concurrent"
	// Locked processes events sequentially in subscription order.
	//
	// It is the default when SubscribeOptions.Concurrency is empty, and any
	// unrecognized value is dispatched the same way.
	Locked ConcurrencyKind = "locked"
	// Keyed is reserved for keyed sequential processing and currently behaves as Locked.
	Keyed ConcurrencyKind = "keyed"
)

// BackpressurePolicy controls delivery when a subscription queue is full.
type BackpressurePolicy string

const (
	// DropNewest drops the event being published when the queue is full.
	//
	// It is the default when SubscribeOptions.Backpressure is empty, and any
	// unrecognized value is handled the same way.
	DropNewest BackpressurePolicy = "drop_newest"
	// DropOldest drops one queued event and enqueues the event being published.
	//
	// If the queue is full again by the time the new event is retried, the new
	// event is dropped as well.
	DropOldest BackpressurePolicy = "drop_oldest"
	// Block waits for queue capacity until Publish's context is canceled.
	//
	// The wait also ends when the subscription is closed. An event abandoned
	// because the context ended is counted as dropped.
	Block BackpressurePolicy = "block"
)

// PanicPolicy controls handler panic behavior.
type PanicPolicy string

const (
	// RecoverAndLog recovers handler panics and records them in subscription stats.
	//
	// It is the default when SubscribeOptions.PanicPolicy is empty; every value
	// other than Crash is treated as RecoverAndLog.
	RecoverAndLog PanicPolicy = "recover_and_log"
	// Crash lets handler panics propagate from the worker goroutine.
	//
	// An unrecovered panic in a goroutine terminates the whole program.
	Crash PanicPolicy = "crash"
)

// Subscription represents an active event subscription.
+type Subscription interface { + ID() uint64 + Name() string + Close() error + Done() <-chan struct{} + Stats() SubscriberStats +} + +type subscriberCounters struct { + received atomic.Uint64 + handled atomic.Uint64 + failed atomic.Uint64 + dropped atomic.Uint64 + panicked atomic.Uint64 + timedOut atomic.Uint64 +} + +type eventSubscription struct { + bus *EventBus + id uint64 + name string + opts SubscribeOptions + filters []Filter + handler Handler + once bool + + ch chan Event + done chan struct{} + closing chan struct{} + + closeOnce sync.Once + doneOnce sync.Once + mu sync.RWMutex + closed bool + wg sync.WaitGroup + + counters subscriberCounters +} + +func normalizeSubscribeOptions(opts SubscribeOptions) SubscribeOptions { + if opts.Buffer <= 0 { + opts.Buffer = defaultSubscriberBuffer + } + if opts.Concurrency == "" { + opts.Concurrency = Locked + } + if opts.Backpressure == "" { + opts.Backpressure = DropNewest + } + if opts.PanicPolicy == "" { + opts.PanicPolicy = RecoverAndLog + } + return opts +} + +func newSubscription( + bus *EventBus, + id uint64, + filters []Filter, + opts SubscribeOptions, + handler Handler, + once bool, +) *eventSubscription { + opts = normalizeSubscribeOptions(opts) + return &eventSubscription{ + bus: bus, + id: id, + name: opts.Name, + opts: opts, + filters: append([]Filter(nil), filters...), + handler: handler, + once: once, + ch: make(chan Event, opts.Buffer), + done: make(chan struct{}), + closing: make(chan struct{}), + } +} + +// ID returns the subscription identifier. +func (s *eventSubscription) ID() uint64 { + if s == nil { + return 0 + } + return s.id +} + +// Name returns the subscription name. +func (s *eventSubscription) Name() string { + if s == nil { + return "" + } + return s.name +} + +// Close removes the subscription and closes its delivery channel. 
func (s *eventSubscription) Close() error {
	// A nil subscription, or one that is not attached to a bus, has nothing to
	// release.
	if s == nil || s.bus == nil {
		return nil
	}
	// Removal is delegated to the owning bus. unsubscribe is defined outside
	// this file; presumably it ends up calling closeInput — confirm there.
	s.bus.unsubscribe(s.id)
	return nil
}

// Done returns a channel closed after the subscription has stopped processing.
//
// A nil subscription yields a fresh, already-closed channel so callers never
// block on it.
func (s *eventSubscription) Done() <-chan struct{} {
	if s == nil {
		ch := make(chan struct{})
		close(ch)
		return ch
	}
	return s.done
}

// Stats returns a snapshot of the subscription counters.
//
// Each counter is loaded separately, so the snapshot is not atomic across
// counters while events are still being processed.
func (s *eventSubscription) Stats() SubscriberStats {
	if s == nil {
		return SubscriberStats{}
	}
	return SubscriberStats{
		ID:       s.id,
		Name:     s.name,
		Received: s.counters.received.Load(),
		Handled:  s.counters.handled.Load(),
		Failed:   s.counters.failed.Load(),
		Dropped:  s.counters.dropped.Load(),
		Panicked: s.counters.panicked.Load(),
		TimedOut: s.counters.timedOut.Load(),
	}
}

// run is the worker loop: it dispatches queued events until the queue channel
// is closed and drained, then waits for in-flight concurrent handlers and
// closes done.
//
// For a once-subscription the loop closes the subscription and returns right
// after the first dispatch; events still sitting in the queue at that point
// are not handled.
func (s *eventSubscription) run(ctx context.Context) {
	defer func() {
		// done must not close while a Concurrent handler is still running.
		s.wg.Wait()
		s.closeDone()
	}()

	for evt := range s.ch {
		s.dispatch(ctx, evt)
		if s.once {
			// Close always returns nil, so the result is safe to discard.
			_ = s.Close()
			return
		}
	}
}

// dispatch runs the handler for evt according to the concurrency kind:
// Concurrent starts a goroutine tracked by wg; Keyed, Locked, and any
// unrecognized value run the handler inline on the calling goroutine.
func (s *eventSubscription) dispatch(ctx context.Context, evt Event) {
	switch s.opts.Concurrency {
	case Concurrent:
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			s.handle(ctx, evt)
		}()
	case Keyed:
		// TODO: replace this with keyed executors when runtime events need
		// per-scope ordering with cross-scope concurrency.
		s.handle(ctx, evt)
	default:
		s.handle(ctx, evt)
	}
}

// handle invokes the handler for one event and updates the counters.
//
// A nil ctx is replaced with context.Background. When opts.Timeout is positive
// the handler receives a context carrying that timeout; the handler is still
// called synchronously, so the timeout only takes effect if the handler
// observes its context. Unless the panic policy is Crash, a handler panic is
// recovered, counted in panicked, and logged; in that case neither handled nor
// failed is incremented because the panic unwinds past those updates.
func (s *eventSubscription) handle(ctx context.Context, evt Event) {
	if ctx == nil {
		ctx = context.Background()
	}
	if s.opts.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, s.opts.Timeout)
		defer cancel()
	}

	if s.opts.PanicPolicy != Crash {
		defer func() {
			if recovered := recover(); recovered != nil {
				s.counters.panicked.Add(1)
				log.Printf("events: subscriber %q recovered panic: %v", s.name, recovered)
			}
		}()
	}

	err := s.handler(ctx, evt)
	// timedOut is independent of the handler's result: a call can count as
	// both timed out and handled (or failed). A deadline inherited from the
	// incoming ctx is counted here as well.
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		s.counters.timedOut.Add(1)
	}
	if err != nil {
		s.counters.failed.Add(1)
		return
	}
	s.counters.handled.Add(1)
}

// watchContext closes the subscription when ctx is done. The watcher goroutine
// exits as soon as either ctx is done or the subscription's done channel is
// closed. A nil ctx starts no watcher.
func (s *eventSubscription) watchContext(ctx context.Context) {
	if ctx == nil {
		return
	}

	go func() {
		select {
		case <-ctx.Done():
			// Close always returns nil, so the result is safe to discard.
			_ = s.Close()
		case <-s.done:
		}
	}()
}

// closeInput stops delivery to the subscription; only the first call has any
// effect.
//
// closing is closed before the write lock is requested: enqueue holds the read
// lock for the whole delivery attempt, including a blocking send, and the
// closed closing channel gives such a sender a ready case so it can return and
// release the read lock. ch is closed only under the write lock, so no enqueue
// can be sending on it at that moment.
func (s *eventSubscription) closeInput() {
	s.closeOnce.Do(func() {
		close(s.closing)
		s.mu.Lock()
		s.closed = true
		close(s.ch)
		s.mu.Unlock()
		// Without a handler there is presumably no run loop to close done
		// (confirm against the bus's subscribe path), so it is closed here.
		if s.handler == nil {
			s.closeDone()
		}
	})
}

// closeDone closes the done channel; calls after the first are no-ops.
func (s *eventSubscription) closeDone() {
	s.doneOnce.Do(func() {
		close(s.done)
	})
}

// deliveryResult describes the outcome of offering one event to one
// subscription.
type deliveryResult struct {
	delivered int  // events placed on the queue (0 or 1)
	dropped   int  // events discarded, counting both evicted and rejected ones
	blocked   int  // 1 when a Block-policy send gave up because ctx ended
	closed    bool // the subscription was closed or closing
}

// enqueue offers evt to the subscription using its backpressure policy.
//
// The read lock is held for the entire attempt so closeInput cannot close the
// queue channel underneath a send. Events offered after the subscription is
// closed are reported as closed and are not counted as received.
func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResult {
	if ctx == nil {
		ctx = context.Background()
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.closed {
		return deliveryResult{closed: true}
	}

	// Counted before the policy runs, so received includes events that are
	// subsequently dropped.
	s.counters.received.Add(1)

	switch s.opts.Backpressure {
	case DropOldest:
		return s.enqueueDropOldest(evt)
	case Block:
		return s.enqueueBlock(ctx, evt)
	default:
		return s.enqueueDropNewest(evt)
	}
}

// enqueueDropNewest queues evt if there is room and drops it otherwise. It
// never blocks.
func (s *eventSubscription) enqueueDropNewest(evt Event) deliveryResult {
	// Check closing first so a shutting-down subscription takes no new events.
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	default:
	}

	select {
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	default:
		s.counters.dropped.Add(1)
		return deliveryResult{dropped: 1}
	}
}

// enqueueDropOldest queues evt, evicting one queued event first when the
// queue is full. It never blocks.
func (s *eventSubscription) enqueueDropOldest(evt Event) deliveryResult {
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	default:
	}

	// Fast path: there is room, nothing needs to be evicted.
	select {
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	default:
	}

	// The queue looked full: take one event off the front. The receive can
	// find the queue already emptied by the worker, in which case nothing is
	// evicted.
	dropped := 0
	select {
	case <-s.ch:
		s.counters.dropped.Add(1)
		dropped = 1
	default:
	}

	// Retry. The queue can be full again (enqueue calls only share a read
	// lock), in which case the new event is dropped too.
	select {
	case <-s.closing:
		return deliveryResult{dropped: dropped, closed: true}
	case s.ch <- evt:
		return deliveryResult{delivered: 1, dropped: dropped}
	default:
		s.counters.dropped.Add(1)
		return deliveryResult{dropped: dropped + 1}
	}
}

// enqueueBlock waits until evt can be queued, the subscription starts closing,
// or ctx is done. Giving up because ctx ended counts the event as dropped and
// reports blocked: 1.
//
// When several cases are ready at once, select picks one at random, so an
// already-canceled ctx does not guarantee the event is rejected while the
// queue has room.
func (s *eventSubscription) enqueueBlock(ctx context.Context, evt Event) deliveryResult {
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	case <-ctx.Done():
		s.counters.dropped.Add(1)
		return deliveryResult{dropped: 1, blocked: 1}
	}
}
diff --git a/pkg/events/subscription_test.go b/pkg/events/subscription_test.go
new file mode 100644
index 000000000..8cb5f58dc
--- /dev/null
+++ b/pkg/events/subscription_test.go
@@ -0,0 +1,156 @@
package events

import (
	"context"
	"sync/atomic"
	"testing"
	"time"
)

// TestSubscribeOnceClosesAfterFirstEvent verifies that a SubscribeOnce handler
// runs for the first published event only and that the subscription stops
// afterwards.
func TestSubscribeOnceClosesAfterFirstEvent(t *testing.T) {
	t.Parallel()

	bus := NewBus()
	defer closeBus(t, bus)

	var handled atomic.Uint64
	sub, err := bus.Channel().SubscribeOnce(
		context.Background(),
		SubscribeOptions{Name: "once", Buffer: 2},
		func(context.Context, Event) error {
			handled.Add(1)
			return nil
		},
	)
	if err != nil {
		t.Fatalf("SubscribeOnce failed: %v", err)
	}

	bus.Publish(context.Background(), Event{Kind: KindAgentTurnStart})
	waitForSubscriptionDone(t, sub)
	// Published after the subscription has stopped; it must not be handled.
	bus.Publish(context.Background(), Event{Kind: KindAgentTurnEnd})

	if got := handled.Load(); got != 1 {
		t.Fatalf("handled = %d, want 1", got)
	}
	if got := sub.Stats().Handled; got != 1 {
		t.Fatalf("subscription handled = %d, want 1", got)
	}
}

// TestUnsubscribeClosesChannel verifies that closing a channel subscription
// closes the delivery channel and the Done channel.
func TestUnsubscribeClosesChannel(t *testing.T) {
	t.Parallel()

	bus := NewBus()
	defer closeBus(t, bus)

	sub, ch, err := bus.Channel().SubscribeChan(context.Background(), SubscribeOptions{Name: "chan"})
	if err != nil {
		t.Fatalf("SubscribeChan failed: %v", err)
	}
	if err := sub.Close(); err != nil {
		t.Fatalf("Close failed: %v", err)
	}

	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("channel is open, want closed")
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for channel close")
	}
	waitForSubscriptionDone(t, sub)
}

// TestHandlerPanicRecovered verifies that a panicking handler under the
// default panic policy is recovered and counted in the Panicked stat.
func TestHandlerPanicRecovered(t *testing.T) {
	t.Parallel()

	bus := NewBus()
	defer closeBus(t, bus)

	sub, err := bus.Channel().Subscribe(
		context.Background(),
		SubscribeOptions{Name: "panic", Buffer: 1},
		func(context.Context, Event) error {
			panic("boom")
		},
	)
	if err != nil {
		t.Fatalf("Subscribe failed: %v", err)
	}

	bus.Publish(context.Background(), Event{Kind: KindAgentError})
	waitForStat(t, func() uint64 {
		return sub.Stats().Panicked
	}, 1)
}

// TestLockedHandlerProcessesSequentially verifies that a Locked subscription
// never runs more than one handler call at a time.
func TestLockedHandlerProcessesSequentially(t *testing.T) {
	t.Parallel()

	bus := NewBus()
	defer closeBus(t, bus)

	var active atomic.Int64
	var maxActive atomic.Int64
	sub, err := bus.Channel().Subscribe(
		context.Background(),
		SubscribeOptions{Name: "locked", Buffer: 8, Concurrency: Locked},
		func(context.Context, Event) error {
			current := active.Add(1)
			// Record the highest concurrency level seen, using a CAS loop so
			// concurrent updates cannot lower the maximum.
			for {
				currentMax := maxActive.Load()
				if current <= currentMax || maxActive.CompareAndSwap(currentMax, current) {
					break
				}
			}
			// Hold the handler open long enough for overlap to be observable.
			time.Sleep(10 * time.Millisecond)
			active.Add(-1)
			return nil
		},
	)
	if err != nil {
		t.Fatalf("Subscribe failed: %v", err)
	}

	for i := 0; i < 5; i++ {
		bus.Publish(context.Background(), Event{Kind: KindAgentLLMDelta})
	}
	waitForStat(t, func() uint64 {
		return sub.Stats().Handled
	}, 5)

	if got := maxActive.Load(); got != 1 {
		t.Fatalf("max active handlers = %d, want 1", got)
	}
}

// waitForSubscriptionDone fails the test if sub's Done channel is not closed
// within one second.
func waitForSubscriptionDone(t *testing.T, sub Subscription) {
	t.Helper()

	select {
	case <-sub.Done():
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for subscription to stop")
	}
}

// waitForStat polls stat once per millisecond and returns as soon as it is at
// least want; it fails the test after one second.
func waitForStat(t *testing.T, stat func() uint64, want uint64) {
	t.Helper()

	deadline := time.After(time.Second)
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	for {
		if got := stat(); got >= want {
			return
		}
		select {
		case <-ticker.C:
		case <-deadline:
			t.Fatalf("timed out waiting for stat >= %d", want)
		}
	}
}
diff --git a/pkg/events/types.go b/pkg/events/types.go
new file mode 100644
index 000000000..2cfc0eaac
--- /dev/null
+++ b/pkg/events/types.go
@@ -0,0 +1,77 @@
package events

import "time"

// Kind identifies a runtime event category.
type Kind string

// String returns the string representation of the event kind.
func (k Kind) String() string {
	return string(k)
}

// Event is the runtime event envelope shared across PicoClaw components.
//
// NOTE(review): encoding/json's omitempty never treats a struct value as
// empty, so the Scope and Correlation fields are always written even when
// zero. If they are meant to be omitted, omitzero (Go 1.24+) or pointer fields
// would be needed — confirm the intended wire format before changing it.
type Event struct {
	ID          string         `json:"id"`
	Kind        Kind           `json:"kind"`
	Time        time.Time      `json:"time"`
	Source      Source         `json:"source"`
	Scope       Scope          `json:"scope,omitempty"`
	Correlation Correlation    `json:"correlation,omitempty"`
	Severity    Severity       `json:"severity,omitempty"`
	Payload     any            `json:"payload,omitempty"`
	Attrs       map[string]any `json:"attrs,omitempty"`
}

// Source identifies the component that emitted an event.
type Source struct {
	Component string `json:"component"`
	Name      string `json:"name,omitempty"`
}

// Scope identifies the runtime ownership of an event.
//
// Scope is intentionally limited to agent, session, turn, channel, chat,
// message, and sender identity. Tool, provider, model, and MCP details belong
// in Source, Payload, or Attrs.
type Scope struct {
	// Every field is an optional string; the omitempty tags leave empty values
	// out of the JSON encoding.

	// Runtime instance identifier.
	RuntimeID string `json:"runtime_id,omitempty"`

	// Agent, session, and turn identifiers.
	AgentID    string `json:"agent_id,omitempty"`
	SessionKey string `json:"session_key,omitempty"`
	TurnID     string `json:"turn_id,omitempty"`

	// Channel-side location of the conversation.
	Channel string `json:"channel,omitempty"`
	Account string `json:"account,omitempty"`
	ChatID  string `json:"chat_id,omitempty"`
	TopicID string `json:"topic_id,omitempty"`

	// Space and chat classification; the accepted values are not defined in
	// this file.
	SpaceID   string `json:"space_id,omitempty"`
	SpaceType string `json:"space_type,omitempty"`
	ChatType  string `json:"chat_type,omitempty"`

	// Sender and message identifiers.
	SenderID  string `json:"sender_id,omitempty"`
	MessageID string `json:"message_id,omitempty"`
}

// Correlation carries cross-event tracing fields.
//
// All fields are optional strings and are omitted from JSON when empty.
type Correlation struct {
	TraceID      string `json:"trace_id,omitempty"`
	ParentTurnID string `json:"parent_turn_id,omitempty"`
	RequestID    string `json:"request_id,omitempty"`
	ReplyToID    string `json:"reply_to_id,omitempty"`
}

// Severity describes the operational severity of an event.
//
// The zero value is the empty string, which is none of the named levels.
type Severity string

const (
	// SeverityDebug is used for verbose diagnostic events.
	SeverityDebug Severity = "debug"
	// SeverityInfo is used for normal lifecycle and activity events.
	SeverityInfo Severity = "info"
	// SeverityWarn is used for recoverable abnormal events.
	SeverityWarn Severity = "warn"
	// SeverityError is used for failed operations and unrecoverable events.
	SeverityError Severity = "error"
)