mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(events): keep runtime observers non-blocking
Add a non-blocking runtime publish path and switch hot-path publishers to it. Enforce subscription timeout boundaries, keep ordered subscriber snapshots up to date on subscribe changes, expose all runtime kinds to process hooks, add safe log attrs for non-agent events, and close the gateway message bus on full shutdown.
This commit is contained in:
@@ -1,23 +1,13 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
|
||||
)
|
||||
|
||||
const runtimeEventPublishTimeout = 100 * time.Millisecond
|
||||
import runtimeevents "github.com/sipeed/picoclaw/pkg/events"
|
||||
|
||||
func (al *AgentLoop) publishRuntimeEvent(evt runtimeevents.Event) {
|
||||
if al == nil || al.runtimeEvents == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), runtimeEventPublishTimeout)
|
||||
defer cancel()
|
||||
|
||||
al.runtimeEvents.Publish(ctx, evt)
|
||||
al.runtimeEvents.PublishNonBlocking(evt)
|
||||
}
|
||||
|
||||
func runtimeScopeFromHookMeta(meta HookMeta, eventCtx *TurnContext) runtimeevents.Scope {
|
||||
|
||||
+1
-21
@@ -311,27 +311,7 @@ func processHookObserveKindsFromConfig(observe []string) ([]string, bool, error)
|
||||
}
|
||||
|
||||
func validHookEventKinds() map[string]string {
|
||||
runtimeKinds := []runtimeevents.Kind{
|
||||
runtimeevents.KindAgentTurnStart,
|
||||
runtimeevents.KindAgentTurnEnd,
|
||||
runtimeevents.KindAgentLLMRequest,
|
||||
runtimeevents.KindAgentLLMDelta,
|
||||
runtimeevents.KindAgentLLMResponse,
|
||||
runtimeevents.KindAgentLLMRetry,
|
||||
runtimeevents.KindAgentContextCompress,
|
||||
runtimeevents.KindAgentSessionSummarize,
|
||||
runtimeevents.KindAgentToolExecStart,
|
||||
runtimeevents.KindAgentToolExecEnd,
|
||||
runtimeevents.KindAgentToolExecSkipped,
|
||||
runtimeevents.KindAgentSteeringInjected,
|
||||
runtimeevents.KindAgentFollowUpQueued,
|
||||
runtimeevents.KindAgentInterruptReceived,
|
||||
runtimeevents.KindAgentSubTurnSpawn,
|
||||
runtimeevents.KindAgentSubTurnEnd,
|
||||
runtimeevents.KindAgentSubTurnResultDelivered,
|
||||
runtimeevents.KindAgentSubTurnOrphan,
|
||||
runtimeevents.KindAgentError,
|
||||
}
|
||||
runtimeKinds := runtimeevents.KnownKinds()
|
||||
kinds := make(map[string]string, len(runtimeKinds)*2)
|
||||
for _, kind := range runtimeKinds {
|
||||
kinds[kind.String()] = kind.String()
|
||||
|
||||
@@ -163,6 +163,8 @@ func TestProcessHookObserveKindsFromConfigAcceptsRuntimeNames(t *testing.T) {
|
||||
kinds, enabled, err := processHookObserveKindsFromConfig([]string{
|
||||
"tool_exec_start",
|
||||
"agent.tool.exec_end",
|
||||
"gateway.ready",
|
||||
"mcp.server.failed",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("processHookObserveKindsFromConfig failed: %v", err)
|
||||
@@ -171,7 +173,7 @@ func TestProcessHookObserveKindsFromConfigAcceptsRuntimeNames(t *testing.T) {
|
||||
t.Fatal("expected observe to be enabled")
|
||||
}
|
||||
|
||||
want := []string{"agent.tool.exec_start", "agent.tool.exec_end"}
|
||||
want := []string{"agent.tool.exec_start", "agent.tool.exec_end", "gateway.ready", "mcp.server.failed"}
|
||||
if !slices.Equal(kinds, want) {
|
||||
t.Fatalf("observe kinds = %v, want %v", kinds, want)
|
||||
}
|
||||
|
||||
@@ -102,6 +102,32 @@ func TestRuntimeEventLogFieldsSummarizeAgentPayload(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeEventLogFieldsIncludeSafeAttrs(t *testing.T) {
|
||||
fields := runtimeEventLogFields(runtimeevents.Event{
|
||||
ID: "evt-gateway",
|
||||
Kind: runtimeevents.KindGatewayReady,
|
||||
Severity: runtimeevents.SeverityInfo,
|
||||
Attrs: map[string]any{
|
||||
"duration_ms": 42,
|
||||
"error": "startup failed",
|
||||
"event_kind": "conflict",
|
||||
},
|
||||
})
|
||||
|
||||
if fields["duration_ms"] != 42 || fields["error"] != "startup failed" {
|
||||
t.Fatalf("missing safe attrs: %#v", fields)
|
||||
}
|
||||
if fields["event_kind"] != runtimeevents.KindGatewayReady.String() {
|
||||
t.Fatalf("event_kind overwritten by attrs: %#v", fields)
|
||||
}
|
||||
if fields["attr_event_kind"] != "conflict" {
|
||||
t.Fatalf("conflicting attr not preserved with prefix: %#v", fields)
|
||||
}
|
||||
if _, ok := fields["payload"]; ok {
|
||||
t.Fatalf("raw payload should not be included by runtimeEventLogFields: %#v", fields)
|
||||
}
|
||||
}
|
||||
|
||||
func runtimeEventLoggerStateForTest(
|
||||
al *AgentLoop,
|
||||
) (*runtimeEventLogger, runtimeevents.Subscription) {
|
||||
|
||||
Reference in New Issue
Block a user