diff --git a/docs/architecture/subturn.md b/docs/architecture/subturn.md index 0a927b56d..31a56902c 100644 --- a/docs/architecture/subturn.md +++ b/docs/architecture/subturn.md @@ -135,16 +135,16 @@ The agent loop polls for async SubTurn results at two points per iteration: All active turns are registered in `AgentLoop.activeTurnStates` (`sync.Map`, keyed by session key). A reservation sentinel is stored atomically via `LoadOrStore` before the worker starts, then replaced with the real `*turnState` when `runTurn` registers. This prevents a TOCTOU race where multiple messages for the same session could spawn concurrent workers. The sentinel is cleaned up by the worker's deferred cleanup. This allows `HardAbort` and `/subagents` observability commands to find and operate on active turns. -## Event Bus Integration +## Runtime Event Integration -SubTurns emit specific events to the PicoClaw `EventBus` for observability and debugging: +SubTurns emit runtime events through `pkg/events` for observability and debugging: | Event Kind | When Emitted | Payload | |:------|:-------------|:--------| -| `subturn_spawn` | Sub-turn successfully initialized | `SubTurnSpawnPayload{AgentID, Label, ParentTurnID}` | -| `subturn_end` | Sub-turn finishes (success or error) | `SubTurnEndPayload{AgentID, Status}` | -| `subturn_result_delivered` | Async result successfully delivered to parent | `SubTurnResultDeliveredPayload{TargetChannel, TargetChatID, ContentLen}` | -| `subturn_orphan` | Result cannot be delivered (parent finished or channel full) | `SubTurnOrphanPayload{ParentTurnID, ChildTurnID, Reason}` | +| `agent.subturn.spawn` | Sub-turn successfully initialized | `SubTurnSpawnPayload{AgentID, Label, ParentTurnID}` | +| `agent.subturn.end` | Sub-turn finishes (success or error) | `SubTurnEndPayload{AgentID, Status}` | +| `agent.subturn.result_delivered` | Async result successfully delivered to parent | `SubTurnResultDeliveredPayload{TargetChannel, TargetChatID, ContentLen}` | +| `agent.subturn.orphan` | Result cannot be delivered (parent finished or channel full) | `SubTurnOrphanPayload{ParentTurnID, ChildTurnID, Reason}` | ## API Reference @@ -240,13 +240,13 @@ An orphan result occurs when: 2. The `pendingResults` channel is full (buffer size: 16) When a result becomes orphan: -- `SubTurnOrphanResultEvent` is emitted to EventBus +- `agent.subturn.orphan` is emitted to the runtime event bus - The result is **NOT** delivered to the LLM context - External systems can listen to this event for custom handling ### Preventing Orphan Results - Use `Critical: true` for important SubTurns that must complete -- Monitor `SubTurnOrphanResultEvent` for observability +- Monitor `agent.subturn.orphan` for observability - Consider the 16-buffer limit when spawning many async SubTurns ## Tool Inheritance diff --git a/pkg/events/kind.go b/pkg/events/kind.go index 9d7319b93..85e61f741 100644 --- a/pkg/events/kind.go +++ b/pkg/events/kind.go @@ -75,6 +75,12 @@ const ( // KindBusCloseDrained is emitted when message bus close drains buffered messages. KindBusCloseDrained Kind = "bus.close.drained" + // KindGatewayStart is emitted when gateway startup reaches runtime bootstrap. + KindGatewayStart Kind = "gateway.start" + // KindGatewayReady is emitted when gateway services are started and ready. + KindGatewayReady Kind = "gateway.ready" + // KindGatewayShutdown is emitted when gateway shutdown starts. + KindGatewayShutdown Kind = "gateway.shutdown" // KindGatewayReloadStarted is emitted when gateway reload starts. KindGatewayReloadStarted Kind = "gateway.reload.started" // KindGatewayReloadCompleted is emitted when gateway reload completes. diff --git a/pkg/gateway/events.go b/pkg/gateway/events.go index d690a4eaf..fd263fba6 100644 --- a/pkg/gateway/events.go +++ b/pkg/gateway/events.go @@ -10,12 +10,12 @@ import ( const gatewayEventPublishTimeout = 100 * time.Millisecond -type gatewayReloadPayload struct { +type gatewayEventPayload struct { DurationMS int64 `json:"duration_ms,omitempty"` Error string `json:"error,omitempty"` } -func publishGatewayReloadEvent( +func publishGatewayEvent( al *agent.AgentLoop, kind runtimeevents.Kind, startedAt time.Time, @@ -26,7 +26,7 @@ func publishGatewayReloadEvent( } severity := runtimeevents.SeverityInfo - payload := gatewayReloadPayload{} + payload := gatewayEventPayload{} if !startedAt.IsZero() { payload.DurationMS = time.Since(startedAt).Milliseconds() } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 01b6f7d5e..dd579ed7e 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -115,6 +115,7 @@ func (p *startupBlockedProvider) GetDefaultModel() string { // Run starts the gateway runtime using the configuration loaded from configPath. func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runErr error) { + startedAt := time.Now() panicPath := filepath.Join(homePath, logPath, panicFile) panicFunc, err := logger.InitPanic(panicPath) if err != nil { @@ -199,6 +200,7 @@ func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runEr msgBus := bus.NewMessageBus() agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) msgBus.SetEventPublisher(agentLoop.RuntimeEventBus()) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayStart, startedAt, nil) fmt.Println("\n📦 Agent Status:") startupInfo := agentLoop.GetStartupInfo() @@ -218,6 +220,7 @@ func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runEr if err != nil { return err } + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReady, startedAt, nil) closeListeners = false // Setup manual reload channel for /reload endpoint @@ -316,14 +319,14 @@ func executeReload( debug bool, ) (err error) { startedAt := time.Now() - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadStarted, startedAt, nil) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadStarted, startedAt, nil) defer runningServices.reloading.Store(false) defer func() { if err != nil { - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadFailed, startedAt, err) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadFailed, startedAt, err) return } - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadCompleted, startedAt, nil) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadCompleted, startedAt, nil) }() err = handleConfigReload(ctx, agentLoop, newCfg, provider, runningServices, msgBus, allowEmptyStartup, debug) @@ -509,6 +512,8 @@ func shutdownGateway( provider providers.LLMProvider, fullShutdown bool, ) { + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayShutdown, time.Time{}, nil) + if cp, ok := provider.(providers.StatefulProvider); ok && fullShutdown { cp.Close() } diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index 60049337f..9af833b6b 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -1,14 +1,19 @@ package gateway import ( + "context" "fmt" "os" "os/exec" "path/filepath" "strings" "testing" + "time" + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" ) func TestRun_StartupFailuresReturnErrorAndEmitStructuredLog(t *testing.T) { @@ -106,3 +111,64 @@ func TestGatewayRunStartupFailureHelper(t *testing.T) { fmt.Fprintln(os.Stdout, err.Error()) os.Exit(0) } + +func TestPublishGatewayEvent(t *testing.T) { + eventBus := runtimeevents.NewBus() + t.Cleanup(func() { + if err := eventBus.Close(); err != nil { + t.Fatalf("Close runtime event bus: %v", err) + } + }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + sub, eventsCh, err := eventBus.Channel().OfKind(runtimeevents.KindGatewayStart).SubscribeChan( + ctx, + runtimeevents.SubscribeOptions{Name: "gateway-test", Buffer: 4}, + ) + if err != nil { + t.Fatalf("SubscribeChan() error = %v", err) + } + t.Cleanup(func() { + if err := sub.Close(); err != nil { + t.Fatalf("Close subscription: %v", err) + } + }) + + al := agent.NewAgentLoop( + config.DefaultConfig(), + bus.NewMessageBus(), + &startupBlockedProvider{reason: "not used"}, + agent.WithRuntimeEvents(eventBus), + ) + t.Cleanup(al.Close) + + startedAt := time.Now().Add(-1500 * time.Millisecond) + publishGatewayEvent(al, runtimeevents.KindGatewayStart, startedAt, nil) + + evt := receiveGatewayRuntimeEvent(t, eventsCh) + if evt.Kind != runtimeevents.KindGatewayStart || + evt.Source.Component != "gateway" || + evt.Severity != runtimeevents.SeverityInfo { + t.Fatalf("gateway event = %+v", evt) + } + payload, ok := evt.Payload.(gatewayEventPayload) + if !ok { + t.Fatalf("payload type = %T, want gatewayEventPayload", evt.Payload) + } + if payload.DurationMS <= 0 { + t.Fatalf("DurationMS = %d, want positive", payload.DurationMS) + } +} + +func receiveGatewayRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event { + t.Helper() + + select { + case evt := <-ch: + return evt + case <-time.After(time.Second): + t.Fatal("timed out waiting for gateway runtime event") + return runtimeevents.Event{} + } +}