feat(events): add runtime event bus

Introduce pkg/events with filtered channels, subscription policies, backpressure, and stats. Wire AgentLoop to dual-publish legacy agent events into runtime events while preserving old event APIs.

Validation: go test ./pkg/events/... ./pkg/agent; go test -race ./pkg/events/...; make lint
This commit is contained in:
Hoshina
2026-04-26 15:36:03 +08:00
parent 77be169db4
commit eedebabbea
18 changed files with 1757 additions and 13 deletions
+13 -2
View File
@@ -21,6 +21,7 @@ import (
"github.com/sipeed/picoclaw/pkg/commands"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/constants"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/media"
"github.com/sipeed/picoclaw/pkg/providers"
@@ -38,8 +39,10 @@ type AgentLoop struct {
state *state.Manager
// Event system (from Incoming)
eventBus *EventBus
hooks *HookManager
eventBus *EventBus
runtimeEvents runtimeevents.Bus
ownsRuntimeEvents bool
hooks *HookManager
// Runtime state
running atomic.Bool
@@ -286,6 +289,14 @@ func (al *AgentLoop) Close() {
if al.eventBus != nil {
al.eventBus.Close()
}
if al.runtimeEvents != nil && al.ownsRuntimeEvents {
if err := al.runtimeEvents.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close runtime event bus",
map[string]any{
"error": err.Error(),
})
}
}
}
// MountHook registers an in-process hook on the agent loop.
+22 -2
View File
@@ -5,6 +5,7 @@ package agent
import (
"fmt"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
"github.com/sipeed/picoclaw/pkg/logger"
)
@@ -39,13 +40,16 @@ func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
Payload: payload,
}
if al == nil || al.eventBus == nil {
if al == nil {
return
}
al.logEvent(evt)
al.eventBus.Emit(evt)
if al.eventBus != nil {
al.eventBus.Emit(evt)
}
al.publishRuntimeEvent(evt)
}
func (al *AgentLoop) logEvent(evt Event) {
@@ -186,3 +190,19 @@ func (al *AgentLoop) EventDrops(kind EventKind) int64 {
}
return al.eventBus.Dropped(kind)
}
// RuntimeEvents returns the root runtime event channel.
func (al *AgentLoop) RuntimeEvents() runtimeevents.EventChannel {
if al == nil || al.runtimeEvents == nil {
return nil
}
return al.runtimeEvents.Channel()
}
// RuntimeEventStats returns runtime event bus counters.
func (al *AgentLoop) RuntimeEventStats() runtimeevents.Stats {
if al == nil || al.runtimeEvents == nil {
return runtimeevents.Stats{Closed: true}
}
return al.runtimeEvents.Stats()
}
+21 -9
View File
@@ -13,6 +13,7 @@ import (
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/commands"
"github.com/sipeed/picoclaw/pkg/config"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/skills"
@@ -24,6 +25,7 @@ func NewAgentLoop(
cfg *config.Config,
msgBus *bus.MessageBus,
provider providers.LLMProvider,
opts ...AgentLoopOption,
) *AgentLoop {
registry := NewAgentRegistry(cfg, provider)
@@ -56,15 +58,25 @@ func NewAgentLoop(
}
al := &AgentLoop{
bus: msgBus,
cfg: cfg,
registry: registry,
state: stateManager,
eventBus: eventBus,
fallback: fallbackChain,
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)),
workerSem: make(chan struct{}, workerPoolSize),
bus: msgBus,
cfg: cfg,
registry: registry,
state: stateManager,
eventBus: eventBus,
fallback: fallbackChain,
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)),
workerSem: make(chan struct{}, workerPoolSize),
ownsRuntimeEvents: true,
}
for _, opt := range opts {
if opt != nil {
opt(al)
}
}
if al.runtimeEvents == nil {
al.runtimeEvents = runtimeevents.NewBus()
al.ownsRuntimeEvents = true
}
al.providerFactory = providers.CreateProviderFromConfig
al.hooks = NewHookManager(eventBus)
+20
View File
@@ -0,0 +1,20 @@
package agent
import runtimeevents "github.com/sipeed/picoclaw/pkg/events"
// AgentLoopOption configures an AgentLoop at construction time.
type AgentLoopOption func(*AgentLoop)
// WithRuntimeEvents injects the runtime event bus used for new observation APIs.
//
// The injected bus is treated as externally owned and will not be closed by
// AgentLoop.Close. Passing nil leaves the default owned runtime bus enabled.
func WithRuntimeEvents(bus runtimeevents.Bus) AgentLoopOption {
return func(al *AgentLoop) {
if bus == nil {
return
}
al.runtimeEvents = bus
al.ownsRuntimeEvents = false
}
}
+48
View File
@@ -0,0 +1,48 @@
package agent
import runtimeevents "github.com/sipeed/picoclaw/pkg/events"
func runtimeKindForAgentEvent(kind EventKind) runtimeevents.Kind {
switch kind {
case EventKindTurnStart:
return runtimeevents.KindAgentTurnStart
case EventKindTurnEnd:
return runtimeevents.KindAgentTurnEnd
case EventKindLLMRequest:
return runtimeevents.KindAgentLLMRequest
case EventKindLLMDelta:
return runtimeevents.KindAgentLLMDelta
case EventKindLLMResponse:
return runtimeevents.KindAgentLLMResponse
case EventKindLLMRetry:
return runtimeevents.KindAgentLLMRetry
case EventKindContextCompress:
return runtimeevents.KindAgentContextCompress
case EventKindSessionSummarize:
return runtimeevents.KindAgentSessionSummarize
case EventKindToolExecStart:
return runtimeevents.KindAgentToolExecStart
case EventKindToolExecEnd:
return runtimeevents.KindAgentToolExecEnd
case EventKindToolExecSkipped:
return runtimeevents.KindAgentToolExecSkipped
case EventKindSteeringInjected:
return runtimeevents.KindAgentSteeringInjected
case EventKindFollowUpQueued:
return runtimeevents.KindAgentFollowUpQueued
case EventKindInterruptReceived:
return runtimeevents.KindAgentInterruptReceived
case EventKindSubTurnSpawn:
return runtimeevents.KindAgentSubTurnSpawn
case EventKindSubTurnEnd:
return runtimeevents.KindAgentSubTurnEnd
case EventKindSubTurnResultDelivered:
return runtimeevents.KindAgentSubTurnResultDelivered
case EventKindSubTurnOrphan:
return runtimeevents.KindAgentSubTurnOrphan
case EventKindError:
return runtimeevents.KindAgentError
default:
return runtimeevents.Kind("agent." + kind.String())
}
}
+125
View File
@@ -9,6 +9,7 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/session"
@@ -67,6 +68,100 @@ func TestEventBus_DropsWhenSubscriberIsFull(t *testing.T) {
}
}
func TestAgentLoop_DualPublishesRuntimeEvents(t *testing.T) {
runtimeBus := runtimeevents.NewBus()
al := &AgentLoop{
eventBus: NewEventBus(),
runtimeEvents: runtimeBus,
}
defer al.eventBus.Close()
defer func() {
if err := runtimeBus.Close(); err != nil {
t.Errorf("runtime bus close failed: %v", err)
}
}()
legacySub := al.SubscribeEvents(1)
defer al.UnsubscribeEvents(legacySub.ID)
runtimeSub, runtimeCh, err := al.RuntimeEvents().OfKind(runtimeevents.KindAgentToolExecStart).SubscribeChan(
context.Background(),
runtimeevents.SubscribeOptions{Name: "runtime", Buffer: 1},
)
if err != nil {
t.Fatalf("SubscribeChan failed: %v", err)
}
defer func() {
if err := runtimeSub.Close(); err != nil {
t.Errorf("runtime subscription close failed: %v", err)
}
}()
al.emitEvent(
EventKindToolExecStart,
EventMeta{
AgentID: "main",
TurnID: "turn-1",
ParentTurnID: "parent-turn",
SessionKey: "session-1",
Iteration: 2,
TracePath: "trace/root",
Source: "pipeline_execute",
turnContext: &TurnContext{
Inbound: &bus.InboundContext{
Channel: "cli",
Account: "default",
ChatID: "direct",
ChatType: "direct",
SenderID: "tester",
MessageID: "msg-1",
TopicID: "topic-1",
},
},
},
ToolExecStartPayload{Tool: "mock_custom", Arguments: map[string]any{"task": "ping"}},
)
legacyEvt := receiveLegacyEvent(t, legacySub.C)
if legacyEvt.Kind != EventKindToolExecStart {
t.Fatalf("legacy kind = %v, want %v", legacyEvt.Kind, EventKindToolExecStart)
}
runtimeEvt := receiveRuntimeEvent(t, runtimeCh)
if runtimeEvt.Kind != runtimeevents.KindAgentToolExecStart {
t.Fatalf("runtime kind = %q, want %q", runtimeEvt.Kind, runtimeevents.KindAgentToolExecStart)
}
if runtimeEvt.Source != (runtimeevents.Source{Component: "agent", Name: "main"}) {
t.Fatalf("runtime source = %+v", runtimeEvt.Source)
}
if runtimeEvt.Scope.AgentID != "main" ||
runtimeEvt.Scope.SessionKey != "session-1" ||
runtimeEvt.Scope.TurnID != "turn-1" ||
runtimeEvt.Scope.Channel != "cli" ||
runtimeEvt.Scope.Account != "default" ||
runtimeEvt.Scope.ChatID != "direct" ||
runtimeEvt.Scope.TopicID != "topic-1" ||
runtimeEvt.Scope.ChatType != "direct" ||
runtimeEvt.Scope.SenderID != "tester" ||
runtimeEvt.Scope.MessageID != "msg-1" {
t.Fatalf("runtime scope = %+v", runtimeEvt.Scope)
}
if runtimeEvt.Correlation.TraceID != "trace/root" ||
runtimeEvt.Correlation.ParentTurnID != "parent-turn" {
t.Fatalf("runtime correlation = %+v", runtimeEvt.Correlation)
}
if runtimeEvt.Attrs["agent_source"] != "pipeline_execute" || runtimeEvt.Attrs["iteration"] != 2 {
t.Fatalf("runtime attrs = %+v", runtimeEvt.Attrs)
}
payload, ok := runtimeEvt.Payload.(ToolExecStartPayload)
if !ok {
t.Fatalf("runtime payload = %T, want ToolExecStartPayload", runtimeEvt.Payload)
}
if payload.Tool != "mock_custom" {
t.Fatalf("runtime payload tool = %q, want mock_custom", payload.Tool)
}
}
type scriptedToolProvider struct {
calls int
}
@@ -636,6 +731,36 @@ func collectEventStream(ch <-chan Event) []Event {
}
}
func receiveLegacyEvent(t *testing.T, ch <-chan Event) Event {
t.Helper()
select {
case evt, ok := <-ch:
if !ok {
t.Fatal("legacy event stream closed before expected event arrived")
}
return evt
case <-time.After(time.Second):
t.Fatal("timed out waiting for legacy event")
return Event{}
}
}
func receiveRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event {
t.Helper()
select {
case evt, ok := <-ch:
if !ok {
t.Fatal("runtime event stream closed before expected event arrived")
}
return evt
case <-time.After(time.Second):
t.Fatal("timed out waiting for runtime event")
return runtimeevents.Event{}
}
}
func waitForEvent(t *testing.T, ch <-chan Event, timeout time.Duration, match func(Event) bool) Event {
t.Helper()
+104
View File
@@ -0,0 +1,104 @@
package agent
import (
"context"
"time"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
)
const runtimeEventPublishTimeout = 100 * time.Millisecond
func (al *AgentLoop) publishRuntimeEvent(evt Event) {
if al == nil || al.runtimeEvents == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), runtimeEventPublishTimeout)
defer cancel()
al.runtimeEvents.Publish(ctx, runtimeevents.Event{
Kind: runtimeKindForAgentEvent(evt.Kind),
Source: runtimeevents.Source{Component: "agent", Name: evt.Meta.AgentID},
Scope: runtimeScopeFromAgentEvent(evt),
Correlation: runtimeCorrelationFromAgentEvent(evt),
Severity: runtimeSeverityForAgentEvent(evt),
Payload: evt.Payload,
Attrs: runtimeAttrsFromAgentEvent(evt),
})
}
func runtimeScopeFromAgentEvent(evt Event) runtimeevents.Scope {
scope := runtimeevents.Scope{
AgentID: evt.Meta.AgentID,
SessionKey: evt.Meta.SessionKey,
TurnID: evt.Meta.TurnID,
}
if evt.Context == nil || evt.Context.Inbound == nil {
return scope
}
inbound := evt.Context.Inbound
scope.Channel = inbound.Channel
scope.Account = inbound.Account
scope.ChatID = inbound.ChatID
scope.TopicID = inbound.TopicID
scope.SpaceID = inbound.SpaceID
scope.SpaceType = inbound.SpaceType
scope.ChatType = inbound.ChatType
scope.SenderID = inbound.SenderID
scope.MessageID = inbound.MessageID
return scope
}
func runtimeCorrelationFromAgentEvent(evt Event) runtimeevents.Correlation {
return runtimeevents.Correlation{
TraceID: evt.Meta.TracePath,
ParentTurnID: evt.Meta.ParentTurnID,
}
}
func runtimeSeverityForAgentEvent(evt Event) runtimeevents.Severity {
switch evt.Kind {
case EventKindError, EventKindSubTurnOrphan:
return runtimeevents.SeverityError
case EventKindLLMRetry, EventKindContextCompress, EventKindToolExecSkipped:
return runtimeevents.SeverityWarn
case EventKindTurnEnd:
payload, ok := evt.Payload.(TurnEndPayload)
if !ok {
return runtimeevents.SeverityInfo
}
switch payload.Status {
case TurnEndStatusError:
return runtimeevents.SeverityError
case TurnEndStatusAborted:
return runtimeevents.SeverityWarn
default:
return runtimeevents.SeverityInfo
}
case EventKindToolExecEnd:
payload, ok := evt.Payload.(ToolExecEndPayload)
if ok && payload.IsError {
return runtimeevents.SeverityWarn
}
return runtimeevents.SeverityInfo
default:
return runtimeevents.SeverityInfo
}
}
func runtimeAttrsFromAgentEvent(evt Event) map[string]any {
attrs := make(map[string]any, 2)
if evt.Meta.Source != "" {
attrs["agent_source"] = evt.Meta.Source
}
if evt.Meta.Iteration != 0 {
attrs["iteration"] = evt.Meta.Iteration
}
if len(attrs) == 0 {
return nil
}
return attrs
}