From e613258fa5655a2c5d9eb75cd1006005dc7cdcee Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 17:02:48 +0800 Subject: [PATCH] feat(gateway): publish lifecycle runtime events Emit gateway.start, gateway.ready, and gateway.shutdown on the shared runtime event bus, while keeping reload events on the same helper path. Update subturn architecture docs to refer to runtime event kinds instead of the removed agent EventBus names. Validation: GOCACHE=/tmp/picoclaw-go-cache go test ./pkg/gateway ./pkg/events; GOCACHE=/tmp/picoclaw-go-cache go test ./pkg/bus ./pkg/channels ./pkg/mcp ./pkg/tools/integration ./pkg/events ./pkg/gateway; make lint --- docs/architecture/subturn.md | 16 ++++----- pkg/events/kind.go | 6 ++++ pkg/gateway/events.go | 6 ++-- pkg/gateway/gateway.go | 11 ++++-- pkg/gateway/gateway_test.go | 66 ++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 14 deletions(-) diff --git a/docs/architecture/subturn.md b/docs/architecture/subturn.md index 0a927b56d..31a56902c 100644 --- a/docs/architecture/subturn.md +++ b/docs/architecture/subturn.md @@ -135,16 +135,16 @@ The agent loop polls for async SubTurn results at two points per iteration: All active turns are registered in `AgentLoop.activeTurnStates` (`sync.Map`, keyed by session key). A reservation sentinel is stored atomically via `LoadOrStore` before the worker starts, then replaced with the real `*turnState` when `runTurn` registers. This prevents a TOCTOU race where multiple messages for the same session could spawn concurrent workers. The sentinel is cleaned up by the worker's deferred cleanup. This allows `HardAbort` and `/subagents` observability commands to find and operate on active turns. -## Event Bus Integration +## Runtime Event Integration -SubTurns emit specific events to the PicoClaw `EventBus` for observability and debugging: +SubTurns emit runtime events through `pkg/events` for observability and debugging: | Event Kind | When Emitted | Payload | |:------|:-------------|:--------| -| `subturn_spawn` | Sub-turn successfully initialized | `SubTurnSpawnPayload{AgentID, Label, ParentTurnID}` | -| `subturn_end` | Sub-turn finishes (success or error) | `SubTurnEndPayload{AgentID, Status}` | -| `subturn_result_delivered` | Async result successfully delivered to parent | `SubTurnResultDeliveredPayload{TargetChannel, TargetChatID, ContentLen}` | -| `subturn_orphan` | Result cannot be delivered (parent finished or channel full) | `SubTurnOrphanPayload{ParentTurnID, ChildTurnID, Reason}` | +| `agent.subturn.spawn` | Sub-turn successfully initialized | `SubTurnSpawnPayload{AgentID, Label, ParentTurnID}` | +| `agent.subturn.end` | Sub-turn finishes (success or error) | `SubTurnEndPayload{AgentID, Status}` | +| `agent.subturn.result_delivered` | Async result successfully delivered to parent | `SubTurnResultDeliveredPayload{TargetChannel, TargetChatID, ContentLen}` | +| `agent.subturn.orphan` | Result cannot be delivered (parent finished or channel full) | `SubTurnOrphanPayload{ParentTurnID, ChildTurnID, Reason}` | ## API Reference @@ -240,13 +240,13 @@ An orphan result occurs when: 2. The `pendingResults` channel is full (buffer size: 16) When a result becomes orphan: -- `SubTurnOrphanResultEvent` is emitted to EventBus +- `agent.subturn.orphan` is emitted to the runtime event bus - The result is **NOT** delivered to the LLM context - External systems can listen to this event for custom handling ### Preventing Orphan Results - Use `Critical: true` for important SubTurns that must complete -- Monitor `SubTurnOrphanResultEvent` for observability +- Monitor `agent.subturn.orphan` for observability - Consider the 16-buffer limit when spawning many async SubTurns ## Tool Inheritance diff --git a/pkg/events/kind.go b/pkg/events/kind.go index 9d7319b93..85e61f741 100644 --- a/pkg/events/kind.go +++ b/pkg/events/kind.go @@ -75,6 +75,12 @@ const ( // KindBusCloseDrained is emitted when message bus close drains buffered messages. KindBusCloseDrained Kind = "bus.close.drained" + // KindGatewayStart is emitted when gateway startup reaches runtime bootstrap. + KindGatewayStart Kind = "gateway.start" + // KindGatewayReady is emitted when gateway services are started and ready. + KindGatewayReady Kind = "gateway.ready" + // KindGatewayShutdown is emitted when gateway shutdown starts. + KindGatewayShutdown Kind = "gateway.shutdown" // KindGatewayReloadStarted is emitted when gateway reload starts. KindGatewayReloadStarted Kind = "gateway.reload.started" // KindGatewayReloadCompleted is emitted when gateway reload completes. diff --git a/pkg/gateway/events.go b/pkg/gateway/events.go index d690a4eaf..fd263fba6 100644 --- a/pkg/gateway/events.go +++ b/pkg/gateway/events.go @@ -10,12 +10,12 @@ import ( const gatewayEventPublishTimeout = 100 * time.Millisecond -type gatewayReloadPayload struct { +type gatewayEventPayload struct { DurationMS int64 `json:"duration_ms,omitempty"` Error string `json:"error,omitempty"` } -func publishGatewayReloadEvent( +func publishGatewayEvent( al *agent.AgentLoop, kind runtimeevents.Kind, startedAt time.Time, @@ -26,7 +26,7 @@ func publishGatewayReloadEvent( } severity := runtimeevents.SeverityInfo - payload := gatewayReloadPayload{} + payload := gatewayEventPayload{} if !startedAt.IsZero() { payload.DurationMS = time.Since(startedAt).Milliseconds() } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 01b6f7d5e..dd579ed7e 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -115,6 +115,7 @@ func (p *startupBlockedProvider) GetDefaultModel() string { // Run starts the gateway runtime using the configuration loaded from configPath. func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runErr error) { + startedAt := time.Now() panicPath := filepath.Join(homePath, logPath, panicFile) panicFunc, err := logger.InitPanic(panicPath) if err != nil { @@ -199,6 +200,7 @@ func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runEr msgBus := bus.NewMessageBus() agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) msgBus.SetEventPublisher(agentLoop.RuntimeEventBus()) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayStart, startedAt, nil) fmt.Println("\n📦 Agent Status:") startupInfo := agentLoop.GetStartupInfo() @@ -218,6 +220,7 @@ func Run(debug bool, homePath, configPath string, allowEmptyStartup bool) (runEr if err != nil { return err } + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReady, startedAt, nil) closeListeners = false // Setup manual reload channel for /reload endpoint @@ -316,14 +319,14 @@ func executeReload( debug bool, ) (err error) { startedAt := time.Now() - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadStarted, startedAt, nil) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadStarted, startedAt, nil) defer runningServices.reloading.Store(false) defer func() { if err != nil { - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadFailed, startedAt, err) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadFailed, startedAt, err) return } - publishGatewayReloadEvent(agentLoop, runtimeevents.KindGatewayReloadCompleted, startedAt, nil) + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayReloadCompleted, startedAt, nil) }() err = handleConfigReload(ctx, agentLoop, newCfg, provider, runningServices, msgBus, allowEmptyStartup, debug) @@ -509,6 +512,8 @@ func shutdownGateway( provider providers.LLMProvider, fullShutdown bool, ) { + publishGatewayEvent(agentLoop, runtimeevents.KindGatewayShutdown, time.Time{}, nil) + if cp, ok := provider.(providers.StatefulProvider); ok && fullShutdown { cp.Close() } diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index 60049337f..9af833b6b 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -1,14 +1,19 @@ package gateway import ( + "context" "fmt" "os" "os/exec" "path/filepath" "strings" "testing" + "time" + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" ) func TestRun_StartupFailuresReturnErrorAndEmitStructuredLog(t *testing.T) { @@ -106,3 +111,64 @@ func TestGatewayRunStartupFailureHelper(t *testing.T) { fmt.Fprintln(os.Stdout, err.Error()) os.Exit(0) } + +func TestPublishGatewayEvent(t *testing.T) { + eventBus := runtimeevents.NewBus() + t.Cleanup(func() { + if err := eventBus.Close(); err != nil { + t.Fatalf("Close runtime event bus: %v", err) + } + }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + sub, eventsCh, err := eventBus.Channel().OfKind(runtimeevents.KindGatewayStart).SubscribeChan( + ctx, + runtimeevents.SubscribeOptions{Name: "gateway-test", Buffer: 4}, + ) + if err != nil { + t.Fatalf("SubscribeChan() error = %v", err) + } + t.Cleanup(func() { + if err := sub.Close(); err != nil { + t.Fatalf("Close subscription: %v", err) + } + }) + + al := agent.NewAgentLoop( + config.DefaultConfig(), + bus.NewMessageBus(), + &startupBlockedProvider{reason: "not used"}, + agent.WithRuntimeEvents(eventBus), + ) + t.Cleanup(al.Close) + + startedAt := time.Now().Add(-1500 * time.Millisecond) + publishGatewayEvent(al, runtimeevents.KindGatewayStart, startedAt, nil) + + evt := receiveGatewayRuntimeEvent(t, eventsCh) + if evt.Kind != runtimeevents.KindGatewayStart || + evt.Source.Component != "gateway" || + evt.Severity != runtimeevents.SeverityInfo { + t.Fatalf("gateway event = %+v", evt) + } + payload, ok := evt.Payload.(gatewayEventPayload) + if !ok { + t.Fatalf("payload type = %T, want gatewayEventPayload", evt.Payload) + } + if payload.DurationMS <= 0 { + t.Fatalf("DurationMS = %d, want positive", payload.DurationMS) + } +} + +func receiveGatewayRuntimeEvent(t *testing.T, ch <-chan runtimeevents.Event) runtimeevents.Event { + t.Helper() + + select { + case evt := <-ch: + return evt + case <-time.After(time.Second): + t.Fatal("timed out waiting for gateway runtime event") + return runtimeevents.Event{} + } +}