From 795ee362ead99924e600618c6509b980494c698e Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 16:55:02 +0800 Subject: [PATCH] refactor(events): emit agent runtime events directly Remove the legacy EventKind/Event envelope mapping and let agent event emission build pkg/events.Event values directly. Keep HookMeta as the shared hook metadata shape and preserve legacy observe string aliases by mapping them to runtime event kinds. Validation: GOCACHE=/tmp/picoclaw-go-cache go test ./pkg/agent; make lint --- docs/architecture/hooks/README.md | 2 +- docs/architecture/hooks/README.zh.md | 2 +- docs/design/hook-system-design.zh.md | 15 ++-- pkg/agent/agent_event.go | 44 ++++++------ pkg/agent/context_legacy.go | 5 +- pkg/agent/event_kind_runtime.go | 48 ------------- pkg/agent/eventbus_test.go | 4 +- pkg/agent/events.go | 102 +-------------------------- pkg/agent/events_runtime.go | 58 +++++++-------- pkg/agent/hook_mount.go | 51 +++++++++++--- pkg/agent/hooks.go | 20 +++--- pkg/agent/pipeline_execute.go | 19 ++--- pkg/agent/pipeline_finalize.go | 3 +- pkg/agent/pipeline_llm.go | 13 ++-- pkg/agent/steering.go | 9 +-- pkg/agent/subturn.go | 13 ++-- pkg/agent/turn_context.go | 2 +- pkg/agent/turn_coord.go | 17 ++--- pkg/agent/turn_state.go | 4 +- 19 files changed, 162 insertions(+), 269 deletions(-) delete mode 100644 pkg/agent/event_kind_runtime.go diff --git a/docs/architecture/hooks/README.md b/docs/architecture/hooks/README.md index d9d5ece8f..06f1a2c07 100644 --- a/docs/architecture/hooks/README.md +++ b/docs/architecture/hooks/README.md @@ -13,7 +13,7 @@ The repository no longer ships standalone example source files. The Go and Pytho | Type | Interface | Stage | Can modify data | | --- | --- | --- | --- | -| Observer | `RuntimeEventObserver` / `EventObserver` | Runtime event bus broadcast | No | +| Observer | `RuntimeEventObserver` | Runtime event bus broadcast | No | | LLM interceptor | `LLMInterceptor` | `before_llm` / `after_llm` | Yes | | Tool interceptor | `ToolInterceptor` | `before_tool` / `after_tool` | Yes | | Tool approver | `ToolApprover` | `approve_tool` | No, returns allow/deny | diff --git a/docs/architecture/hooks/README.zh.md b/docs/architecture/hooks/README.zh.md index b9d45b386..1fff40832 100644 --- a/docs/architecture/hooks/README.zh.md +++ b/docs/architecture/hooks/README.zh.md @@ -13,7 +13,7 @@ | 类型 | 接口 | 作用阶段 | 能否改写 | | --- | --- | --- | --- | -| 观察型 | `RuntimeEventObserver` / `EventObserver` | runtime event bus 广播事件时 | 否 | +| 观察型 | `RuntimeEventObserver` | runtime event bus 广播事件时 | 否 | | LLM 拦截型 | `LLMInterceptor` | `before_llm` / `after_llm` | 是 | | Tool 拦截型 | `ToolInterceptor` | `before_tool` / `after_tool` | 是 | | Tool 审批型 | `ToolApprover` | `approve_tool` | 否,返回批准/拒绝 | diff --git a/docs/design/hook-system-design.zh.md b/docs/design/hook-system-design.zh.md index 8f815934a..b654112bf 100644 --- a/docs/design/hook-system-design.zh.md +++ b/docs/design/hook-system-design.zh.md @@ -2,7 +2,7 @@ > 当前状态:本文是 hook 系统的早期设计记录。事件系统升级后,观察型 hook 的主路径已经切到 > `pkg/events.Event`、`RuntimeEventObserver` 和进程 hook 的 `hook.runtime_event`。 -> 文中提到的 `agent.Event`、`EventKind`、`hook.event` 只代表迁移期兼容层,不应作为新代码接口。 +> 旧 `agent.Event`、`EventKind`、`hook.event` 兼容层已经删除。 ## 背景 @@ -58,19 +58,16 @@ pi-mono 的核心思路更接近当前分支: - `pkg/events` 定义 runtime event envelope、kind、scope、source、severity 和 fan-out bus - `pkg/agent/event_payloads.go` 保留 agent domain payload -- `pkg/agent/eventbus.go` 只作为迁移期兼容层存在 +- agent domain payload 保留在 `pkg/agent/event_payloads.go` - `pkg/agent/loop.go` 中的 `runTurn()` 已在 turn、llm、tool、interrupt、follow-up、summary 等节点发射事件 - `pkg/agent/steering.go` 已支持 steering、graceful interrupt、hard abort - `pkg/agent/turn.go` 已维护 turn phase、恢复点、active turn、abort 状态 ### 现有缺口 -当前分支还缺四件事: - -- 没有 HookManager,只有旧 agent EventBus -- 没有 Before/After LLM、Before/After Tool 这种同步拦截点 -- 没有审批型 hook -- 子 agent 仍走 `pkg/tools/SubagentManager + RunToolLoop`,没有接入 `pkg/agent` 的 turn tree 和事件流 +早期设计时的缺口包括 HookManager、Before/After LLM、Before/After Tool、审批型 hook +以及 sub-turn 接入。当前实现已经覆盖主 turn 的 HookManager、LLM/Tool 拦截和审批; +sub-turn 事件已接入 runtime event bus。 ### 一个关键现实 @@ -122,7 +119,7 @@ type EventObserver interface { } ``` -这类 hook 直接订阅 runtime event bus 即可。旧 `OnEvent(ctx, agent.Event)` 仅用于迁移期兼容。 +这类 hook 直接订阅 runtime event bus 即可。 适用场景: diff --git a/pkg/agent/agent_event.go b/pkg/agent/agent_event.go index 5b9acfdb0..d4174ced5 100644 --- a/pkg/agent/agent_event.go +++ b/pkg/agent/agent_event.go @@ -19,8 +19,8 @@ func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *Turn } } -func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta { - return EventMeta{ +func (ts turnEventScope) meta(iteration int, source, tracePath string) HookMeta { + return HookMeta{ AgentID: ts.agentID, TurnID: ts.turnID, SessionKey: ts.sessionKey, @@ -31,41 +31,45 @@ func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta } } -func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { - clonedMeta := cloneEventMeta(meta) - evt := Event{ - Kind: kind, - Meta: clonedMeta, - Context: cloneTurnContext(clonedMeta.turnContext), - Payload: payload, +func (al *AgentLoop) emitEvent(kind runtimeevents.Kind, meta HookMeta, payload any) { + clonedMeta := cloneHookMeta(meta) + eventCtx := cloneTurnContext(clonedMeta.turnContext) + evt := runtimeevents.Event{ + Kind: kind, + Source: runtimeevents.Source{Component: "agent", Name: clonedMeta.AgentID}, + Scope: runtimeScopeFromHookMeta(clonedMeta, eventCtx), + Correlation: runtimeCorrelationFromHookMeta(clonedMeta), + Severity: runtimeSeverityForAgentEvent(kind, payload), + Payload: payload, + Attrs: runtimeAttrsFromHookMeta(clonedMeta), } if al == nil { return } - al.logEvent(evt) + al.logEvent(evt, clonedMeta, eventCtx) al.publishRuntimeEvent(evt) } -func (al *AgentLoop) logEvent(evt Event) { +func (al *AgentLoop) logEvent(evt runtimeevents.Event, meta HookMeta, eventCtx *TurnContext) { fields := map[string]any{ "event_kind": evt.Kind.String(), - "agent_id": evt.Meta.AgentID, - "turn_id": evt.Meta.TurnID, - "session_key": evt.Meta.SessionKey, - "iteration": evt.Meta.Iteration, + "agent_id": meta.AgentID, + "turn_id": meta.TurnID, + "session_key": meta.SessionKey, + "iteration": meta.Iteration, } - if evt.Meta.TracePath != "" { - fields["trace"] = evt.Meta.TracePath + if meta.TracePath != "" { + fields["trace"] = meta.TracePath } - if evt.Meta.Source != "" { - fields["source"] = evt.Meta.Source + if meta.Source != "" { + fields["source"] = meta.Source } - appendEventContextFields(fields, evt.Context) + appendEventContextFields(fields, eventCtx) switch payload := evt.Payload.(type) { case TurnStartPayload: diff --git a/pkg/agent/context_legacy.go b/pkg/agent/context_legacy.go index 5644571fb..94ef5367d 100644 --- a/pkg/agent/context_legacy.go +++ b/pkg/agent/context_legacy.go @@ -7,6 +7,7 @@ import ( "sync" "time" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -41,7 +42,7 @@ func (m *legacyContextManager) Compact(_ context.Context, req *CompactRequest) e // Sync emergency compression — budget exceeded. if result, ok := m.forceCompression(req.SessionKey); ok { m.al.emitEvent( - EventKindContextCompress, + runtimeevents.KindAgentContextCompress, m.al.newTurnEventScope("", req.SessionKey, nil).meta(0, "forceCompression", "turn.context.compress"), ContextCompressPayload{ Reason: req.Reason, @@ -246,7 +247,7 @@ func (m *legacyContextManager) summarizeSession(agent *AgentInstance, sessionKey agent.Sessions.TruncateHistory(sessionKey, keepCount) agent.Sessions.Save(sessionKey) m.al.emitEvent( - EventKindSessionSummarize, + runtimeevents.KindAgentSessionSummarize, m.al.newTurnEventScope(agent.ID, sessionKey, nil).meta(0, "summarizeSession", "turn.session.summarize"), SessionSummarizePayload{ SummarizedMessages: len(validMessages), diff --git a/pkg/agent/event_kind_runtime.go b/pkg/agent/event_kind_runtime.go deleted file mode 100644 index 031f3b35e..000000000 --- a/pkg/agent/event_kind_runtime.go +++ /dev/null @@ -1,48 +0,0 @@ -package agent - -import runtimeevents "github.com/sipeed/picoclaw/pkg/events" - -func runtimeKindForAgentEvent(kind EventKind) runtimeevents.Kind { - switch kind { - case EventKindTurnStart: - return runtimeevents.KindAgentTurnStart - case EventKindTurnEnd: - return runtimeevents.KindAgentTurnEnd - case EventKindLLMRequest: - return runtimeevents.KindAgentLLMRequest - case EventKindLLMDelta: - return runtimeevents.KindAgentLLMDelta - case EventKindLLMResponse: - return runtimeevents.KindAgentLLMResponse - case EventKindLLMRetry: - return runtimeevents.KindAgentLLMRetry - case EventKindContextCompress: - return runtimeevents.KindAgentContextCompress - case EventKindSessionSummarize: - return runtimeevents.KindAgentSessionSummarize - case EventKindToolExecStart: - return runtimeevents.KindAgentToolExecStart - case EventKindToolExecEnd: - return runtimeevents.KindAgentToolExecEnd - case EventKindToolExecSkipped: - return runtimeevents.KindAgentToolExecSkipped - case EventKindSteeringInjected: - return runtimeevents.KindAgentSteeringInjected - case EventKindFollowUpQueued: - return runtimeevents.KindAgentFollowUpQueued - case EventKindInterruptReceived: - return runtimeevents.KindAgentInterruptReceived - case EventKindSubTurnSpawn: - return runtimeevents.KindAgentSubTurnSpawn - case EventKindSubTurnEnd: - return runtimeevents.KindAgentSubTurnEnd - case EventKindSubTurnResultDelivered: - return runtimeevents.KindAgentSubTurnResultDelivered - case EventKindSubTurnOrphan: - return runtimeevents.KindAgentSubTurnOrphan - case EventKindError: - return runtimeevents.KindAgentError - default: - return runtimeevents.Kind("agent." + kind.String()) - } -} diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index b7678e87b..86d7f4afa 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -41,8 +41,8 @@ func TestAgentLoop_PublishesRuntimeEvents(t *testing.T) { }() al.emitEvent( - EventKindToolExecStart, - EventMeta{ + runtimeevents.KindAgentToolExecStart, + HookMeta{ AgentID: "main", TurnID: "turn-1", ParentTurnID: "parent-turn", diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 6b5603284..0dd861f43 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -1,104 +1,8 @@ package agent -import ( - "fmt" - "time" -) - -// EventKind identifies a structured agent-loop event. -// -// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Kind for new runtime -// event consumers. This legacy kind exists only during the runtime event -// migration window. -type EventKind uint8 - -const ( - // EventKindTurnStart is emitted when a turn begins processing. - EventKindTurnStart EventKind = iota - // EventKindTurnEnd is emitted when a turn finishes, successfully or with an error. - EventKindTurnEnd - // EventKindLLMRequest is emitted before a provider chat request is made. - EventKindLLMRequest - // EventKindLLMDelta is emitted when a streaming provider yields a partial delta. - EventKindLLMDelta - // EventKindLLMResponse is emitted after a provider chat response is received. - EventKindLLMResponse - // EventKindLLMRetry is emitted when an LLM request is retried. - EventKindLLMRetry - // EventKindContextCompress is emitted when session history is forcibly compressed. - EventKindContextCompress - // EventKindSessionSummarize is emitted when asynchronous summarization completes. - EventKindSessionSummarize - // EventKindToolExecStart is emitted immediately before a tool executes. - EventKindToolExecStart - // EventKindToolExecEnd is emitted immediately after a tool finishes executing. - EventKindToolExecEnd - // EventKindToolExecSkipped is emitted when a queued tool call is skipped. - EventKindToolExecSkipped - // EventKindSteeringInjected is emitted when queued steering is injected into context. - EventKindSteeringInjected - // EventKindFollowUpQueued is emitted when an async tool queues a follow-up system message. - EventKindFollowUpQueued - // EventKindInterruptReceived is emitted when a soft interrupt message is accepted. - EventKindInterruptReceived - // EventKindSubTurnSpawn is emitted when a sub-turn is spawned. - EventKindSubTurnSpawn - // EventKindSubTurnEnd is emitted when a sub-turn finishes. - EventKindSubTurnEnd - // EventKindSubTurnResultDelivered is emitted when a sub-turn result is delivered. - EventKindSubTurnResultDelivered - // EventKindSubTurnOrphan is emitted when a sub-turn result cannot be delivered. - EventKindSubTurnOrphan - // EventKindError is emitted when a turn encounters an execution error. - EventKindError - - eventKindCount -) - -var eventKindNames = [...]string{ - "turn_start", - "turn_end", - "llm_request", - "llm_delta", - "llm_response", - "llm_retry", - "context_compress", - "session_summarize", - "tool_exec_start", - "tool_exec_end", - "tool_exec_skipped", - "steering_injected", - "follow_up_queued", - "interrupt_received", - "subturn_spawn", - "subturn_end", - "subturn_result_delivered", - "subturn_orphan", - "error", -} - -// String returns the stable string form of an EventKind. -func (k EventKind) String() string { - if k >= eventKindCount { - return fmt.Sprintf("event_kind(%d)", k) - } - return eventKindNames[k] -} - -// Event is the structured envelope broadcast by the agent EventBus. -// -// Deprecated: use github.com/sipeed/picoclaw/pkg/events.Event for new -// observation code. Agent payload types remain supported. -type Event struct { - Kind EventKind - Time time.Time - Meta EventMeta - Context *TurnContext - Payload any -} - -// EventMeta contains correlation fields shared by all agent-loop events. -type EventMeta struct { +// HookMeta contains correlation fields shared by agent hook requests and +// runtime events emitted from turn processing. +type HookMeta struct { AgentID string TurnID string ParentTurnID string diff --git a/pkg/agent/events_runtime.go b/pkg/agent/events_runtime.go index 822c7e23a..b530f6161 100644 --- a/pkg/agent/events_runtime.go +++ b/pkg/agent/events_runtime.go @@ -9,7 +9,7 @@ import ( const runtimeEventPublishTimeout = 100 * time.Millisecond -func (al *AgentLoop) publishRuntimeEvent(evt Event) { +func (al *AgentLoop) publishRuntimeEvent(evt runtimeevents.Event) { if al == nil || al.runtimeEvents == nil { return } @@ -17,29 +17,21 @@ func (al *AgentLoop) publishRuntimeEvent(evt Event) { ctx, cancel := context.WithTimeout(context.Background(), runtimeEventPublishTimeout) defer cancel() - al.runtimeEvents.Publish(ctx, runtimeevents.Event{ - Kind: runtimeKindForAgentEvent(evt.Kind), - Source: runtimeevents.Source{Component: "agent", Name: evt.Meta.AgentID}, - Scope: runtimeScopeFromAgentEvent(evt), - Correlation: runtimeCorrelationFromAgentEvent(evt), - Severity: runtimeSeverityForAgentEvent(evt), - Payload: evt.Payload, - Attrs: runtimeAttrsFromAgentEvent(evt), - }) + al.runtimeEvents.Publish(ctx, evt) } -func runtimeScopeFromAgentEvent(evt Event) runtimeevents.Scope { +func runtimeScopeFromHookMeta(meta HookMeta, eventCtx *TurnContext) runtimeevents.Scope { scope := runtimeevents.Scope{ - AgentID: evt.Meta.AgentID, - SessionKey: evt.Meta.SessionKey, - TurnID: evt.Meta.TurnID, + AgentID: meta.AgentID, + SessionKey: meta.SessionKey, + TurnID: meta.TurnID, } - if evt.Context == nil || evt.Context.Inbound == nil { + if eventCtx == nil || eventCtx.Inbound == nil { return scope } - inbound := evt.Context.Inbound + inbound := eventCtx.Inbound scope.Channel = inbound.Channel scope.Account = inbound.Account scope.ChatID = inbound.ChatID @@ -52,21 +44,23 @@ func runtimeScopeFromAgentEvent(evt Event) runtimeevents.Scope { return scope } -func runtimeCorrelationFromAgentEvent(evt Event) runtimeevents.Correlation { +func runtimeCorrelationFromHookMeta(meta HookMeta) runtimeevents.Correlation { return runtimeevents.Correlation{ - TraceID: evt.Meta.TracePath, - ParentTurnID: evt.Meta.ParentTurnID, + TraceID: meta.TracePath, + ParentTurnID: meta.ParentTurnID, } } -func runtimeSeverityForAgentEvent(evt Event) runtimeevents.Severity { - switch evt.Kind { - case EventKindError, EventKindSubTurnOrphan: +func runtimeSeverityForAgentEvent(kind runtimeevents.Kind, payload any) runtimeevents.Severity { + switch kind { + case runtimeevents.KindAgentError, runtimeevents.KindAgentSubTurnOrphan: return runtimeevents.SeverityError - case EventKindLLMRetry, EventKindContextCompress, EventKindToolExecSkipped: + case runtimeevents.KindAgentLLMRetry, + runtimeevents.KindAgentContextCompress, + runtimeevents.KindAgentToolExecSkipped: return runtimeevents.SeverityWarn - case EventKindTurnEnd: - payload, ok := evt.Payload.(TurnEndPayload) + case runtimeevents.KindAgentTurnEnd: + payload, ok := payload.(TurnEndPayload) if !ok { return runtimeevents.SeverityInfo } @@ -78,8 +72,8 @@ func runtimeSeverityForAgentEvent(evt Event) runtimeevents.Severity { default: return runtimeevents.SeverityInfo } - case EventKindToolExecEnd: - payload, ok := evt.Payload.(ToolExecEndPayload) + case runtimeevents.KindAgentToolExecEnd: + payload, ok := payload.(ToolExecEndPayload) if ok && payload.IsError { return runtimeevents.SeverityWarn } @@ -89,13 +83,13 @@ func runtimeSeverityForAgentEvent(evt Event) runtimeevents.Severity { } } -func runtimeAttrsFromAgentEvent(evt Event) map[string]any { +func runtimeAttrsFromHookMeta(meta HookMeta) map[string]any { attrs := make(map[string]any, 2) - if evt.Meta.Source != "" { - attrs["agent_source"] = evt.Meta.Source + if meta.Source != "" { + attrs["agent_source"] = meta.Source } - if evt.Meta.Iteration != 0 { - attrs["iteration"] = evt.Meta.Iteration + if meta.Iteration != 0 { + attrs["iteration"] = meta.Iteration } if len(attrs) == 0 { return nil diff --git a/pkg/agent/hook_mount.go b/pkg/agent/hook_mount.go index dc002a508..409d56b32 100644 --- a/pkg/agent/hook_mount.go +++ b/pkg/agent/hook_mount.go @@ -311,14 +311,49 @@ func processHookObserveKindsFromConfig(observe []string) ([]string, bool, error) } func validHookEventKinds() map[string]string { - kinds := make(map[string]string, int(eventKindCount)*2) - for kind := EventKind(0); kind < eventKindCount; kind++ { - runtimeKind := runtimeKindForAgentEvent(kind).String() - kinds[kind.String()] = runtimeKind - kinds[runtimeKind] = runtimeKind + runtimeKinds := []runtimeevents.Kind{ + runtimeevents.KindAgentTurnStart, + runtimeevents.KindAgentTurnEnd, + runtimeevents.KindAgentLLMRequest, + runtimeevents.KindAgentLLMDelta, + runtimeevents.KindAgentLLMResponse, + runtimeevents.KindAgentLLMRetry, + runtimeevents.KindAgentContextCompress, + runtimeevents.KindAgentSessionSummarize, + runtimeevents.KindAgentToolExecStart, + runtimeevents.KindAgentToolExecEnd, + runtimeevents.KindAgentToolExecSkipped, + runtimeevents.KindAgentSteeringInjected, + runtimeevents.KindAgentFollowUpQueued, + runtimeevents.KindAgentInterruptReceived, + runtimeevents.KindAgentSubTurnSpawn, + runtimeevents.KindAgentSubTurnEnd, + runtimeevents.KindAgentSubTurnResultDelivered, + runtimeevents.KindAgentSubTurnOrphan, + runtimeevents.KindAgentError, } - kinds[runtimeevents.KindAgentToolExecStart.String()] = runtimeevents.KindAgentToolExecStart.String() - kinds[runtimeevents.KindAgentToolExecEnd.String()] = runtimeevents.KindAgentToolExecEnd.String() - kinds[runtimeevents.KindAgentToolExecSkipped.String()] = runtimeevents.KindAgentToolExecSkipped.String() + kinds := make(map[string]string, len(runtimeKinds)*2) + for _, kind := range runtimeKinds { + kinds[kind.String()] = kind.String() + } + kinds["turn_start"] = runtimeevents.KindAgentTurnStart.String() + kinds["turn_end"] = runtimeevents.KindAgentTurnEnd.String() + kinds["llm_request"] = runtimeevents.KindAgentLLMRequest.String() + kinds["llm_delta"] = runtimeevents.KindAgentLLMDelta.String() + kinds["llm_response"] = runtimeevents.KindAgentLLMResponse.String() + kinds["llm_retry"] = runtimeevents.KindAgentLLMRetry.String() + kinds["context_compress"] = runtimeevents.KindAgentContextCompress.String() + kinds["session_summarize"] = runtimeevents.KindAgentSessionSummarize.String() + kinds["tool_exec_start"] = runtimeevents.KindAgentToolExecStart.String() + kinds["tool_exec_end"] = runtimeevents.KindAgentToolExecEnd.String() + kinds["tool_exec_skipped"] = runtimeevents.KindAgentToolExecSkipped.String() + kinds["steering_injected"] = runtimeevents.KindAgentSteeringInjected.String() + kinds["follow_up_queued"] = runtimeevents.KindAgentFollowUpQueued.String() + kinds["interrupt_received"] = runtimeevents.KindAgentInterruptReceived.String() + kinds["subturn_spawn"] = runtimeevents.KindAgentSubTurnSpawn.String() + kinds["subturn_end"] = runtimeevents.KindAgentSubTurnEnd.String() + kinds["subturn_result_delivered"] = runtimeevents.KindAgentSubTurnResultDelivered.String() + kinds["subturn_orphan"] = runtimeevents.KindAgentSubTurnOrphan.String() + kinds["error"] = runtimeevents.KindAgentError.String() return kinds } diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index 79b72017d..a4f0fac82 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -91,7 +91,7 @@ type ToolApprover interface { } type LLMHookRequest struct { - Meta EventMeta `json:"meta"` + Meta HookMeta `json:"meta"` Context *TurnContext `json:"context,omitempty"` Model string `json:"model"` Messages []providers.Message `json:"messages,omitempty"` @@ -105,7 +105,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest { return nil } cloned := *r - cloned.Meta = cloneEventMeta(r.Meta) + cloned.Meta = cloneHookMeta(r.Meta) cloned.Context = cloneTurnContext(r.Context) cloned.Messages = cloneProviderMessages(r.Messages) cloned.Tools = cloneToolDefinitions(r.Tools) @@ -114,7 +114,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest { } type LLMHookResponse struct { - Meta EventMeta `json:"meta"` + Meta HookMeta `json:"meta"` Context *TurnContext `json:"context,omitempty"` Model string `json:"model"` Response *providers.LLMResponse `json:"response,omitempty"` @@ -125,14 +125,14 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse { return nil } cloned := *r - cloned.Meta = cloneEventMeta(r.Meta) + cloned.Meta = cloneHookMeta(r.Meta) cloned.Context = cloneTurnContext(r.Context) cloned.Response = cloneLLMResponse(r.Response) return &cloned } type ToolCallHookRequest struct { - Meta EventMeta `json:"meta"` + Meta HookMeta `json:"meta"` Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` @@ -146,7 +146,7 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { return nil } cloned := *r - cloned.Meta = cloneEventMeta(r.Meta) + cloned.Meta = cloneHookMeta(r.Meta) cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) cloned.HookResult = cloneToolResult(r.HookResult) @@ -154,7 +154,7 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { } type ToolApprovalRequest struct { - Meta EventMeta `json:"meta"` + Meta HookMeta `json:"meta"` Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` @@ -165,14 +165,14 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { return nil } cloned := *r - cloned.Meta = cloneEventMeta(r.Meta) + cloned.Meta = cloneHookMeta(r.Meta) cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) return &cloned } type ToolResultHookResponse struct { - Meta EventMeta `json:"meta"` + Meta HookMeta `json:"meta"` Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` @@ -185,7 +185,7 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse { return nil } cloned := *r - cloned.Meta = cloneEventMeta(r.Meta) + cloned.Meta = cloneHookMeta(r.Meta) cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) cloned.Result = cloneToolResult(r.Result) diff --git a/pkg/agent/pipeline_execute.go b/pkg/agent/pipeline_execute.go index 0cf3eaa9a..1f8430edd 100644 --- a/pkg/agent/pipeline_execute.go +++ b/pkg/agent/pipeline_execute.go @@ -10,6 +10,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/constants" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/tools" @@ -72,7 +73,7 @@ toolLoop: }) al.emitEvent( - EventKindToolExecStart, + runtimeevents.KindAgentToolExecStart, ts.eventMeta("runTurn", "turn.tool.start"), ToolExecStartPayload{ Tool: toolName, @@ -192,7 +193,7 @@ toolLoop: } al.emitEvent( - EventKindToolExecEnd, + runtimeevents.KindAgentToolExecEnd, ts.eventMeta("runTurn", "turn.tool.end"), ToolExecEndPayload{ Tool: toolName, @@ -238,7 +239,7 @@ toolLoop: for j := i + 1; j < len(normalizedToolCalls); j++ { skippedTC := normalizedToolCalls[j] al.emitEvent( - EventKindToolExecSkipped, + runtimeevents.KindAgentToolExecSkipped, ts.eventMeta("runTurn", "turn.tool.skipped"), ToolExecSkippedPayload{ Tool: skippedTC.Name, @@ -285,7 +286,7 @@ toolLoop: exec.allResponsesHandled = false denyContent := hookDeniedToolContent("Tool execution denied by hook", decision.Reason) al.emitEvent( - EventKindToolExecSkipped, + runtimeevents.KindAgentToolExecSkipped, ts.eventMeta("runTurn", "turn.tool.skipped"), ToolExecSkippedPayload{ Tool: toolName, @@ -324,7 +325,7 @@ toolLoop: exec.allResponsesHandled = false denyContent := hookDeniedToolContent("Tool execution denied by approval hook", approval.Reason) al.emitEvent( - EventKindToolExecSkipped, + runtimeevents.KindAgentToolExecSkipped, ts.eventMeta("runTurn", "turn.tool.skipped"), ToolExecSkippedPayload{ Tool: toolName, @@ -354,7 +355,7 @@ toolLoop: "iteration": iteration, }) al.emitEvent( - EventKindToolExecStart, + runtimeevents.KindAgentToolExecStart, ts.eventMeta("runTurn", "turn.tool.start"), ToolExecStartPayload{ Tool: toolName, @@ -403,7 +404,7 @@ toolLoop: "channel": ts.channel, }) al.emitEvent( - EventKindFollowUpQueued, + runtimeevents.KindAgentFollowUpQueued, ts.scope.meta(iteration, "runTurn", "turn.follow_up.queued"), FollowUpQueuedPayload{ SourceTool: asyncToolName, @@ -569,7 +570,7 @@ toolLoop: toolResultMsg.Media = append(toolResultMsg.Media, toolResult.Media...) } al.emitEvent( - EventKindToolExecEnd, + runtimeevents.KindAgentToolExecEnd, ts.eventMeta("runTurn", "turn.tool.end"), ToolExecEndPayload{ Tool: toolName, @@ -614,7 +615,7 @@ toolLoop: for j := i + 1; j < len(normalizedToolCalls); j++ { skippedTC := normalizedToolCalls[j] al.emitEvent( - EventKindToolExecSkipped, + runtimeevents.KindAgentToolExecSkipped, ts.eventMeta("runTurn", "turn.tool.skipped"), ToolExecSkippedPayload{ Tool: skippedTC.Name, diff --git a/pkg/agent/pipeline_finalize.go b/pkg/agent/pipeline_finalize.go index a2be6f65b..1f407825e 100644 --- a/pkg/agent/pipeline_finalize.go +++ b/pkg/agent/pipeline_finalize.go @@ -6,6 +6,7 @@ import ( "context" "github.com/sipeed/picoclaw/pkg/bus" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -50,7 +51,7 @@ func (p *Pipeline) Finalize( ts.ingestMessage(turnCtx, al, finalMsg) if err := ts.agent.Sessions.Save(ts.sessionKey); err != nil { al.emitEvent( - EventKindError, + runtimeevents.KindAgentError, ts.eventMeta("runTurn", "turn.error"), ErrorPayload{ Stage: "session_save", diff --git a/pkg/agent/pipeline_llm.go b/pkg/agent/pipeline_llm.go index a954c0ca6..afec9a486 100644 --- a/pkg/agent/pipeline_llm.go +++ b/pkg/agent/pipeline_llm.go @@ -12,6 +12,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/constants" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -114,7 +115,7 @@ func (p *Pipeline) CallLLM( } al.emitEvent( - EventKindLLMRequest, + runtimeevents.KindAgentLLMRequest, ts.eventMeta("runTurn", "turn.llm.request"), LLMRequestPayload{ Model: exec.llmModel, @@ -200,7 +201,7 @@ func (p *Pipeline) CallLLM( // Retry without media if vision is unsupported if hasMediaRefs(exec.callMessages) && isVisionUnsupportedError(err) && retry < maxRetries { al.emitEvent( - EventKindLLMRetry, + runtimeevents.KindAgentLLMRetry, ts.eventMeta("runTurn", "turn.llm.retry"), LLMRetryPayload{ Attempt: retry + 1, @@ -247,7 +248,7 @@ func (p *Pipeline) CallLLM( if isTimeoutError && retry < maxRetries { backoff := time.Duration(retry+1) * 5 * time.Second al.emitEvent( - EventKindLLMRetry, + runtimeevents.KindAgentLLMRetry, ts.eventMeta("runTurn", "turn.llm.retry"), LLMRetryPayload{ Attempt: retry + 1, @@ -275,7 +276,7 @@ func (p *Pipeline) CallLLM( if isContextError && retry < maxRetries && !ts.opts.NoHistory { al.emitEvent( - EventKindLLMRetry, + runtimeevents.KindAgentLLMRetry, ts.eventMeta("runTurn", "turn.llm.retry"), LLMRetryPayload{ Attempt: retry + 1, @@ -334,7 +335,7 @@ func (p *Pipeline) CallLLM( if err != nil { al.emitEvent( - EventKindError, + runtimeevents.KindAgentError, ts.eventMeta("runTurn", "turn.error"), ErrorPayload{ Stage: "llm", @@ -394,7 +395,7 @@ func (p *Pipeline) CallLLM( ) } al.emitEvent( - EventKindLLMResponse, + runtimeevents.KindAgentLLMResponse, ts.eventMeta("runTurn", "turn.llm.response"), LLMResponsePayload{ ContentLen: len(exec.response.Content), diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 2efa7bbf4..ba171fe5d 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/sipeed/picoclaw/pkg/bus" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/session" @@ -206,7 +207,7 @@ func (al *AgentLoop) enqueueSteeringMessage(scope, agentID string, msg providers "scope": normalizeSteeringScope(scope), }) - meta := EventMeta{ + meta := HookMeta{ Source: "Steer", TracePath: "turn.interrupt.received", } @@ -230,7 +231,7 @@ func (al *AgentLoop) enqueueSteeringMessage(scope, agentID string, msg providers } al.emitEvent( - EventKindInterruptReceived, + runtimeevents.KindAgentInterruptReceived, meta, InterruptReceivedPayload{ Kind: InterruptKindSteering, @@ -410,7 +411,7 @@ func (al *AgentLoop) InterruptGraceful(hint string) error { } al.emitEvent( - EventKindInterruptReceived, + runtimeevents.KindAgentInterruptReceived, ts.eventMeta("InterruptGraceful", "turn.interrupt.received"), InterruptReceivedPayload{ Kind: InterruptKindGraceful, @@ -438,7 +439,7 @@ func (al *AgentLoop) InterruptHard() error { } al.emitEvent( - EventKindInterruptReceived, + runtimeevents.KindAgentInterruptReceived, ts.eventMeta("InterruptHard", "turn.interrupt.received"), InterruptReceivedPayload{ Kind: InterruptKindHard, diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index 4d824bd3a..62ea0e7e7 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers/messageutil" @@ -422,7 +423,7 @@ func spawnSubTurn( parentTS.mu.Unlock() // 6. Emit Spawn event - al.emitEvent(EventKindSubTurnSpawn, + al.emitEvent(runtimeevents.KindAgentSubTurnSpawn, childTS.eventMeta("spawnSubTurn", "subturn.spawn"), SubTurnSpawnPayload{ AgentID: childTS.agentID, @@ -453,7 +454,7 @@ func spawnSubTurn( if err != nil { status = "error" } - al.emitEvent(EventKindSubTurnEnd, + al.emitEvent(runtimeevents.KindAgentSubTurnEnd, childTS.eventMeta("spawnSubTurn", "subturn.end"), SubTurnEndPayload{ AgentID: childTS.agentID, @@ -526,7 +527,7 @@ func deliverSubTurnResult(al *AgentLoop, parentTS *turnState, childID string, re "recover": r, }) if result != nil && al != nil { - al.emitEvent(EventKindSubTurnOrphan, + al.emitEvent(runtimeevents.KindAgentSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ParentTurnID: parentTS.turnID, ChildTurnID: childID, Reason: "panic"}, ) @@ -541,7 +542,7 @@ func deliverSubTurnResult(al *AgentLoop, parentTS *turnState, childID string, re // If parent turn has already finished, treat this as an orphan result if isFinished || resultChan == nil { if result != nil && al != nil { - al.emitEvent(EventKindSubTurnOrphan, + al.emitEvent(runtimeevents.KindAgentSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ParentTurnID: parentTS.turnID, ChildTurnID: childID, Reason: "parent_finished"}, ) @@ -557,7 +558,7 @@ func deliverSubTurnResult(al *AgentLoop, parentTS *turnState, childID string, re case resultChan <- result: // Successfully delivered if al != nil { - al.emitEvent(EventKindSubTurnResultDelivered, + al.emitEvent(runtimeevents.KindAgentSubTurnResultDelivered, parentTS.eventMeta("deliverSubTurnResult", "subturn.result_delivered"), SubTurnResultDeliveredPayload{ContentLen: len(result.ForLLM)}, ) @@ -571,7 +572,7 @@ func deliverSubTurnResult(al *AgentLoop, parentTS *turnState, childID string, re }) if result != nil && al != nil { al.emitEvent( - EventKindSubTurnOrphan, + runtimeevents.KindAgentSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ ParentTurnID: parentTS.turnID, diff --git a/pkg/agent/turn_context.go b/pkg/agent/turn_context.go index 8913993aa..c675590ce 100644 --- a/pkg/agent/turn_context.go +++ b/pkg/agent/turn_context.go @@ -61,7 +61,7 @@ func cloneStringMap(src map[string]string) map[string]string { return cloned } -func cloneEventMeta(meta EventMeta) EventMeta { +func cloneHookMeta(meta HookMeta) HookMeta { meta.turnContext = cloneTurnContext(meta.turnContext) return meta } diff --git a/pkg/agent/turn_coord.go b/pkg/agent/turn_coord.go index ade2b7c21..ae6bd8c82 100644 --- a/pkg/agent/turn_coord.go +++ b/pkg/agent/turn_coord.go @@ -9,6 +9,7 @@ import ( "time" "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -28,7 +29,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState, pipeline *Pipel turnStatus := TurnEndStatusCompleted defer func() { al.emitEvent( - EventKindTurnEnd, + runtimeevents.KindAgentTurnEnd, ts.eventMeta("runTurn", "turn.end"), TurnEndPayload{ Status: turnStatus, @@ -40,7 +41,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState, pipeline *Pipel }() al.emitEvent( - EventKindTurnStart, + runtimeevents.KindAgentTurnStart, ts.eventMeta("runTurn", "turn.start"), TurnStartPayload{ UserMessage: ts.userMessage, @@ -140,7 +141,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState, pipeline *Pipel }) } al.emitEvent( - EventKindSteeringInjected, + runtimeevents.KindAgentSteeringInjected, ts.eventMeta("runTurn", "turn.steering.injected"), SteeringInjectedPayload{ Count: len(pendingMessages), @@ -249,7 +250,7 @@ func (al *AgentLoop) abortTurn(ts *turnState) (turnResult, error) { if !ts.opts.NoHistory { if err := ts.restoreSession(ts.agent); err != nil { al.emitEvent( - EventKindError, + runtimeevents.KindAgentError, ts.eventMeta("abortTurn", "turn.error"), ErrorPayload{ Stage: "session_restore", @@ -414,7 +415,7 @@ func (al *AgentLoop) askSideQuestion( llmModel := activeModel if al.hooks != nil { llmReq, decision := al.hooks.BeforeLLM(ctx, &LLMHookRequest{ - Meta: EventMeta{ + Meta: HookMeta{ Source: "askSideQuestion", TracePath: "turn.llm.request", turnContext: cloneTurnContext(turnCtx), @@ -494,8 +495,8 @@ func (al *AgentLoop) askSideQuestion( resp, err = callSideLLM(messages) if err != nil && hasMediaRefs(messages) && isVisionUnsupportedError(err) { al.emitEvent( - EventKindLLMRetry, - EventMeta{ + runtimeevents.KindAgentLLMRetry, + HookMeta{ Source: "askSideQuestion", TracePath: "turn.llm.retry", turnContext: cloneTurnContext(turnCtx), @@ -521,7 +522,7 @@ func (al *AgentLoop) askSideQuestion( // Apply after_llm hooks if al.hooks != nil { llmResp, decision := al.hooks.AfterLLM(ctx, &LLMHookResponse{ - Meta: EventMeta{ + Meta: HookMeta{ Source: "askSideQuestion", TracePath: "turn.llm.response", turnContext: cloneTurnContext(turnCtx), diff --git a/pkg/agent/turn_state.go b/pkg/agent/turn_state.go index 360c3b7d5..85e7dd3c0 100644 --- a/pkg/agent/turn_state.go +++ b/pkg/agent/turn_state.go @@ -442,9 +442,9 @@ func (ts *turnState) hardAbortRequested() bool { return ts.hardAbort } -func (ts *turnState) eventMeta(source, tracePath string) EventMeta { +func (ts *turnState) eventMeta(source, tracePath string) HookMeta { snap := ts.snapshot() - return EventMeta{ + return HookMeta{ AgentID: snap.AgentID, TurnID: snap.TurnID, SessionKey: snap.SessionKey,