Files
picoclaw/pkg/events/subscription.go
T
Hoshina eedebabbea feat(events): add runtime event bus
Introduce pkg/events with filtered channels, subscription policies, backpressure, and stats. Wire AgentLoop to dual-publish legacy agent events into runtime events while preserving old event APIs.

Validation: go test ./pkg/events/... ./pkg/agent; go test -race ./pkg/events/...; make lint
2026-04-26 15:36:03 +08:00

385 lines
8.0 KiB
Go

package events
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
"time"
)
// defaultSubscriberBuffer is the delivery channel capacity that
// normalizeSubscribeOptions applies when SubscribeOptions.Buffer is zero or
// negative.
const defaultSubscriberBuffer = 16
var (
	// ErrBusClosed is returned when subscribing to a closed event bus.
	// It is a sentinel value; match it with errors.Is.
	ErrBusClosed = errors.New("events: bus is closed")
	// ErrNilHandler is returned when subscribing without a handler.
	// It is a sentinel value; match it with errors.Is.
	ErrNilHandler = errors.New("events: handler is nil")
)
// Handler processes a runtime event delivered to a subscription.
//
// The context carries a deadline when SubscribeOptions.Timeout is positive.
// A nil return is counted as a handled event and a non-nil return as a failed
// one; the error value itself is only counted, not propagated.
type Handler func(context.Context, Event) error
// SubscribeOptions controls how a subscription receives events.
//
// Zero values of Buffer, Concurrency, Backpressure, and PanicPolicy are
// replaced with defaults by normalizeSubscribeOptions.
type SubscribeOptions struct {
	// Name labels the subscription; it is reported by Name and Stats and
	// appears in the log line written when a handler panic is recovered.
	Name string
	// Buffer is the capacity of the delivery channel. Values <= 0 select
	// defaultSubscriberBuffer.
	Buffer int
	// Priority is not read anywhere in this file.
	// NOTE(review): presumably used by the bus to order subscribers — confirm.
	Priority int
	// Concurrency selects how queued events are handed to the handler.
	// The empty value selects Locked.
	Concurrency ConcurrencyKind
	// Backpressure selects what happens when the delivery channel is full.
	// The empty value selects DropNewest.
	Backpressure BackpressurePolicy
	// Timeout, when positive, gives every handler call a context that expires
	// after this duration. The handler is not interrupted; it has to observe
	// the context itself.
	Timeout time.Duration
	// PanicPolicy selects whether handler panics are recovered.
	// The empty value selects RecoverAndLog.
	PanicPolicy PanicPolicy
}
// ConcurrencyKind controls how handler subscriptions process queued events.
// An empty value is normalized to Locked, and dispatch treats every value
// other than Concurrent as sequential.
type ConcurrencyKind string

const (
	// Concurrent processes each event in its own goroutine, so handler calls
	// for one subscription may overlap.
	Concurrent ConcurrencyKind = "concurrent"
	// Locked processes events sequentially in subscription order.
	// It is the default applied by normalizeSubscribeOptions.
	Locked ConcurrencyKind = "locked"
	// Keyed is reserved for keyed sequential processing and currently behaves as Locked.
	Keyed ConcurrencyKind = "keyed"
)
// BackpressurePolicy controls delivery when a subscription queue is full.
// An empty value is normalized to DropNewest, and enqueue also falls back to
// DropNewest for any value it does not recognize.
type BackpressurePolicy string

const (
	// DropNewest drops the event being published when the queue is full.
	DropNewest BackpressurePolicy = "drop_newest"
	// DropOldest drops one queued event and enqueues the event being published.
	// If the queue is still full after the eviction, the new event is dropped too.
	DropOldest BackpressurePolicy = "drop_oldest"
	// Block waits for queue capacity until Publish's context is canceled or
	// the subscription starts closing.
	Block BackpressurePolicy = "block"
)
// PanicPolicy controls handler panic behavior.
// An empty value is normalized to RecoverAndLog; handle recovers panics for
// every value other than Crash.
type PanicPolicy string

const (
	// RecoverAndLog recovers handler panics and records them in subscription stats.
	// The recovered value is also written with log.Printf.
	RecoverAndLog PanicPolicy = "recover_and_log"
	// Crash lets handler panics propagate from the worker goroutine.
	Crash PanicPolicy = "crash"
)
// Subscription represents an active event subscription.
type Subscription interface {
	// ID returns the identifier assigned to the subscription.
	ID() uint64
	// Name returns the name supplied through SubscribeOptions.Name.
	Name() string
	// Close ends the subscription.
	Close() error
	// Done returns a channel that is closed once the subscription has stopped
	// processing events.
	Done() <-chan struct{}
	// Stats returns a snapshot of the subscription's counters.
	Stats() SubscriberStats
}
// subscriberCounters holds the per-subscription counters reported by Stats.
// Every field is an atomic so it can be updated and read concurrently.
type subscriberCounters struct {
	// received counts events offered to enqueue while the subscription was
	// not yet marked closed, before any backpressure decision.
	received atomic.Uint64
	// handled counts handler calls that returned a nil error.
	handled atomic.Uint64
	// failed counts handler calls that returned a non-nil error.
	failed atomic.Uint64
	// dropped counts events discarded by a backpressure policy.
	dropped atomic.Uint64
	// panicked counts handler panics that were recovered.
	panicked atomic.Uint64
	// timedOut counts handler calls after which the handler context reported
	// context.DeadlineExceeded.
	timedOut atomic.Uint64
}
// eventSubscription is the subscription state used by the methods in this
// file: a buffered delivery channel plus the bookkeeping needed to close it
// safely while publishers may still be sending.
type eventSubscription struct {
	// bus is the owning bus; Close delegates to bus.unsubscribe.
	bus *EventBus
	// id and name are reported by ID, Name, and Stats.
	id   uint64
	name string
	// opts holds the options after normalizeSubscribeOptions.
	opts SubscribeOptions
	// filters is a private copy of the caller's filters. It is not evaluated
	// in this file.
	filters []Filter
	// handler is called by handle for every dispatched event. A nil handler
	// makes closeInput close done directly.
	handler Handler
	// once makes run close the subscription after the first event.
	once bool
	// ch is the delivery queue: enqueue sends on it and run receives from it.
	ch chan Event
	// done is closed by closeDone once processing has stopped.
	done chan struct{}
	// closing is closed at the start of closeInput, before ch is closed, so
	// senders can observe shutdown without touching ch.
	closing chan struct{}
	// closeOnce guards closeInput and doneOnce guards closeDone.
	closeOnce sync.Once
	doneOnce  sync.Once
	// mu orders sends against the closing of ch: enqueue holds the read lock
	// while it sends, closeInput takes the write lock to set closed and close ch.
	mu     sync.RWMutex
	closed bool
	// wg tracks handler goroutines started by dispatch in Concurrent mode;
	// run waits on it before signalling done.
	wg sync.WaitGroup
	// counters backs Stats.
	counters subscriberCounters
}
// normalizeSubscribeOptions returns a copy of opts with defaults filled in for
// the fields that have one: a Buffer of zero or less becomes
// defaultSubscriberBuffer, an empty Concurrency becomes Locked, an empty
// Backpressure becomes DropNewest, and an empty PanicPolicy becomes
// RecoverAndLog. Name, Priority, and Timeout are passed through unchanged.
func normalizeSubscribeOptions(opts SubscribeOptions) SubscribeOptions {
	normalized := opts
	if normalized.Buffer < 1 {
		normalized.Buffer = defaultSubscriberBuffer
	}
	if normalized.PanicPolicy == "" {
		normalized.PanicPolicy = RecoverAndLog
	}
	if normalized.Backpressure == "" {
		normalized.Backpressure = DropNewest
	}
	if normalized.Concurrency == "" {
		normalized.Concurrency = Locked
	}
	return normalized
}
// newSubscription allocates the state for one subscription on bus.
//
// opts is normalized first, so the delivery channel is created with the
// effective Buffer and the subscription name comes from opts.Name. The filter
// slice is copied. The function only allocates state; it does not start run or
// watchContext.
func newSubscription(
	bus *EventBus,
	id uint64,
	filters []Filter,
	opts SubscribeOptions,
	handler Handler,
	once bool,
) *eventSubscription {
	normalized := normalizeSubscribeOptions(opts)

	// Private copy: later changes to the caller's slice must not be observed.
	ownFilters := append([]Filter(nil), filters...)

	sub := new(eventSubscription)
	sub.bus = bus
	sub.id = id
	sub.name = normalized.Name
	sub.opts = normalized
	sub.filters = ownFilters
	sub.handler = handler
	sub.once = once
	sub.ch = make(chan Event, normalized.Buffer)
	sub.done = make(chan struct{})
	sub.closing = make(chan struct{})
	return sub
}
// ID reports the identifier the subscription was created with.
// A nil receiver yields 0.
func (s *eventSubscription) ID() uint64 {
	if s != nil {
		return s.id
	}
	return 0
}
// Name reports the name the subscription was created with.
// A nil receiver yields the empty string.
func (s *eventSubscription) Name() string {
	if s != nil {
		return s.name
	}
	return ""
}
// Close asks the owning bus to unsubscribe this subscription by ID.
// It is a no-op for a nil receiver or a subscription without a bus, and it
// always returns nil.
// NOTE(review): the channel shutdown itself presumably happens inside
// bus.unsubscribe via closeInput — confirm against the bus implementation.
func (s *eventSubscription) Close() error {
	if s != nil && s.bus != nil {
		s.bus.unsubscribe(s.id)
	}
	return nil
}
// Done exposes the channel that closeDone closes once the subscription has
// stopped processing. For a nil receiver it returns a fresh, already-closed
// channel so that waiting on it never blocks.
func (s *eventSubscription) Done() <-chan struct{} {
	if s != nil {
		return s.done
	}
	finished := make(chan struct{})
	close(finished)
	return finished
}
// Stats returns the subscription's ID, name, and current counter values.
// Each counter is loaded separately, so the snapshot is not atomic across
// counters. A nil receiver yields the zero SubscriberStats.
func (s *eventSubscription) Stats() SubscriberStats {
	var snapshot SubscriberStats
	if s == nil {
		return snapshot
	}
	counters := &s.counters
	snapshot.ID = s.id
	snapshot.Name = s.name
	snapshot.Received = counters.received.Load()
	snapshot.Handled = counters.handled.Load()
	snapshot.Failed = counters.failed.Load()
	snapshot.Dropped = counters.dropped.Load()
	snapshot.Panicked = counters.panicked.Load()
	snapshot.TimedOut = counters.timedOut.Load()
	return snapshot
}
// run is the worker loop of a handler subscription. It receives events from
// the delivery channel and dispatches each one until the channel is closed,
// or, for a once subscription, until the first event has been dispatched, at
// which point it closes the subscription itself.
//
// On every exit path it waits for handler goroutines tracked by wg and then
// closes done.
func (s *eventSubscription) run(ctx context.Context) {
	defer func() {
		s.wg.Wait()
		s.closeDone()
	}()

	for {
		evt, open := <-s.ch
		if !open {
			return
		}
		s.dispatch(ctx, evt)
		if s.once {
			// Close always returns nil, so there is nothing to handle here.
			_ = s.Close()
			return
		}
	}
}
// dispatch hands one event to handle according to the subscription's
// concurrency kind. Concurrent starts a goroutine per event and registers it
// with wg; every other kind runs the handler inline on the caller's goroutine.
func (s *eventSubscription) dispatch(ctx context.Context, evt Event) {
	if s.opts.Concurrency != Concurrent {
		// TODO: replace this with keyed executors when runtime events need
		// per-scope ordering with cross-scope concurrency. Until then Keyed
		// runs inline exactly like Locked.
		s.handle(ctx, evt)
		return
	}

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.handle(ctx, evt)
	}()
}
// handle invokes the handler for one event and records the outcome.
//
// A nil ctx is replaced with context.Background. When Timeout is positive the
// handler receives a context with that timeout. Unless the panic policy is
// Crash, a handler panic is recovered, counted in panicked, and logged; in
// that case none of the other counters is touched. Otherwise timedOut is
// incremented if the handler context ended with context.DeadlineExceeded, and
// exactly one of handled (nil error) or failed (non-nil error) is incremented.
func (s *eventSubscription) handle(ctx context.Context, evt Event) {
	if ctx == nil {
		ctx = context.Background()
	}
	if timeout := s.opts.Timeout; timeout > 0 {
		bounded, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		ctx = bounded
	}
	if s.opts.PanicPolicy != Crash {
		// Deferred after cancel, so on a panic it runs first and the timeout
		// context is still released afterwards.
		defer func() {
			if cause := recover(); cause != nil {
				s.counters.panicked.Add(1)
				log.Printf("events: subscriber %q recovered panic: %v", s.name, cause)
			}
		}()
	}

	err := s.handler(ctx, evt)
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		s.counters.timedOut.Add(1)
	}
	if err == nil {
		s.counters.handled.Add(1)
		return
	}
	s.counters.failed.Add(1)
}
// watchContext ties the subscription to ctx. It starts a goroutine that calls
// Close when ctx is done and exits without doing anything once done has been
// closed. A nil ctx disables the watcher and no goroutine is started.
func (s *eventSubscription) watchContext(ctx context.Context) {
	if ctx == nil {
		return
	}
	go func() {
		select {
		case <-s.done:
			// Subscription already finished; nothing left to close.
		case <-ctx.Done():
			_ = s.Close()
		}
	}()
}
// closeInput stops delivery to the subscription: it signals closing, marks the
// subscription closed, and closes the delivery channel. It runs at most once
// (guarded by closeOnce); later calls are no-ops.
func (s *eventSubscription) closeInput() {
	s.closeOnce.Do(func() {
		// Signal closing before asking for the write lock. enqueue holds the
		// read lock while a Block-policy send is waiting, and that wait only
		// ends early once closing is closed; locking first could stall behind
		// a blocked publisher.
		close(s.closing)
		s.mu.Lock()
		// Every send in this file happens under the read lock, so with the
		// write lock held no send can be in flight when ch is closed.
		s.closed = true
		close(s.ch)
		s.mu.Unlock()
		// Without a handler, done is closed here rather than by run.
		// NOTE(review): presumably no run goroutine exists for handler-less
		// subscriptions — confirm against the bus.
		if s.handler == nil {
			s.closeDone()
		}
	})
}
// closeDone closes the done channel. It is guarded by doneOnce, so it is safe
// to call from more than one shutdown path and only the first call has an
// effect.
func (s *eventSubscription) closeDone() {
	s.doneOnce.Do(func() { close(s.done) })
}
// deliveryResult reports what happened to one event offered to one
// subscription by enqueue.
type deliveryResult struct {
	// delivered is 1 when the event was placed on the delivery channel.
	delivered int
	// dropped is the number of events discarded by this attempt. DropOldest
	// can report 2: the evicted event plus the new one if the retry also fails.
	dropped int
	// blocked is 1 when a Block-policy send gave up because the context was done.
	blocked int
	// closed is true when the subscription was already closed or closing.
	closed bool
}
// enqueue offers evt to the subscription using its backpressure policy.
//
// The read lock is held for the whole attempt, including a blocking send, so
// the delivery channel cannot be closed while a send is in progress. A closed
// subscription returns closed without counting the event; otherwise received
// is incremented before the policy runs. Policies other than DropOldest and
// Block are treated as DropNewest. A nil ctx is replaced with
// context.Background.
func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResult {
	if ctx == nil {
		ctx = context.Background()
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.closed {
		return deliveryResult{closed: true}
	}
	s.counters.received.Add(1)

	policy := s.opts.Backpressure
	if policy == Block {
		return s.enqueueBlock(ctx, evt)
	}
	if policy == DropOldest {
		return s.enqueueDropOldest(evt)
	}
	return s.enqueueDropNewest(evt)
}
// enqueueDropNewest implements the DropNewest policy: it never blocks. A
// closing subscription is reported as closed; otherwise evt is queued if there
// is room and dropped (and counted in dropped) if there is not.
func (s *eventSubscription) enqueueDropNewest(evt Event) deliveryResult {
	// Checked on its own first so shutdown always wins over spare capacity.
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	default:
	}

	var result deliveryResult
	select {
	case s.ch <- evt:
		result.delivered = 1
	default:
		s.counters.dropped.Add(1)
		result.dropped = 1
	}
	return result
}
// enqueueDropOldest implements the DropOldest policy: it never blocks. A
// closing subscription is reported as closed. If there is room evt is queued
// directly. Otherwise one queued event is evicted (if one can be received) and
// the send is retried once; if that retry also fails, evt itself is dropped.
// Every discarded event is counted in dropped and in the returned result.
func (s *eventSubscription) enqueueDropOldest(evt Event) deliveryResult {
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	default:
	}

	// Common case: spare capacity, nothing has to be evicted.
	select {
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	default:
	}

	var result deliveryResult

	// Make room. The receive can find nothing if the worker drained the queue
	// in the meantime or the channel has no buffer.
	select {
	case <-s.ch:
		s.counters.dropped.Add(1)
		result.dropped = 1
	default:
	}

	// Retry once without blocking; another publisher may have taken the slot.
	select {
	case <-s.closing:
		result.closed = true
	case s.ch <- evt:
		result.delivered = 1
	default:
		s.counters.dropped.Add(1)
		result.dropped++
	}
	return result
}
// enqueueBlock implements the Block policy: it waits for queue capacity until
// the subscription starts closing or ctx is done.
//
// The outcome is decided in priority order so that it is deterministic when
// several conditions hold at once:
//
//  1. A closing subscription always reports closed, matching
//     enqueueDropNewest and enqueueDropOldest.
//  2. If the queue has room the event is delivered immediately, even when ctx
//     is already done. A single select over all three cases would pick among
//     ready cases at random and could drop an event despite spare capacity.
//  3. Otherwise the call blocks until capacity appears, closing is signalled,
//     or ctx is done. In the last case the event is counted as dropped and
//     reported as dropped and blocked.
//
// ctx must be non-nil; enqueue substitutes context.Background before calling.
func (s *eventSubscription) enqueueBlock(ctx context.Context, evt Event) deliveryResult {
	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	default:
	}

	// Fast path: no waiting needed.
	select {
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	default:
	}

	select {
	case <-s.closing:
		return deliveryResult{closed: true}
	case s.ch <- evt:
		return deliveryResult{delivered: 1}
	case <-ctx.Done():
		s.counters.dropped.Add(1)
		return deliveryResult{dropped: 1, blocked: 1}
	}
}