From cf68c91ecaa15c3518686e7cfa9c637fabfcbead Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 21 Mar 2026 19:15:10 +0800 Subject: [PATCH 1/3] feat(agent): add hook manager foundation --- docs/design/hook-system-design.zh.md | 476 +++++++++++++++++ pkg/agent/hooks.go | 751 +++++++++++++++++++++++++++ pkg/agent/hooks_test.go | 312 +++++++++++ pkg/agent/loop.go | 309 +++++++++-- 4 files changed, 1801 insertions(+), 47 deletions(-) create mode 100644 docs/design/hook-system-design.zh.md create mode 100644 pkg/agent/hooks.go create mode 100644 pkg/agent/hooks_test.go diff --git a/docs/design/hook-system-design.zh.md b/docs/design/hook-system-design.zh.md new file mode 100644 index 000000000..ab5566bec --- /dev/null +++ b/docs/design/hook-system-design.zh.md @@ -0,0 +1,476 @@ +# PicoClaw Hook 系统设计(基于 `refactor/agent`) + +## 背景 + +本设计围绕两个议题展开: + +- `#1316`:把 agent loop 重构为事件驱动、可中断、可追加、可观测 +- `#1796`:在 EventBus 稳定后,把 hooks 设计为 EventBus 的 consumer,而不是重新发明一套事件模型 + +当前分支已经完成了第一步里的“事件系统基础”,但还没有真正的 hook 挂载层。因此这里的目标不是重新设计 event,而是在已有实现上补出一层可扩展、可拦截、可外挂的 HookManager。 + +## 外部项目对比 + +### OpenClaw + +OpenClaw 的扩展能力分成三层: + +- Internal hooks:目录发现,运行在 Gateway 进程内 +- Plugin hooks:插件在运行时注册 hook,也在进程内 +- Webhooks:外部系统通过 HTTP 触发 Gateway 动作,属于进程外 + +值得借鉴的点: + +- 有“项目内挂载”和“项目外挂载”两种路径 +- hook 是配置驱动,可启停 +- 外部入口有明确的安全边界和映射层 + +不建议直接照搬的点: + +- OpenClaw 的 hooks / plugin hooks / webhooks 是三套路由,PicoClaw 当前体量下会偏重 +- HTTP webhook 更适合“事件进入系统”,不适合作为“可同步拦截 agent loop”的基础机制 + +### pi-mono + +pi-mono 的核心思路更接近当前分支: + +- 扩展统一为 extension API +- 事件分为观察型和可变更型 +- 某些阶段允许 `transform` / `block` / `replace` +- 扩展代码主要是进程内执行 +- RPC mode 把 UI 交互桥接到进程外客户端 + +值得借鉴的点: + +- 不把“观察”和“拦截”混成一个接口 +- 允许返回结构化动作,而不是只有回调 +- 进程外通信只暴露必要协议,不把整个内部对象图泄露出去 + +## 当前分支现状 + +### 已有能力 + +当前分支已经具备 hook 系统的地基: + +- `pkg/agent/events.go` 定义了稳定的 `EventKind`、`EventMeta` 和 payload +- `pkg/agent/eventbus.go` 提供了非阻塞 fan-out 的 `EventBus` +- `pkg/agent/loop.go` 中的 `runTurn()` 已在 turn、llm、tool、interrupt、follow-up、summary 等节点发射事件 +- 
`pkg/agent/steering.go` 已支持 steering、graceful interrupt、hard abort +- `pkg/agent/turn.go` 已维护 turn phase、恢复点、active turn、abort 状态 + +### 现有缺口 + +当前分支还缺四件事: + +- 没有 HookManager,只有 EventBus +- 没有 Before/After LLM、Before/After Tool 这种同步拦截点 +- 没有审批型 hook +- 子 agent 仍走 `pkg/tools/SubagentManager + RunToolLoop`,没有接入 `pkg/agent` 的 turn tree 和事件流 + +### 一个关键现实 + +`#1316` 文案里提到“只读并行、写入串行”的工具执行策略,但当前 `runTurn()` 实现已经先收敛成“顺序执行 + 每个工具后检查 steering / interrupt”。因此 hook 设计不应依赖未来的并行模型,而应该先兼容当前顺序执行,再为以后增加 `ReadOnlyIndicator` 留口子。 + +## 设计原则 + +- Hook 必须建立在 `pkg/agent` 的 EventBus 和 turn 上下文之上 +- EventBus 负责广播,HookManager 负责拦截,两者职责分离 +- 项目内挂载要简单,项目外挂载必须走 IPC +- 观察型 hook 不能阻塞 loop;拦截型 hook 必须有超时 +- 先覆盖主 turn,不把 sub-turn 一次做满 +- 不新增第二套用户事件命名系统,优先复用 `EventKind.String()` + +## 总体架构 + +分成三层: + +1. `EventBus` + 负责广播只读事件,现有实现直接复用 + +2. `HookManager` + 负责管理 hook、排序、超时、错误隔离,并在 `runTurn()` 的明确检查点执行同步拦截 + +3. `HookMount` + 负责两种挂载方式: + - 进程内 Go hook + - 进程外 IPC hook + +换句话说: + +- EventBus 是“发生了什么” +- HookManager 是“谁能介入” +- HookMount 是“这些 hook 从哪里来” + +## Hook 分类 + +不建议把所有 hook 都设计成 `OnEvent(evt)`。 + +建议拆成两类。 + +### 1. 观察型 + +只消费事件,不修改流程: + +```go +type EventObserver interface { + OnEvent(ctx context.Context, evt agent.Event) error +} +``` + +这类 hook 直接订阅 EventBus 即可。 + +适用场景: + +- 审计日志 +- 指标上报 +- 调试 trace +- 将事件转发给外部 UI / TUI / Web 面板 + +### 2. 
拦截型 + +只在少数明确节点触发,允许返回动作: + +```go +type LLMInterceptor interface { + BeforeLLM(ctx context.Context, req *LLMRequest) HookDecision[*LLMRequest] + AfterLLM(ctx context.Context, resp *LLMResponse) HookDecision[*LLMResponse] +} + +type ToolInterceptor interface { + BeforeTool(ctx context.Context, call *ToolCall) HookDecision[*ToolCall] + AfterTool(ctx context.Context, result *ToolResultView) HookDecision[*ToolResultView] +} + +type ToolApprover interface { + ApproveTool(ctx context.Context, req *ToolApprovalRequest) ApprovalDecision +} +``` + +这里的 `HookDecision` 统一支持: + +- `continue` +- `modify` +- `deny_tool` +- `abort_turn` +- `hard_abort` + +## 对外暴露的最小 hook 面 + +V1 不需要把所有 EventKind 都变成可拦截点。 + +建议只开放这些同步 hook: + +- `before_llm` +- `after_llm` +- `before_tool` +- `after_tool` +- `approve_tool` + +其余节点继续作为只读事件暴露: + +- `turn_start` +- `turn_end` +- `llm_request` +- `llm_response` +- `tool_exec_start` +- `tool_exec_end` +- `tool_exec_skipped` +- `steering_injected` +- `follow_up_queued` +- `interrupt_received` +- `context_compress` +- `session_summarize` +- `error` + +`subturn_*` 在 V1 中保留名字,但不承诺一定触发,直到子 turn 迁移完成。 + +## 项目内挂载 + +内部挂载必须尽量低摩擦。 + +建议提供两种等价方式,底层都走 HookManager。 + +### 方式 A:代码显式挂载 + +```go +al.MountHook(hooks.Named("audit", &AuditHook{})) +``` + +适用于: + +- 仓内内建 hook +- 单元测试 +- feature flag 控制 + +### 方式 B:内建 registry + +```go +func init() { + hooks.RegisterBuiltin("audit", func() hooks.Hook { + return &AuditHook{} + }) +} +``` + +启动时根据配置启用: + +```json +{ + "hooks": { + "builtins": { + "audit": { "enabled": true } + } + } +} +``` + +这比 OpenClaw 的目录扫描更轻,也更贴合 Go 项目。 + +## 项目外挂载 + +这是本设计的硬要求。 + +建议 V1 采用: + +- `JSON-RPC over stdio` + +原因: + +- 跨平台最简单 +- 不依赖额外端口 +- 非常适合“由 PicoClaw 启动一个外部 hook 进程” +- 比 HTTP webhook 更适合同步拦截 + +### 外部 hook 进程模型 + +PicoClaw 启动外部进程,并在其 stdin/stdout 上跑协议。 + +配置示例: + +```json +{ + "hooks": { + "processes": { + "review-gate": { + "enabled": true, + "transport": "stdio", + "command": ["uvx", "picoclaw-hook-reviewer"], + "observe": 
["turn_start", "turn_end", "tool_exec_end"], + "intercept": ["before_tool", "approve_tool"], + "timeout_ms": 5000 + } + } + } +} +``` + +### 协议边界 + +不要把内部 Go 结构体直接暴露给 IPC。 + +建议定义稳定的协议对象: + +- `HookHandshake` +- `HookEventNotification` +- `BeforeLLMRequest` +- `AfterLLMRequest` +- `BeforeToolRequest` +- `AfterToolRequest` +- `ApproveToolRequest` +- `HookDecision` + +其中: + +- 观察型事件用 notification,fire-and-forget +- 拦截型事件用 request/response,同步等待 + +### 为什么是 stdio,而不是直接用 HTTP webhook + +因为两者用途不同: + +- HTTP webhook 更适合“外部系统向 PicoClaw 投递事件” +- stdio/RPC 更适合“PicoClaw 在 turn 内同步询问外部 hook 是否改写 / 放行 / 拒绝” + +如果未来需要 OpenClaw 式 webhook,可以作为独立入口层,再把外部事件转成 inbound message 或 steering,而不是直接替代 hook IPC。 + +## Hook 执行顺序 + +建议统一排序规则: + +- 先内建 in-process hook +- 再外部 IPC hook +- 同组内按 `priority` 从小到大执行 + +原因: + +- 内建 hook 延迟更低,适合做基础规范化 +- 外部 hook 更适合做审批、审计、组织级策略 + +## 超时与错误策略 + +### 观察型 + +- 默认超时:`500ms` +- 超时或报错:记录日志,继续主流程 + +### 拦截型 + +- `before_llm` / `after_llm` / `before_tool` / `after_tool`:默认 `5s` +- `approve_tool`:默认 `60s` + +超时行为: + +- 普通拦截:`continue` +- 审批:`deny` + +这点应直接沿用 `#1316` 的安全倾向。 + +## 与当前分支的对接点 + +### 直接复用 + +- 事件定义:`pkg/agent/events.go` +- 事件广播:`pkg/agent/eventbus.go` +- 活跃 turn / interrupt / rollback:`pkg/agent/turn.go` +- 事件发射点:`pkg/agent/loop.go` + +### 需要新增 + +- `pkg/agent/hooks.go` + - Hook 接口 + - HookDecision / ApprovalDecision + - HookManager + +- `pkg/agent/hook_mount.go` + - 内建 hook 注册 + - 外部进程 hook 注册 + +- `pkg/agent/hook_ipc.go` + - stdio JSON-RPC bridge + +- `pkg/agent/hook_types.go` + - IPC 稳定载荷 + +### 需要改造 + +- `pkg/agent/loop.go` + - 在 LLM 和 tool 关键路径前后插入 HookManager 调用 + +- `pkg/tools/base.go` + - 可选新增 `ReadOnlyIndicator` + +- `pkg/tools/spawn.go` +- `pkg/tools/subagent.go` + - 先保留现状 + - 等 sub-turn 迁移后再接入 `subturn_*` hook + +## 一个更贴合当前分支的数据流 + +### 观察链路 + +```text +runTurn() -> emitEvent() -> EventBus -> observers +``` + +### 拦截链路 + +```text +runTurn() + -> HookManager.BeforeLLM() + -> Provider.Chat() + -> HookManager.AfterLLM() + -> 
HookManager.BeforeTool() + -> HookManager.ApproveTool() + -> tool.Execute() + -> HookManager.AfterTool() +``` + +也就是说: + +- observer 不改变现有 `emitEvent()` +- interceptor 直接插在 `runTurn()` 热路径 + +## 用户可见配置 + +建议新增: + +```json +{ + "hooks": { + "enabled": true, + "builtins": {}, + "processes": {}, + "defaults": { + "observer_timeout_ms": 500, + "interceptor_timeout_ms": 5000, + "approval_timeout_ms": 60000 + } + } +} +``` + +V1 不做复杂自动发现。 + +原因: + +- 当前分支重点是把地基打稳 +- 目录扫描、安装器、脚手架可以后置 +- 先让仓内和仓外都能挂上去,比“管理体验完整”更重要 + +## 推荐的 V1 范围 + +### 必做 + +- HookManager +- in-process 挂载 +- stdio IPC 挂载 +- observer hooks +- `before_tool` / `after_tool` / `approve_tool` +- `before_llm` / `after_llm` + +### 可后置 + +- hook CLI 管理命令 +- hook 自动发现 +- Unix socket / named pipe transport +- sub-turn hook 生命周期 +- read-only 并行分组 +- webhook 到 inbound message 的映射入口 + +## 分阶段落地 + +### Phase 1 + +- 引入 HookManager +- 支持 in-process observer + interceptor +- 先只接主 turn + +### Phase 2 + +- 引入 `stdio` 外部 hook 进程桥 +- 支持组织级审批 / 审计 / 参数改写 + +### Phase 3 + +- 把 `SubagentManager` 迁移到 `runTurn/sub-turn` +- 接通 `subturn_spawn` / `subturn_end` / `subturn_result_delivered` + +### Phase 4 + +- 视需求补 `ReadOnlyIndicator` +- 在主 turn 和 sub-turn 上统一只读并行策略 + +## 最终结论 + +最适合 PicoClaw 当前分支的方案,不是直接复制 OpenClaw 的 hooks,也不是完整照搬 pi-mono 的 extension system,而是: + +- 以现有 `EventBus` 为只读观察面 +- 以新增 `HookManager` 为同步拦截面 +- 项目内通过 Go 对象直接挂载 +- 项目外通过 `stdio JSON-RPC` 进程通信挂载 + +这样做有三个好处: + +- 和 `#1796` 一致,hooks 只是 EventBus 之上的消费层 +- 和当前 `refactor/agent` 实现一致,不需要推翻已有事件系统 +- 同时满足“仓内简单挂载”和“仓外进程通信挂载”两个硬需求 diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go new file mode 100644 index 000000000..74af542fa --- /dev/null +++ b/pkg/agent/hooks.go @@ -0,0 +1,751 @@ +package agent + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/logger" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/tools" +) + +const ( + defaultHookObserverTimeout = 500 * time.Millisecond + 
defaultHookInterceptorTimeout = 5 * time.Second + defaultHookApprovalTimeout = 60 * time.Second + hookObserverBufferSize = 64 +) + +type HookAction string + +const ( + HookActionContinue HookAction = "continue" + HookActionModify HookAction = "modify" + HookActionDenyTool HookAction = "deny_tool" + HookActionAbortTurn HookAction = "abort_turn" + HookActionHardAbort HookAction = "hard_abort" +) + +type HookDecision struct { + Action HookAction + Reason string +} + +func (d HookDecision) normalizedAction() HookAction { + if d.Action == "" { + return HookActionContinue + } + return d.Action +} + +type ApprovalDecision struct { + Approved bool + Reason string +} + +type HookRegistration struct { + Name string + Priority int + Hook any +} + +func NamedHook(name string, hook any) HookRegistration { + return HookRegistration{ + Name: name, + Hook: hook, + } +} + +type EventObserver interface { + OnEvent(ctx context.Context, evt Event) error +} + +type LLMInterceptor interface { + BeforeLLM(ctx context.Context, req *LLMHookRequest) (*LLMHookRequest, HookDecision, error) + AfterLLM(ctx context.Context, resp *LLMHookResponse) (*LLMHookResponse, HookDecision, error) +} + +type ToolInterceptor interface { + BeforeTool(ctx context.Context, call *ToolCallHookRequest) (*ToolCallHookRequest, HookDecision, error) + AfterTool(ctx context.Context, result *ToolResultHookResponse) (*ToolResultHookResponse, HookDecision, error) +} + +type ToolApprover interface { + ApproveTool(ctx context.Context, req *ToolApprovalRequest) (ApprovalDecision, error) +} + +type LLMHookRequest struct { + Meta EventMeta + Model string + Messages []providers.Message + Tools []providers.ToolDefinition + Options map[string]any + Channel string + ChatID string + GracefulTerminal bool +} + +func (r *LLMHookRequest) Clone() *LLMHookRequest { + if r == nil { + return nil + } + cloned := *r + cloned.Messages = cloneProviderMessages(r.Messages) + cloned.Tools = cloneToolDefinitions(r.Tools) + cloned.Options = 
cloneStringAnyMap(r.Options) + return &cloned +} + +type LLMHookResponse struct { + Meta EventMeta + Model string + Response *providers.LLMResponse + Channel string + ChatID string +} + +func (r *LLMHookResponse) Clone() *LLMHookResponse { + if r == nil { + return nil + } + cloned := *r + cloned.Response = cloneLLMResponse(r.Response) + return &cloned +} + +type ToolCallHookRequest struct { + Meta EventMeta + Tool string + Arguments map[string]any + Channel string + ChatID string +} + +func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { + if r == nil { + return nil + } + cloned := *r + cloned.Arguments = cloneStringAnyMap(r.Arguments) + return &cloned +} + +type ToolApprovalRequest struct { + Meta EventMeta + Tool string + Arguments map[string]any + Channel string + ChatID string +} + +func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { + if r == nil { + return nil + } + cloned := *r + cloned.Arguments = cloneStringAnyMap(r.Arguments) + return &cloned +} + +type ToolResultHookResponse struct { + Meta EventMeta + Tool string + Arguments map[string]any + Result *tools.ToolResult + Duration time.Duration + Channel string + ChatID string +} + +func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse { + if r == nil { + return nil + } + cloned := *r + cloned.Arguments = cloneStringAnyMap(r.Arguments) + cloned.Result = cloneToolResult(r.Result) + return &cloned +} + +type HookManager struct { + eventBus *EventBus + observerTimeout time.Duration + interceptorTimeout time.Duration + approvalTimeout time.Duration + + mu sync.RWMutex + hooks map[string]HookRegistration + ordered []HookRegistration + + sub EventSubscription + done chan struct{} + closeOnce sync.Once +} + +func NewHookManager(eventBus *EventBus) *HookManager { + hm := &HookManager{ + eventBus: eventBus, + observerTimeout: defaultHookObserverTimeout, + interceptorTimeout: defaultHookInterceptorTimeout, + approvalTimeout: defaultHookApprovalTimeout, + hooks: 
make(map[string]HookRegistration), + done: make(chan struct{}), + } + + if eventBus == nil { + close(hm.done) + return hm + } + + hm.sub = eventBus.Subscribe(hookObserverBufferSize) + go hm.dispatchEvents() + return hm +} + +func (hm *HookManager) Close() { + if hm == nil { + return + } + + hm.closeOnce.Do(func() { + if hm.eventBus != nil { + hm.eventBus.Unsubscribe(hm.sub.ID) + } + <-hm.done + }) +} + +func (hm *HookManager) Mount(reg HookRegistration) error { + if hm == nil { + return fmt.Errorf("hook manager is nil") + } + if reg.Name == "" { + return fmt.Errorf("hook name is required") + } + if reg.Hook == nil { + return fmt.Errorf("hook %q is nil", reg.Name) + } + + hm.mu.Lock() + defer hm.mu.Unlock() + + hm.hooks[reg.Name] = reg + hm.rebuildOrdered() + return nil +} + +func (hm *HookManager) Unmount(name string) { + if hm == nil || name == "" { + return + } + + hm.mu.Lock() + defer hm.mu.Unlock() + + delete(hm.hooks, name) + hm.rebuildOrdered() +} + +func (hm *HookManager) dispatchEvents() { + defer close(hm.done) + + for evt := range hm.sub.C { + for _, reg := range hm.snapshotHooks() { + observer, ok := reg.Hook.(EventObserver) + if !ok { + continue + } + hm.runObserver(reg.Name, observer, evt) + } + } +} + +func (hm *HookManager) BeforeLLM(ctx context.Context, req *LLMHookRequest) (*LLMHookRequest, HookDecision) { + if hm == nil || req == nil { + return req, HookDecision{Action: HookActionContinue} + } + + current := req.Clone() + for _, reg := range hm.snapshotHooks() { + interceptor, ok := reg.Hook.(LLMInterceptor) + if !ok { + continue + } + + next, decision, ok := hm.callBeforeLLM(ctx, reg.Name, interceptor, current.Clone()) + if !ok { + continue + } + + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if next != nil { + current = next + } + case HookActionAbortTurn, HookActionHardAbort: + return current, decision + default: + hm.logUnsupportedAction(reg.Name, "before_llm", decision.Action) + } + } + return current, 
HookDecision{Action: HookActionContinue} +} + +func (hm *HookManager) AfterLLM(ctx context.Context, resp *LLMHookResponse) (*LLMHookResponse, HookDecision) { + if hm == nil || resp == nil { + return resp, HookDecision{Action: HookActionContinue} + } + + current := resp.Clone() + for _, reg := range hm.snapshotHooks() { + interceptor, ok := reg.Hook.(LLMInterceptor) + if !ok { + continue + } + + next, decision, ok := hm.callAfterLLM(ctx, reg.Name, interceptor, current.Clone()) + if !ok { + continue + } + + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if next != nil { + current = next + } + case HookActionAbortTurn, HookActionHardAbort: + return current, decision + default: + hm.logUnsupportedAction(reg.Name, "after_llm", decision.Action) + } + } + return current, HookDecision{Action: HookActionContinue} +} + +func (hm *HookManager) BeforeTool( + ctx context.Context, + call *ToolCallHookRequest, +) (*ToolCallHookRequest, HookDecision) { + if hm == nil || call == nil { + return call, HookDecision{Action: HookActionContinue} + } + + current := call.Clone() + for _, reg := range hm.snapshotHooks() { + interceptor, ok := reg.Hook.(ToolInterceptor) + if !ok { + continue + } + + next, decision, ok := hm.callBeforeTool(ctx, reg.Name, interceptor, current.Clone()) + if !ok { + continue + } + + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if next != nil { + current = next + } + case HookActionDenyTool, HookActionAbortTurn, HookActionHardAbort: + return current, decision + default: + hm.logUnsupportedAction(reg.Name, "before_tool", decision.Action) + } + } + return current, HookDecision{Action: HookActionContinue} +} + +func (hm *HookManager) AfterTool( + ctx context.Context, + result *ToolResultHookResponse, +) (*ToolResultHookResponse, HookDecision) { + if hm == nil || result == nil { + return result, HookDecision{Action: HookActionContinue} + } + + current := result.Clone() + for _, reg := range 
hm.snapshotHooks() { + interceptor, ok := reg.Hook.(ToolInterceptor) + if !ok { + continue + } + + next, decision, ok := hm.callAfterTool(ctx, reg.Name, interceptor, current.Clone()) + if !ok { + continue + } + + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if next != nil { + current = next + } + case HookActionAbortTurn, HookActionHardAbort: + return current, decision + default: + hm.logUnsupportedAction(reg.Name, "after_tool", decision.Action) + } + } + return current, HookDecision{Action: HookActionContinue} +} + +func (hm *HookManager) ApproveTool(ctx context.Context, req *ToolApprovalRequest) ApprovalDecision { + if hm == nil || req == nil { + return ApprovalDecision{Approved: true} + } + + for _, reg := range hm.snapshotHooks() { + approver, ok := reg.Hook.(ToolApprover) + if !ok { + continue + } + + decision, ok := hm.callApproveTool(ctx, reg.Name, approver, req.Clone()) + if !ok { + return ApprovalDecision{ + Approved: false, + Reason: fmt.Sprintf("tool approval hook %q failed", reg.Name), + } + } + if !decision.Approved { + return decision + } + } + + return ApprovalDecision{Approved: true} +} + +func (hm *HookManager) rebuildOrdered() { + hm.ordered = hm.ordered[:0] + for _, reg := range hm.hooks { + hm.ordered = append(hm.ordered, reg) + } + sort.SliceStable(hm.ordered, func(i, j int) bool { + if hm.ordered[i].Priority == hm.ordered[j].Priority { + return hm.ordered[i].Name < hm.ordered[j].Name + } + return hm.ordered[i].Priority < hm.ordered[j].Priority + }) +} + +func (hm *HookManager) snapshotHooks() []HookRegistration { + hm.mu.RLock() + defer hm.mu.RUnlock() + + snapshot := make([]HookRegistration, len(hm.ordered)) + copy(snapshot, hm.ordered) + return snapshot +} + +func (hm *HookManager) runObserver(name string, observer EventObserver, evt Event) { + ctx, cancel := context.WithTimeout(context.Background(), hm.observerTimeout) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- 
observer.OnEvent(ctx, evt) + }() + + select { + case err := <-done: + if err != nil { + logger.WarnCF("hooks", "Event observer failed", map[string]any{ + "hook": name, + "event": evt.Kind.String(), + "error": err.Error(), + }) + } + case <-ctx.Done(): + logger.WarnCF("hooks", "Event observer timed out", map[string]any{ + "hook": name, + "event": evt.Kind.String(), + "timeout_ms": hm.observerTimeout.Milliseconds(), + }) + } +} + +func (hm *HookManager) callBeforeLLM( + parent context.Context, + name string, + interceptor LLMInterceptor, + req *LLMHookRequest, +) (*LLMHookRequest, HookDecision, bool) { + return runInterceptorHook( + parent, + hm.interceptorTimeout, + name, + "before_llm", + func(ctx context.Context) (*LLMHookRequest, HookDecision, error) { + return interceptor.BeforeLLM(ctx, req) + }, + ) +} + +func (hm *HookManager) callAfterLLM( + parent context.Context, + name string, + interceptor LLMInterceptor, + resp *LLMHookResponse, +) (*LLMHookResponse, HookDecision, bool) { + return runInterceptorHook( + parent, + hm.interceptorTimeout, + name, + "after_llm", + func(ctx context.Context) (*LLMHookResponse, HookDecision, error) { + return interceptor.AfterLLM(ctx, resp) + }, + ) +} + +func (hm *HookManager) callBeforeTool( + parent context.Context, + name string, + interceptor ToolInterceptor, + call *ToolCallHookRequest, +) (*ToolCallHookRequest, HookDecision, bool) { + return runInterceptorHook( + parent, + hm.interceptorTimeout, + name, + "before_tool", + func(ctx context.Context) (*ToolCallHookRequest, HookDecision, error) { + return interceptor.BeforeTool(ctx, call) + }, + ) +} + +func (hm *HookManager) callAfterTool( + parent context.Context, + name string, + interceptor ToolInterceptor, + resultView *ToolResultHookResponse, +) (*ToolResultHookResponse, HookDecision, bool) { + return runInterceptorHook( + parent, + hm.interceptorTimeout, + name, + "after_tool", + func(ctx context.Context) (*ToolResultHookResponse, HookDecision, error) { + return 
interceptor.AfterTool(ctx, resultView) + }, + ) +} + +func (hm *HookManager) callApproveTool( + parent context.Context, + name string, + approver ToolApprover, + req *ToolApprovalRequest, +) (ApprovalDecision, bool) { + return runApprovalHook( + parent, + hm.approvalTimeout, + name, + "approve_tool", + func(ctx context.Context) (ApprovalDecision, error) { + return approver.ApproveTool(ctx, req) + }, + ) +} + +func runInterceptorHook[T any]( + parent context.Context, + timeout time.Duration, + name string, + stage string, + fn func(ctx context.Context) (T, HookDecision, error), +) (T, HookDecision, bool) { + var zero T + + ctx, cancel := context.WithTimeout(parent, timeout) + defer cancel() + + type result struct { + value T + decision HookDecision + err error + } + done := make(chan result, 1) + go func() { + value, decision, err := fn(ctx) + done <- result{value: value, decision: decision, err: err} + }() + + select { + case res := <-done: + if res.err != nil { + logger.WarnCF("hooks", "Interceptor hook failed", map[string]any{ + "hook": name, + "stage": stage, + "error": res.err.Error(), + }) + return zero, HookDecision{}, false + } + return res.value, res.decision, true + case <-ctx.Done(): + logger.WarnCF("hooks", "Interceptor hook timed out", map[string]any{ + "hook": name, + "stage": stage, + "timeout_ms": timeout.Milliseconds(), + }) + return zero, HookDecision{}, false + } +} + +func runApprovalHook( + parent context.Context, + timeout time.Duration, + name string, + stage string, + fn func(ctx context.Context) (ApprovalDecision, error), +) (ApprovalDecision, bool) { + ctx, cancel := context.WithTimeout(parent, timeout) + defer cancel() + + type result struct { + decision ApprovalDecision + err error + } + done := make(chan result, 1) + go func() { + decision, err := fn(ctx) + done <- result{decision: decision, err: err} + }() + + select { + case res := <-done: + if res.err != nil { + logger.WarnCF("hooks", "Approval hook failed", map[string]any{ + "hook": 
name, + "stage": stage, + "error": res.err.Error(), + }) + return ApprovalDecision{}, false + } + return res.decision, true + case <-ctx.Done(): + logger.WarnCF("hooks", "Approval hook timed out", map[string]any{ + "hook": name, + "stage": stage, + "timeout_ms": timeout.Milliseconds(), + }) + return ApprovalDecision{ + Approved: false, + Reason: fmt.Sprintf("tool approval hook %q timed out", name), + }, true + } +} + +func (hm *HookManager) logUnsupportedAction(name, stage string, action HookAction) { + logger.WarnCF("hooks", "Hook returned unsupported action for stage", map[string]any{ + "hook": name, + "stage": stage, + "action": action, + }) +} + +func cloneProviderMessages(messages []providers.Message) []providers.Message { + if len(messages) == 0 { + return nil + } + + cloned := make([]providers.Message, len(messages)) + for i, msg := range messages { + cloned[i] = msg + if len(msg.Media) > 0 { + cloned[i].Media = append([]string(nil), msg.Media...) + } + if len(msg.SystemParts) > 0 { + cloned[i].SystemParts = append([]providers.ContentBlock(nil), msg.SystemParts...) 
+ } + if len(msg.ToolCalls) > 0 { + cloned[i].ToolCalls = cloneProviderToolCalls(msg.ToolCalls) + } + } + return cloned +} + +func cloneProviderToolCalls(calls []providers.ToolCall) []providers.ToolCall { + if len(calls) == 0 { + return nil + } + + cloned := make([]providers.ToolCall, len(calls)) + for i, call := range calls { + cloned[i] = call + if call.Function != nil { + fn := *call.Function + cloned[i].Function = &fn + } + if call.Arguments != nil { + cloned[i].Arguments = cloneStringAnyMap(call.Arguments) + } + if call.ExtraContent != nil { + extra := *call.ExtraContent + if call.ExtraContent.Google != nil { + google := *call.ExtraContent.Google + extra.Google = &google + } + cloned[i].ExtraContent = &extra + } + } + return cloned +} + +func cloneToolDefinitions(defs []providers.ToolDefinition) []providers.ToolDefinition { + if len(defs) == 0 { + return nil + } + + cloned := make([]providers.ToolDefinition, len(defs)) + for i, def := range defs { + cloned[i] = def + cloned[i].Function.Parameters = cloneStringAnyMap(def.Function.Parameters) + } + return cloned +} + +func cloneLLMResponse(resp *providers.LLMResponse) *providers.LLMResponse { + if resp == nil { + return nil + } + cloned := *resp + cloned.ToolCalls = cloneProviderToolCalls(resp.ToolCalls) + if len(resp.ReasoningDetails) > 0 { + cloned.ReasoningDetails = append(cloned.ReasoningDetails[:0:0], resp.ReasoningDetails...) + } + if resp.Usage != nil { + usage := *resp.Usage + cloned.Usage = &usage + } + return &cloned +} + +func cloneStringAnyMap(src map[string]any) map[string]any { + if len(src) == 0 { + return nil + } + + cloned := make(map[string]any, len(src)) + for k, v := range src { + cloned[k] = v + } + return cloned +} + +func cloneToolResult(result *tools.ToolResult) *tools.ToolResult { + if result == nil { + return nil + } + + cloned := *result + if len(result.Media) > 0 { + cloned.Media = append([]string(nil), result.Media...) 
+ } + return &cloned +} diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go new file mode 100644 index 000000000..6607b5fe7 --- /dev/null +++ b/pkg/agent/hooks_test.go @@ -0,0 +1,312 @@ +package agent + +import ( + "context" + "os" + "sync" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/tools" +) + +func newHookTestLoop( + t *testing.T, + provider providers.LLMProvider, +) (*AgentLoop, *AgentInstance, func()) { + t.Helper() + + tmpDir, err := os.MkdirTemp("", "agent-hooks-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + al := NewAgentLoop(cfg, bus.NewMessageBus(), provider) + agent := al.registry.GetDefaultAgent() + if agent == nil { + t.Fatal("expected default agent") + } + + return al, agent, func() { + al.Close() + _ = os.RemoveAll(tmpDir) + } +} + +type llmHookTestProvider struct { + mu sync.Mutex + lastModel string +} + +func (p *llmHookTestProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + p.mu.Lock() + p.lastModel = model + p.mu.Unlock() + + return &providers.LLMResponse{ + Content: "provider content", + }, nil +} + +func (p *llmHookTestProvider) GetDefaultModel() string { + return "llm-hook-provider" +} + +type llmObserverHook struct { + eventCh chan Event +} + +func (h *llmObserverHook) OnEvent(ctx context.Context, evt Event) error { + if evt.Kind == EventKindTurnEnd { + select { + case h.eventCh <- evt: + default: + } + } + return nil +} + +func (h *llmObserverHook) BeforeLLM( + ctx context.Context, + req *LLMHookRequest, +) (*LLMHookRequest, HookDecision, 
error) { + next := req.Clone() + next.Model = "hook-model" + return next, HookDecision{Action: HookActionModify}, nil +} + +func (h *llmObserverHook) AfterLLM( + ctx context.Context, + resp *LLMHookResponse, +) (*LLMHookResponse, HookDecision, error) { + next := resp.Clone() + next.Response.Content = "hooked content" + return next, HookDecision{Action: HookActionModify}, nil +} + +func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { + provider := &llmHookTestProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + hook := &llmObserverHook{eventCh: make(chan Event, 1)} + if err := al.MountHook(NamedHook("llm-observer", hook)); err != nil { + t.Fatalf("MountHook failed: %v", err) + } + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "hello", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "hooked content" { + t.Fatalf("expected hooked content, got %q", resp) + } + + provider.mu.Lock() + lastModel := provider.lastModel + provider.mu.Unlock() + if lastModel != "hook-model" { + t.Fatalf("expected model hook-model, got %q", lastModel) + } + + select { + case evt := <-hook.eventCh: + if evt.Kind != EventKindTurnEnd { + t.Fatalf("expected turn end event, got %v", evt.Kind) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for hook observer event") + } +} + +type toolHookProvider struct { + mu sync.Mutex + calls int +} + +func (p *toolHookProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + p.mu.Lock() + defer p.mu.Unlock() + + p.calls++ + if p.calls == 1 { + return &providers.LLMResponse{ + ToolCalls: []providers.ToolCall{ + { + ID: "call-1", + Name: 
"echo_text", + Arguments: map[string]any{"text": "original"}, + }, + }, + }, nil + } + + last := messages[len(messages)-1] + return &providers.LLMResponse{ + Content: last.Content, + }, nil +} + +func (p *toolHookProvider) GetDefaultModel() string { + return "tool-hook-provider" +} + +type echoTextTool struct{} + +func (t *echoTextTool) Name() string { + return "echo_text" +} + +func (t *echoTextTool) Description() string { + return "echo a text argument" +} + +func (t *echoTextTool) Parameters() map[string]any { + return map[string]any{ + "type": "object", + "properties": map[string]any{ + "text": map[string]any{ + "type": "string", + }, + }, + } +} + +func (t *echoTextTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult { + text, _ := args["text"].(string) + return tools.SilentResult(text) +} + +type toolRewriteHook struct{} + +func (h *toolRewriteHook) BeforeTool( + ctx context.Context, + call *ToolCallHookRequest, +) (*ToolCallHookRequest, HookDecision, error) { + next := call.Clone() + next.Arguments["text"] = "modified" + return next, HookDecision{Action: HookActionModify}, nil +} + +func (h *toolRewriteHook) AfterTool( + ctx context.Context, + result *ToolResultHookResponse, +) (*ToolResultHookResponse, HookDecision, error) { + next := result.Clone() + next.Result.ForLLM = "after:" + next.Result.ForLLM + return next, HookDecision{Action: HookActionModify}, nil +} + +func TestAgentLoop_Hooks_ToolInterceptorCanRewrite(t *testing.T) { + provider := &toolHookProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + al.RegisterTool(&echoTextTool{}) + if err := al.MountHook(NamedHook("tool-rewrite", &toolRewriteHook{})); err != nil { + t.Fatalf("MountHook failed: %v", err) + } + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + 
SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "after:modified" { + t.Fatalf("expected rewritten tool result, got %q", resp) + } +} + +type denyApprovalHook struct{} + +func (h *denyApprovalHook) ApproveTool(ctx context.Context, req *ToolApprovalRequest) (ApprovalDecision, error) { + return ApprovalDecision{ + Approved: false, + Reason: "blocked", + }, nil +} + +func TestAgentLoop_Hooks_ToolApproverCanDeny(t *testing.T) { + provider := &toolHookProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + al.RegisterTool(&echoTextTool{}) + if err := al.MountHook(NamedHook("deny-approval", &denyApprovalHook{})); err != nil { + t.Fatalf("MountHook failed: %v", err) + } + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + expected := "Tool execution denied by approval hook: blocked" + if resp != expected { + t.Fatalf("expected %q, got %q", expected, resp) + } + + events := collectEventStream(sub.C) + skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + if !ok { + t.Fatal("expected tool skipped event") + } + payload, ok := skippedEvt.Payload.(ToolExecSkippedPayload) + if !ok { + t.Fatalf("expected ToolExecSkippedPayload, got %T", skippedEvt.Payload) + } + if payload.Reason != expected { + t.Fatalf("expected skipped reason %q, got %q", expected, payload.Reason) + } +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index f54482ae8..a85abcb60 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -40,6 +40,7 @@ type AgentLoop struct { registry *AgentRegistry state *state.Manager eventBus *EventBus + hooks *HookManager running atomic.Bool 
summarizing sync.Map fallback *providers.FallbackChain @@ -108,17 +109,19 @@ func NewAgentLoop( stateManager = state.NewManager(defaultAgent.Workspace) } + eventBus := NewEventBus() al := &AgentLoop{ bus: msgBus, cfg: cfg, registry: registry, state: stateManager, - eventBus: NewEventBus(), + eventBus: eventBus, summarizing: sync.Map{}, fallback: fallbackChain, cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()), steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), } + al.hooks = NewHookManager(eventBus) return al } @@ -460,11 +463,30 @@ func (al *AgentLoop) Close() { } al.GetRegistry().Close() + if al.hooks != nil { + al.hooks.Close() + } if al.eventBus != nil { al.eventBus.Close() } } +// MountHook registers an in-process hook on the agent loop. +func (al *AgentLoop) MountHook(reg HookRegistration) error { + if al == nil || al.hooks == nil { + return fmt.Errorf("hook manager is not initialized") + } + return al.hooks.Mount(reg) +} + +// UnmountHook removes a previously registered in-process hook. +func (al *AgentLoop) UnmountHook(name string) { + if al == nil || al.hooks == nil { + return + } + al.hooks.Unmount(name) +} + // SubscribeEvents registers a subscriber for agent-loop events. func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription { if al == nil || al.eventBus == nil { @@ -544,6 +566,31 @@ func cloneEventArguments(args map[string]any) map[string]any { return cloned } +func (al *AgentLoop) hookAbortError(ts *turnState, stage string, decision HookDecision) error { + reason := decision.Reason + if reason == "" { + reason = "hook requested turn abort" + } + + err := fmt.Errorf("hook aborted turn during %s: %s", stage, reason) + al.emitEvent( + EventKindError, + ts.eventMeta("hooks", "turn.error"), + ErrorPayload{ + Stage: "hook." 
+ stage, + Message: err.Error(), + }, + ) + return err +} + +func hookDeniedToolContent(prefix, reason string) string { + if reason == "" { + return prefix + } + return prefix + ": " + reason +} + func (al *AgentLoop) logEvent(evt Event) { fields := map[string]any{ "event_kind": evt.Kind.String(), @@ -1418,36 +1465,6 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er ts.markGracefulTerminalUsed() } - al.emitEvent( - EventKindLLMRequest, - ts.eventMeta("runTurn", "turn.llm.request"), - LLMRequestPayload{ - Model: activeModel, - MessagesCount: len(callMessages), - ToolsCount: len(providerToolDefs), - MaxTokens: ts.agent.MaxTokens, - Temperature: ts.agent.Temperature, - }, - ) - - logger.DebugCF("agent", "LLM request", - map[string]any{ - "agent_id": ts.agent.ID, - "iteration": iteration, - "model": activeModel, - "messages_count": len(callMessages), - "tools_count": len(providerToolDefs), - "max_tokens": ts.agent.MaxTokens, - "temperature": ts.agent.Temperature, - "system_prompt_len": len(callMessages[0].Content), - }) - logger.DebugCF("agent", "Full LLM request", - map[string]any{ - "iteration": iteration, - "messages_json": formatMessagesForLog(callMessages), - "tools_json": formatToolsForLog(providerToolDefs), - }) - llmOpts := map[string]any{ "max_tokens": ts.agent.MaxTokens, "temperature": ts.agent.Temperature, @@ -1462,6 +1479,66 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er } } + llmModel := activeModel + if al.hooks != nil { + llmReq, decision := al.hooks.BeforeLLM(turnCtx, &LLMHookRequest{ + Meta: ts.eventMeta("runTurn", "turn.llm.request"), + Model: llmModel, + Messages: callMessages, + Tools: providerToolDefs, + Options: llmOpts, + Channel: ts.channel, + ChatID: ts.chatID, + GracefulTerminal: gracefulTerminal, + }) + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if llmReq != nil { + llmModel = llmReq.Model + callMessages = llmReq.Messages + 
providerToolDefs = llmReq.Tools + llmOpts = llmReq.Options + } + case HookActionAbortTurn: + turnStatus = TurnEndStatusError + return turnResult{}, al.hookAbortError(ts, "before_llm", decision) + case HookActionHardAbort: + _ = ts.requestHardAbort() + turnStatus = TurnEndStatusAborted + return al.abortTurn(ts) + } + } + + al.emitEvent( + EventKindLLMRequest, + ts.eventMeta("runTurn", "turn.llm.request"), + LLMRequestPayload{ + Model: llmModel, + MessagesCount: len(callMessages), + ToolsCount: len(providerToolDefs), + MaxTokens: ts.agent.MaxTokens, + Temperature: ts.agent.Temperature, + }, + ) + + logger.DebugCF("agent", "LLM request", + map[string]any{ + "agent_id": ts.agent.ID, + "iteration": iteration, + "model": llmModel, + "messages_count": len(callMessages), + "tools_count": len(providerToolDefs), + "max_tokens": ts.agent.MaxTokens, + "temperature": ts.agent.Temperature, + "system_prompt_len": len(callMessages[0].Content), + }) + logger.DebugCF("agent", "Full LLM request", + map[string]any{ + "iteration": iteration, + "messages_json": formatMessagesForLog(callMessages), + "tools_json": formatToolsForLog(providerToolDefs), + }) + callLLM := func(messagesForCall []providers.Message, toolDefsForCall []providers.ToolDefinition) (*providers.LLMResponse, error) { providerCtx, providerCancel := context.WithCancel(turnCtx) ts.setProviderCancel(providerCancel) @@ -1494,7 +1571,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er } return fbResult.Response, nil } - return ts.agent.Provider.Chat(providerCtx, messagesForCall, toolDefsForCall, activeModel, llmOpts) + return ts.agent.Provider.Chat(providerCtx, messagesForCall, toolDefsForCall, llmModel, llmOpts) } var response *providers.LLMResponse @@ -1626,12 +1703,35 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er map[string]any{ "agent_id": ts.agent.ID, "iteration": iteration, - "model": activeModel, + "model": llmModel, "error": err.Error(), }) return 
turnResult{}, fmt.Errorf("LLM call failed after retries: %w", err) } + if al.hooks != nil { + llmResp, decision := al.hooks.AfterLLM(turnCtx, &LLMHookResponse{ + Meta: ts.eventMeta("runTurn", "turn.llm.response"), + Model: llmModel, + Response: response, + Channel: ts.channel, + ChatID: ts.chatID, + }) + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if llmResp != nil && llmResp.Response != nil { + response = llmResp.Response + } + case HookActionAbortTurn: + turnStatus = TurnEndStatusError + return turnResult{}, al.hookAbortError(ts, "after_llm", decision) + case HookActionHardAbort: + _ = ts.requestHardAbort() + turnStatus = TurnEndStatusAborted + return al.abortTurn(ts) + } + } + go al.handleReasoning( turnCtx, response.Reasoning, @@ -1728,25 +1828,106 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er return al.abortTurn(ts) } - argsJSON, _ := json.Marshal(tc.Arguments) + toolName := tc.Name + toolArgs := cloneStringAnyMap(tc.Arguments) + + if al.hooks != nil { + toolReq, decision := al.hooks.BeforeTool(turnCtx, &ToolCallHookRequest{ + Meta: ts.eventMeta("runTurn", "turn.tool.before"), + Tool: toolName, + Arguments: toolArgs, + Channel: ts.channel, + ChatID: ts.chatID, + }) + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if toolReq != nil { + toolName = toolReq.Tool + toolArgs = toolReq.Arguments + } + case HookActionDenyTool: + denyContent := hookDeniedToolContent("Tool execution denied by hook", decision.Reason) + al.emitEvent( + EventKindToolExecSkipped, + ts.eventMeta("runTurn", "turn.tool.skipped"), + ToolExecSkippedPayload{ + Tool: toolName, + Reason: denyContent, + }, + ) + deniedMsg := providers.Message{ + Role: "tool", + Content: denyContent, + ToolCallID: tc.ID, + } + messages = append(messages, deniedMsg) + if !ts.opts.NoHistory { + ts.agent.Sessions.AddFullMessage(ts.sessionKey, deniedMsg) + ts.recordPersistedMessage(deniedMsg) + } + continue 
+ case HookActionAbortTurn: + turnStatus = TurnEndStatusError + return turnResult{}, al.hookAbortError(ts, "before_tool", decision) + case HookActionHardAbort: + _ = ts.requestHardAbort() + turnStatus = TurnEndStatusAborted + return al.abortTurn(ts) + } + } + + if al.hooks != nil { + approval := al.hooks.ApproveTool(turnCtx, &ToolApprovalRequest{ + Meta: ts.eventMeta("runTurn", "turn.tool.approve"), + Tool: toolName, + Arguments: toolArgs, + Channel: ts.channel, + ChatID: ts.chatID, + }) + if !approval.Approved { + denyContent := hookDeniedToolContent("Tool execution denied by approval hook", approval.Reason) + al.emitEvent( + EventKindToolExecSkipped, + ts.eventMeta("runTurn", "turn.tool.skipped"), + ToolExecSkippedPayload{ + Tool: toolName, + Reason: denyContent, + }, + ) + deniedMsg := providers.Message{ + Role: "tool", + Content: denyContent, + ToolCallID: tc.ID, + } + messages = append(messages, deniedMsg) + if !ts.opts.NoHistory { + ts.agent.Sessions.AddFullMessage(ts.sessionKey, deniedMsg) + ts.recordPersistedMessage(deniedMsg) + } + continue + } + } + + argsJSON, _ := json.Marshal(toolArgs) argsPreview := utils.Truncate(string(argsJSON), 200) - logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", toolName, argsPreview), map[string]any{ "agent_id": ts.agent.ID, - "tool": tc.Name, + "tool": toolName, "iteration": iteration, }) al.emitEvent( EventKindToolExecStart, ts.eventMeta("runTurn", "turn.tool.start"), ToolExecStartPayload{ - Tool: tc.Name, - Arguments: cloneEventArguments(tc.Arguments), + Tool: toolName, + Arguments: cloneEventArguments(toolArgs), }, ) - toolCall := tc + toolCallID := tc.ID toolIteration := iteration + asyncToolName := toolName asyncCallback := func(_ context.Context, result *tools.ToolResult) { if !result.Silent && result.ForUser != "" { outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -1768,7 +1949,7 @@ func (al 
*AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er logger.InfoCF("agent", "Async tool completed, publishing result", map[string]any{ - "tool": toolCall.Name, + "tool": asyncToolName, "content_len": len(content), "channel": ts.channel, }) @@ -1776,7 +1957,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er EventKindFollowUpQueued, ts.scope.meta(toolIteration, "runTurn", "turn.follow_up.queued"), FollowUpQueuedPayload{ - SourceTool: toolCall.Name, + SourceTool: asyncToolName, Channel: ts.channel, ChatID: ts.chatID, ContentLen: len(content), @@ -1787,7 +1968,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er defer pubCancel() _ = al.bus.PublishInbound(pubCtx, bus.InboundMessage{ Channel: "system", - SenderID: fmt.Sprintf("async:%s", toolCall.Name), + SenderID: fmt.Sprintf("async:%s", asyncToolName), ChatID: fmt.Sprintf("%s:%s", ts.channel, ts.chatID), Content: content, }) @@ -1796,8 +1977,8 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er toolStart := time.Now() toolResult := ts.agent.Tools.ExecuteWithContext( turnCtx, - toolCall.Name, - toolCall.Arguments, + toolName, + toolArgs, ts.channel, ts.chatID, asyncCallback, @@ -1809,6 +1990,40 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er return al.abortTurn(ts) } + if al.hooks != nil { + toolResp, decision := al.hooks.AfterTool(turnCtx, &ToolResultHookResponse{ + Meta: ts.eventMeta("runTurn", "turn.tool.after"), + Tool: toolName, + Arguments: toolArgs, + Result: toolResult, + Duration: toolDuration, + Channel: ts.channel, + ChatID: ts.chatID, + }) + switch decision.normalizedAction() { + case HookActionContinue, HookActionModify: + if toolResp != nil { + if toolResp.Tool != "" { + toolName = toolResp.Tool + } + if toolResp.Result != nil { + toolResult = toolResp.Result + } + } + case HookActionAbortTurn: + turnStatus = TurnEndStatusError + return turnResult{}, 
al.hookAbortError(ts, "after_tool", decision) + case HookActionHardAbort: + _ = ts.requestHardAbort() + turnStatus = TurnEndStatusAborted + return al.abortTurn(ts) + } + } + + if toolResult == nil { + toolResult = tools.ErrorResult("hook returned nil tool result") + } + if !toolResult.Silent && toolResult.ForUser != "" && ts.opts.SendResponse { al.bus.PublishOutbound(ctx, bus.OutboundMessage{ Channel: ts.channel, @@ -1817,7 +2032,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er }) logger.DebugCF("agent", "Sent tool result to user", map[string]any{ - "tool": toolCall.Name, + "tool": toolName, "content_len": len(toolResult.ForUser), }) } @@ -1850,13 +2065,13 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er toolResultMsg := providers.Message{ Role: "tool", Content: contentForLLM, - ToolCallID: toolCall.ID, + ToolCallID: toolCallID, } al.emitEvent( EventKindToolExecEnd, ts.eventMeta("runTurn", "turn.tool.end"), ToolExecEndPayload{ - Tool: toolCall.Name, + Tool: toolName, Duration: toolDuration, ForLLMLen: len(contentForLLM), ForUserLen: len(toolResult.ForUser), From 337e43e5a5a2f0a12598a3ac982419bacdde0b15 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 21 Mar 2026 19:46:16 +0800 Subject: [PATCH 2/3] feat(agent): add configurable hook mounting --- pkg/agent/hook_mount.go | 317 ++++++++++++++++++++ pkg/agent/hook_mount_test.go | 179 ++++++++++++ pkg/agent/hook_process.go | 511 +++++++++++++++++++++++++++++++++ pkg/agent/hook_process_test.go | 339 ++++++++++++++++++++++ pkg/agent/hooks.go | 130 ++++++--- pkg/agent/hooks_test.go | 33 +++ pkg/agent/loop.go | 18 ++ pkg/agent/steering.go | 6 + pkg/config/config.go | 31 ++ pkg/config/config_test.go | 98 +++++++ pkg/config/defaults.go | 8 + 11 files changed, 1634 insertions(+), 36 deletions(-) create mode 100644 pkg/agent/hook_mount.go create mode 100644 pkg/agent/hook_mount_test.go create mode 100644 pkg/agent/hook_process.go create mode 100644 
pkg/agent/hook_process_test.go diff --git a/pkg/agent/hook_mount.go b/pkg/agent/hook_mount.go new file mode 100644 index 000000000..c92145f1f --- /dev/null +++ b/pkg/agent/hook_mount.go @@ -0,0 +1,317 @@ +package agent + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/config" +) + +type hookRuntime struct { + initOnce sync.Once + mu sync.Mutex + initErr error + mounted []string +} + +func (r *hookRuntime) setInitErr(err error) { + r.mu.Lock() + r.initErr = err + r.mu.Unlock() +} + +func (r *hookRuntime) getInitErr() error { + r.mu.Lock() + defer r.mu.Unlock() + return r.initErr +} + +func (r *hookRuntime) setMounted(names []string) { + r.mu.Lock() + r.mounted = append([]string(nil), names...) + r.mu.Unlock() +} + +func (r *hookRuntime) reset(al *AgentLoop) { + r.mu.Lock() + names := append([]string(nil), r.mounted...) + r.mounted = nil + r.initErr = nil + r.initOnce = sync.Once{} + r.mu.Unlock() + + for _, name := range names { + al.UnmountHook(name) + } +} + +// BuiltinHookFactory constructs an in-process hook from config. +type BuiltinHookFactory func(ctx context.Context, spec config.BuiltinHookConfig) (any, error) + +var ( + builtinHookRegistryMu sync.RWMutex + builtinHookRegistry = map[string]BuiltinHookFactory{} +) + +// RegisterBuiltinHook registers a named in-process hook factory for config-driven mounting. 
+func RegisterBuiltinHook(name string, factory BuiltinHookFactory) error { + if name == "" { + return fmt.Errorf("builtin hook name is required") + } + if factory == nil { + return fmt.Errorf("builtin hook %q factory is nil", name) + } + + builtinHookRegistryMu.Lock() + defer builtinHookRegistryMu.Unlock() + + if _, exists := builtinHookRegistry[name]; exists { + return fmt.Errorf("builtin hook %q is already registered", name) + } + builtinHookRegistry[name] = factory + return nil +} + +func unregisterBuiltinHook(name string) { + if name == "" { + return + } + builtinHookRegistryMu.Lock() + delete(builtinHookRegistry, name) + builtinHookRegistryMu.Unlock() +} + +func lookupBuiltinHook(name string) (BuiltinHookFactory, bool) { + builtinHookRegistryMu.RLock() + defer builtinHookRegistryMu.RUnlock() + + factory, ok := builtinHookRegistry[name] + return factory, ok +} + +func configureHookManagerFromConfig(hm *HookManager, cfg *config.Config) { + if hm == nil || cfg == nil { + return + } + hm.ConfigureTimeouts( + hookTimeoutFromMS(cfg.Hooks.Defaults.ObserverTimeoutMS), + hookTimeoutFromMS(cfg.Hooks.Defaults.InterceptorTimeoutMS), + hookTimeoutFromMS(cfg.Hooks.Defaults.ApprovalTimeoutMS), + ) +} + +func hookTimeoutFromMS(ms int) time.Duration { + if ms <= 0 { + return 0 + } + return time.Duration(ms) * time.Millisecond +} + +func (al *AgentLoop) ensureHooksInitialized(ctx context.Context) error { + if al == nil || al.cfg == nil || al.hooks == nil { + return nil + } + + al.hookRuntime.initOnce.Do(func() { + al.hookRuntime.setInitErr(al.loadConfiguredHooks(ctx)) + }) + + return al.hookRuntime.getInitErr() +} + +func (al *AgentLoop) loadConfiguredHooks(ctx context.Context) (err error) { + if al == nil || al.cfg == nil || !al.cfg.Hooks.Enabled { + return nil + } + + mounted := make([]string, 0) + defer func() { + if err != nil { + for _, name := range mounted { + al.UnmountHook(name) + } + return + } + al.hookRuntime.setMounted(mounted) + }() + + builtinNames := 
enabledBuiltinHookNames(al.cfg.Hooks.Builtins) + for _, name := range builtinNames { + spec := al.cfg.Hooks.Builtins[name] + factory, ok := lookupBuiltinHook(name) + if !ok { + return fmt.Errorf("builtin hook %q is not registered", name) + } + + hook, factoryErr := factory(ctx, spec) + if factoryErr != nil { + return fmt.Errorf("build builtin hook %q: %w", name, factoryErr) + } + if err := al.MountHook(HookRegistration{ + Name: name, + Priority: spec.Priority, + Source: HookSourceInProcess, + Hook: hook, + }); err != nil { + return fmt.Errorf("mount builtin hook %q: %w", name, err) + } + mounted = append(mounted, name) + } + + processNames := enabledProcessHookNames(al.cfg.Hooks.Processes) + for _, name := range processNames { + spec := al.cfg.Hooks.Processes[name] + opts, buildErr := processHookOptionsFromConfig(spec) + if buildErr != nil { + return fmt.Errorf("configure process hook %q: %w", name, buildErr) + } + + processHook, buildErr := NewProcessHook(ctx, name, opts) + if buildErr != nil { + return fmt.Errorf("start process hook %q: %w", name, buildErr) + } + if err := al.MountHook(HookRegistration{ + Name: name, + Priority: spec.Priority, + Source: HookSourceProcess, + Hook: processHook, + }); err != nil { + _ = processHook.Close() + return fmt.Errorf("mount process hook %q: %w", name, err) + } + mounted = append(mounted, name) + } + + return nil +} + +func enabledBuiltinHookNames(specs map[string]config.BuiltinHookConfig) []string { + if len(specs) == 0 { + return nil + } + + names := make([]string, 0, len(specs)) + for name, spec := range specs { + if spec.Enabled { + names = append(names, name) + } + } + sort.Strings(names) + return names +} + +func enabledProcessHookNames(specs map[string]config.ProcessHookConfig) []string { + if len(specs) == 0 { + return nil + } + + names := make([]string, 0, len(specs)) + for name, spec := range specs { + if spec.Enabled { + names = append(names, name) + } + } + sort.Strings(names) + return names +} + +func 
processHookOptionsFromConfig(spec config.ProcessHookConfig) (ProcessHookOptions, error) { + transport := spec.Transport + if transport == "" { + transport = "stdio" + } + if transport != "stdio" { + return ProcessHookOptions{}, fmt.Errorf("unsupported transport %q", transport) + } + if len(spec.Command) == 0 { + return ProcessHookOptions{}, fmt.Errorf("command is required") + } + + opts := ProcessHookOptions{ + Command: append([]string(nil), spec.Command...), + Dir: spec.Dir, + Env: processHookEnvFromMap(spec.Env), + } + + observeKinds, observeEnabled, err := processHookObserveKindsFromConfig(spec.Observe) + if err != nil { + return ProcessHookOptions{}, err + } + opts.Observe = observeEnabled + opts.ObserveKinds = observeKinds + + for _, intercept := range spec.Intercept { + switch intercept { + case "before_llm", "after_llm": + opts.InterceptLLM = true + case "before_tool", "after_tool": + opts.InterceptTool = true + case "approve_tool": + opts.ApproveTool = true + case "": + continue + default: + return ProcessHookOptions{}, fmt.Errorf("unsupported intercept %q", intercept) + } + } + + if !opts.Observe && !opts.InterceptLLM && !opts.InterceptTool && !opts.ApproveTool { + return ProcessHookOptions{}, fmt.Errorf("no hook modes enabled") + } + + return opts, nil +} + +func processHookEnvFromMap(envMap map[string]string) []string { + if len(envMap) == 0 { + return nil + } + + keys := make([]string, 0, len(envMap)) + for key := range envMap { + keys = append(keys, key) + } + sort.Strings(keys) + + env := make([]string, 0, len(keys)) + for _, key := range keys { + env = append(env, key+"="+envMap[key]) + } + return env +} + +func processHookObserveKindsFromConfig(observe []string) ([]string, bool, error) { + if len(observe) == 0 { + return nil, false, nil + } + + validKinds := validHookEventKinds() + normalized := make([]string, 0, len(observe)) + for _, kind := range observe { + switch kind { + case "", "*", "all": + return nil, true, nil + default: + if _, ok := 
validKinds[kind]; !ok { + return nil, false, fmt.Errorf("unsupported observe event %q", kind) + } + normalized = append(normalized, kind) + } + } + + if len(normalized) == 0 { + return nil, false, nil + } + return normalized, true, nil +} + +func validHookEventKinds() map[string]struct{} { + kinds := make(map[string]struct{}, int(eventKindCount)) + for kind := EventKind(0); kind < eventKindCount; kind++ { + kinds[kind.String()] = struct{}{} + } + return kinds +} diff --git a/pkg/agent/hook_mount_test.go b/pkg/agent/hook_mount_test.go new file mode 100644 index 000000000..a9d8f27c5 --- /dev/null +++ b/pkg/agent/hook_mount_test.go @@ -0,0 +1,179 @@ +package agent + +import ( + "context" + "encoding/json" + "path/filepath" + "testing" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" +) + +type builtinAutoHookConfig struct { + Model string `json:"model"` + Suffix string `json:"suffix"` +} + +type builtinAutoHook struct { + model string + suffix string +} + +func (h *builtinAutoHook) BeforeLLM( + ctx context.Context, + req *LLMHookRequest, +) (*LLMHookRequest, HookDecision, error) { + next := req.Clone() + next.Model = h.model + return next, HookDecision{Action: HookActionModify}, nil +} + +func (h *builtinAutoHook) AfterLLM( + ctx context.Context, + resp *LLMHookResponse, +) (*LLMHookResponse, HookDecision, error) { + next := resp.Clone() + if next.Response != nil { + next.Response.Content += h.suffix + } + return next, HookDecision{Action: HookActionModify}, nil +} + +func newConfiguredHookLoop(t *testing.T, provider *llmHookTestProvider, hooks config.HooksConfig) *AgentLoop { + t.Helper() + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: t.TempDir(), + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + Hooks: hooks, + } + + return NewAgentLoop(cfg, bus.NewMessageBus(), provider) +} + +func TestAgentLoop_ProcessDirectWithChannel_AutoMountsBuiltinHook(t 
*testing.T) { + const hookName = "test-auto-builtin-hook" + + if err := RegisterBuiltinHook(hookName, func( + ctx context.Context, + spec config.BuiltinHookConfig, + ) (any, error) { + var hookCfg builtinAutoHookConfig + if len(spec.Config) > 0 { + if err := json.Unmarshal(spec.Config, &hookCfg); err != nil { + return nil, err + } + } + return &builtinAutoHook{ + model: hookCfg.Model, + suffix: hookCfg.Suffix, + }, nil + }); err != nil { + t.Fatalf("RegisterBuiltinHook failed: %v", err) + } + t.Cleanup(func() { + unregisterBuiltinHook(hookName) + }) + + rawCfg, err := json.Marshal(builtinAutoHookConfig{ + Model: "builtin-model", + Suffix: "|builtin", + }) + if err != nil { + t.Fatalf("json.Marshal failed: %v", err) + } + + provider := &llmHookTestProvider{} + al := newConfiguredHookLoop(t, provider, config.HooksConfig{ + Enabled: true, + Builtins: map[string]config.BuiltinHookConfig{ + hookName: { + Enabled: true, + Config: rawCfg, + }, + }, + }) + defer al.Close() + + resp, err := al.ProcessDirectWithChannel(context.Background(), "hello", "session-1", "cli", "direct") + if err != nil { + t.Fatalf("ProcessDirectWithChannel failed: %v", err) + } + if resp != "provider content|builtin" { + t.Fatalf("expected builtin-hooked content, got %q", resp) + } + + provider.mu.Lock() + lastModel := provider.lastModel + provider.mu.Unlock() + if lastModel != "builtin-model" { + t.Fatalf("expected builtin model, got %q", lastModel) + } +} + +func TestAgentLoop_ProcessDirectWithChannel_AutoMountsProcessHook(t *testing.T) { + provider := &llmHookTestProvider{} + eventLog := filepath.Join(t.TempDir(), "events.log") + + al := newConfiguredHookLoop(t, provider, config.HooksConfig{ + Enabled: true, + Processes: map[string]config.ProcessHookConfig{ + "ipc-auto": { + Enabled: true, + Command: processHookHelperCommand(), + Env: map[string]string{ + "PICOCLAW_HOOK_HELPER": "1", + "PICOCLAW_HOOK_MODE": "rewrite", + "PICOCLAW_HOOK_EVENT_LOG": eventLog, + }, + Observe: []string{"turn_end"}, + 
Intercept: []string{"before_llm", "after_llm"}, + }, + }, + }) + defer al.Close() + + resp, err := al.ProcessDirectWithChannel(context.Background(), "hello", "session-1", "cli", "direct") + if err != nil { + t.Fatalf("ProcessDirectWithChannel failed: %v", err) + } + if resp != "provider content|ipc" { + t.Fatalf("expected process-hooked content, got %q", resp) + } + + provider.mu.Lock() + lastModel := provider.lastModel + provider.mu.Unlock() + if lastModel != "process-model" { + t.Fatalf("expected process model, got %q", lastModel) + } + + waitForFileContains(t, eventLog, "turn_end") +} + +func TestAgentLoop_ProcessDirectWithChannel_InvalidConfiguredHookFails(t *testing.T) { + provider := &llmHookTestProvider{} + al := newConfiguredHookLoop(t, provider, config.HooksConfig{ + Enabled: true, + Processes: map[string]config.ProcessHookConfig{ + "bad-hook": { + Enabled: true, + Command: processHookHelperCommand(), + Intercept: []string{"not_supported"}, + }, + }, + }) + defer al.Close() + + _, err := al.ProcessDirectWithChannel(context.Background(), "hello", "session-1", "cli", "direct") + if err == nil { + t.Fatal("expected invalid configured hook error") + } +} diff --git a/pkg/agent/hook_process.go b/pkg/agent/hook_process.go new file mode 100644 index 000000000..e5632913d --- /dev/null +++ b/pkg/agent/hook_process.go @@ -0,0 +1,511 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "sync" + "sync/atomic" + "time" + + "github.com/sipeed/picoclaw/pkg/logger" +) + +const ( + processHookJSONRPCVersion = "2.0" + processHookReadBufferSize = 1024 * 1024 + processHookCloseTimeout = 2 * time.Second +) + +type ProcessHookOptions struct { + Command []string + Dir string + Env []string + Observe bool + ObserveKinds []string + InterceptLLM bool + InterceptTool bool + ApproveTool bool +} + +type ProcessHook struct { + name string + opts ProcessHookOptions + + cmd *exec.Cmd + stdin io.WriteCloser + observeKinds 
map[string]struct{} + + writeMu sync.Mutex + + pendingMu sync.Mutex + pending map[uint64]chan processHookRPCMessage + nextID atomic.Uint64 + + closed atomic.Bool + done chan struct{} + closeErr error + closeMu sync.Mutex + closeOnce sync.Once +} + +type processHookRPCMessage struct { + JSONRPC string `json:"jsonrpc,omitempty"` + ID uint64 `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *processHookRPCError `json:"error,omitempty"` +} + +type processHookRPCError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type processHookHelloParams struct { + Name string `json:"name"` + Version int `json:"version"` + Modes []string `json:"modes,omitempty"` +} + +type processHookDecisionResponse struct { + Action HookAction `json:"action"` + Reason string `json:"reason,omitempty"` +} + +type processHookBeforeLLMResponse struct { + processHookDecisionResponse + Request *LLMHookRequest `json:"request,omitempty"` +} + +type processHookAfterLLMResponse struct { + processHookDecisionResponse + Response *LLMHookResponse `json:"response,omitempty"` +} + +type processHookBeforeToolResponse struct { + processHookDecisionResponse + Call *ToolCallHookRequest `json:"call,omitempty"` +} + +type processHookAfterToolResponse struct { + processHookDecisionResponse + Result *ToolResultHookResponse `json:"result,omitempty"` +} + +func NewProcessHook(ctx context.Context, name string, opts ProcessHookOptions) (*ProcessHook, error) { + if len(opts.Command) == 0 { + return nil, fmt.Errorf("process hook command is required") + } + + cmd := exec.Command(opts.Command[0], opts.Command[1:]...) + cmd.Dir = opts.Dir + if len(opts.Env) > 0 { + cmd.Env = append(os.Environ(), opts.Env...) 
+ } + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("create process hook stdin: %w", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("create process hook stdout: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("create process hook stderr: %w", err) + } + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("start process hook: %w", err) + } + + ph := &ProcessHook{ + name: name, + opts: opts, + cmd: cmd, + stdin: stdin, + observeKinds: newProcessHookObserveKinds(opts.ObserveKinds), + pending: make(map[uint64]chan processHookRPCMessage), + done: make(chan struct{}), + } + + go ph.readLoop(stdout) + go ph.readStderr(stderr) + go ph.waitLoop() + + helloCtx := ctx + if helloCtx == nil { + var cancel context.CancelFunc + helloCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + } + if err := ph.hello(helloCtx); err != nil { + _ = ph.Close() + return nil, err + } + + return ph, nil +} + +func (ph *ProcessHook) Close() error { + if ph == nil { + return nil + } + + ph.closeOnce.Do(func() { + ph.closed.Store(true) + if ph.stdin != nil { + _ = ph.stdin.Close() + } + + select { + case <-ph.done: + case <-time.After(processHookCloseTimeout): + if ph.cmd != nil && ph.cmd.Process != nil { + _ = ph.cmd.Process.Kill() + } + <-ph.done + } + }) + + ph.closeMu.Lock() + defer ph.closeMu.Unlock() + return ph.closeErr +} + +func (ph *ProcessHook) OnEvent(ctx context.Context, evt Event) error { + if ph == nil || !ph.opts.Observe { + return nil + } + if len(ph.observeKinds) > 0 { + if _, ok := ph.observeKinds[evt.Kind.String()]; !ok { + return nil + } + } + return ph.notify(ctx, "hook.event", evt) +} + +func (ph *ProcessHook) BeforeLLM( + ctx context.Context, + req *LLMHookRequest, +) (*LLMHookRequest, HookDecision, error) { + if ph == nil || !ph.opts.InterceptLLM { + return req, HookDecision{Action: HookActionContinue}, nil + } + + var 
resp processHookBeforeLLMResponse + if err := ph.call(ctx, "hook.before_llm", req, &resp); err != nil { + return nil, HookDecision{}, err + } + if resp.Request == nil { + resp.Request = req + } + return resp.Request, HookDecision{Action: resp.Action, Reason: resp.Reason}, nil +} + +func (ph *ProcessHook) AfterLLM( + ctx context.Context, + resp *LLMHookResponse, +) (*LLMHookResponse, HookDecision, error) { + if ph == nil || !ph.opts.InterceptLLM { + return resp, HookDecision{Action: HookActionContinue}, nil + } + + var result processHookAfterLLMResponse + if err := ph.call(ctx, "hook.after_llm", resp, &result); err != nil { + return nil, HookDecision{}, err + } + if result.Response == nil { + result.Response = resp + } + return result.Response, HookDecision{Action: result.Action, Reason: result.Reason}, nil +} + +func (ph *ProcessHook) BeforeTool( + ctx context.Context, + call *ToolCallHookRequest, +) (*ToolCallHookRequest, HookDecision, error) { + if ph == nil || !ph.opts.InterceptTool { + return call, HookDecision{Action: HookActionContinue}, nil + } + + var resp processHookBeforeToolResponse + if err := ph.call(ctx, "hook.before_tool", call, &resp); err != nil { + return nil, HookDecision{}, err + } + if resp.Call == nil { + resp.Call = call + } + return resp.Call, HookDecision{Action: resp.Action, Reason: resp.Reason}, nil +} + +func (ph *ProcessHook) AfterTool( + ctx context.Context, + result *ToolResultHookResponse, +) (*ToolResultHookResponse, HookDecision, error) { + if ph == nil || !ph.opts.InterceptTool { + return result, HookDecision{Action: HookActionContinue}, nil + } + + var resp processHookAfterToolResponse + if err := ph.call(ctx, "hook.after_tool", result, &resp); err != nil { + return nil, HookDecision{}, err + } + if resp.Result == nil { + resp.Result = result + } + return resp.Result, HookDecision{Action: resp.Action, Reason: resp.Reason}, nil +} + +func (ph *ProcessHook) ApproveTool(ctx context.Context, req *ToolApprovalRequest) 
(ApprovalDecision, error) { + if ph == nil || !ph.opts.ApproveTool { + return ApprovalDecision{Approved: true}, nil + } + + var resp ApprovalDecision + if err := ph.call(ctx, "hook.approve_tool", req, &resp); err != nil { + return ApprovalDecision{}, err + } + return resp, nil +} + +func (ph *ProcessHook) hello(ctx context.Context) error { + modes := make([]string, 0, 4) + if ph.opts.Observe { + modes = append(modes, "observe") + } + if ph.opts.InterceptLLM { + modes = append(modes, "llm") + } + if ph.opts.InterceptTool { + modes = append(modes, "tool") + } + if ph.opts.ApproveTool { + modes = append(modes, "approve") + } + + var result map[string]any + return ph.call(ctx, "hook.hello", processHookHelloParams{ + Name: ph.name, + Version: 1, + Modes: modes, + }, &result) +} + +func (ph *ProcessHook) notify(ctx context.Context, method string, params any) error { + msg := processHookRPCMessage{ + JSONRPC: processHookJSONRPCVersion, + Method: method, + } + if params != nil { + body, err := json.Marshal(params) + if err != nil { + return err + } + msg.Params = body + } + return ph.send(ctx, msg) +} + +func (ph *ProcessHook) call(ctx context.Context, method string, params any, out any) error { + if ph.closed.Load() { + return fmt.Errorf("process hook %q is closed", ph.name) + } + + id := ph.nextID.Add(1) + respCh := make(chan processHookRPCMessage, 1) + ph.pendingMu.Lock() + ph.pending[id] = respCh + ph.pendingMu.Unlock() + + msg := processHookRPCMessage{ + JSONRPC: processHookJSONRPCVersion, + ID: id, + Method: method, + } + if params != nil { + body, err := json.Marshal(params) + if err != nil { + ph.removePending(id) + return err + } + msg.Params = body + } + + if err := ph.send(ctx, msg); err != nil { + ph.removePending(id) + return err + } + + select { + case resp, ok := <-respCh: + if !ok { + return fmt.Errorf("process hook %q closed while waiting for %s", ph.name, method) + } + if resp.Error != nil { + return fmt.Errorf("process hook %q %s failed: %s", ph.name, 
method, resp.Error.Message) + } + if out != nil && len(resp.Result) > 0 { + if err := json.Unmarshal(resp.Result, out); err != nil { + return fmt.Errorf("decode process hook %q %s result: %w", ph.name, method, err) + } + } + return nil + case <-ctx.Done(): + ph.removePending(id) + return ctx.Err() + } +} + +func (ph *ProcessHook) send(ctx context.Context, msg processHookRPCMessage) error { + body, err := json.Marshal(msg) + if err != nil { + return err + } + body = append(body, '\n') + + ph.writeMu.Lock() + defer ph.writeMu.Unlock() + + if ph.closed.Load() { + return fmt.Errorf("process hook %q is closed", ph.name) + } + + done := make(chan error, 1) + go func() { + _, writeErr := ph.stdin.Write(body) + done <- writeErr + }() + + select { + case err := <-done: + if err != nil { + return fmt.Errorf("write process hook %q message: %w", ph.name, err) + } + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (ph *ProcessHook) readLoop(stdout io.Reader) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 64*1024), processHookReadBufferSize) + + for scanner.Scan() { + var msg processHookRPCMessage + if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil { + logger.WarnCF("hooks", "Failed to decode process hook message", map[string]any{ + "hook": ph.name, + "error": err.Error(), + }) + continue + } + if msg.ID == 0 { + continue + } + ph.pendingMu.Lock() + respCh, ok := ph.pending[msg.ID] + if ok { + delete(ph.pending, msg.ID) + } + ph.pendingMu.Unlock() + if ok { + respCh <- msg + close(respCh) + } + } +} + +func (ph *ProcessHook) readStderr(stderr io.Reader) { + scanner := bufio.NewScanner(stderr) + scanner.Buffer(make([]byte, 0, 16*1024), processHookReadBufferSize) + for scanner.Scan() { + logger.WarnCF("hooks", "Process hook stderr", map[string]any{ + "hook": ph.name, + "stderr": scanner.Text(), + }) + } +} + +func (ph *ProcessHook) waitLoop() { + err := ph.cmd.Wait() + ph.closeMu.Lock() + ph.closeErr = err + ph.closeMu.Unlock() 
+ ph.failPending(err) + close(ph.done) +} + +func (ph *ProcessHook) failPending(err error) { + ph.pendingMu.Lock() + defer ph.pendingMu.Unlock() + + msg := processHookRPCMessage{ + Error: &processHookRPCError{ + Code: -32000, + Message: "process exited", + }, + } + if err != nil { + msg.Error.Message = err.Error() + } + + for id, ch := range ph.pending { + delete(ph.pending, id) + ch <- msg + close(ch) + } +} + +func (ph *ProcessHook) removePending(id uint64) { + ph.pendingMu.Lock() + defer ph.pendingMu.Unlock() + + if ch, ok := ph.pending[id]; ok { + delete(ph.pending, id) + close(ch) + } +} + +func (al *AgentLoop) MountProcessHook(ctx context.Context, name string, opts ProcessHookOptions) error { + if al == nil { + return fmt.Errorf("agent loop is nil") + } + processHook, err := NewProcessHook(ctx, name, opts) + if err != nil { + return err + } + if err := al.MountHook(HookRegistration{ + Name: name, + Source: HookSourceProcess, + Hook: processHook, + }); err != nil { + _ = processHook.Close() + return err + } + return nil +} + +func newProcessHookObserveKinds(kinds []string) map[string]struct{} { + if len(kinds) == 0 { + return nil + } + + normalized := make(map[string]struct{}, len(kinds)) + for _, kind := range kinds { + if kind == "" { + continue + } + normalized[kind] = struct{}{} + } + if len(normalized) == 0 { + return nil + } + return normalized +} diff --git a/pkg/agent/hook_process_test.go b/pkg/agent/hook_process_test.go new file mode 100644 index 000000000..50f89811f --- /dev/null +++ b/pkg/agent/hook_process_test.go @@ -0,0 +1,339 @@ +package agent + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +func TestProcessHook_HelperProcess(t *testing.T) { + if os.Getenv("PICOCLAW_HOOK_HELPER") != "1" { + return + } + if err := runProcessHookHelper(); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + os.Exit(0) +} + 
+func TestAgentLoop_MountProcessHook_LLMAndObserver(t *testing.T) { + provider := &llmHookTestProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + eventLog := filepath.Join(t.TempDir(), "events.log") + if err := al.MountProcessHook(context.Background(), "ipc-llm", ProcessHookOptions{ + Command: processHookHelperCommand(), + Env: processHookHelperEnv("rewrite", eventLog), + Observe: true, + InterceptLLM: true, + }); err != nil { + t.Fatalf("MountProcessHook failed: %v", err) + } + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "hello", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "provider content|ipc" { + t.Fatalf("expected process-hooked llm content, got %q", resp) + } + + provider.mu.Lock() + lastModel := provider.lastModel + provider.mu.Unlock() + if lastModel != "process-model" { + t.Fatalf("expected process model, got %q", lastModel) + } + + waitForFileContains(t, eventLog, "turn_end") +} + +func TestAgentLoop_MountProcessHook_ToolRewrite(t *testing.T) { + provider := &toolHookProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + al.RegisterTool(&echoTextTool{}) + if err := al.MountProcessHook(context.Background(), "ipc-tool", ProcessHookOptions{ + Command: processHookHelperCommand(), + Env: processHookHelperEnv("rewrite", ""), + InterceptTool: true, + }); err != nil { + t.Fatalf("MountProcessHook failed: %v", err) + } + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + if resp != "ipc:ipc" { + 
t.Fatalf("expected rewritten process-hook tool result, got %q", resp) + } +} + +type blockedToolProvider struct { + calls int +} + +func (p *blockedToolProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + p.calls++ + if p.calls == 1 { + return &providers.LLMResponse{ + ToolCalls: []providers.ToolCall{ + { + ID: "call-1", + Name: "blocked_tool", + Arguments: map[string]any{}, + }, + }, + }, nil + } + + return &providers.LLMResponse{ + Content: messages[len(messages)-1].Content, + }, nil +} + +func (p *blockedToolProvider) GetDefaultModel() string { + return "blocked-tool-provider" +} + +func TestAgentLoop_MountProcessHook_ApprovalDeny(t *testing.T) { + provider := &blockedToolProvider{} + al, agent, cleanup := newHookTestLoop(t, provider) + defer cleanup() + + if err := al.MountProcessHook(context.Background(), "ipc-approval", ProcessHookOptions{ + Command: processHookHelperCommand(), + Env: processHookHelperEnv("deny", ""), + ApproveTool: true, + }); err != nil { + t.Fatalf("MountProcessHook failed: %v", err) + } + + sub := al.SubscribeEvents(16) + defer al.UnsubscribeEvents(sub.ID) + + resp, err := al.runAgentLoop(context.Background(), agent, processOptions{ + SessionKey: "session-1", + Channel: "cli", + ChatID: "direct", + UserMessage: "run blocked tool", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + t.Fatalf("runAgentLoop failed: %v", err) + } + + expected := "Tool execution denied by approval hook: blocked by ipc hook" + if resp != expected { + t.Fatalf("expected %q, got %q", expected, resp) + } + + events := collectEventStream(sub.C) + skippedEvt, ok := findEvent(events, EventKindToolExecSkipped) + if !ok { + t.Fatal("expected tool skipped event") + } + payload, ok := skippedEvt.Payload.(ToolExecSkippedPayload) + if !ok { + t.Fatalf("expected ToolExecSkippedPayload, got 
%T", skippedEvt.Payload) + } + if payload.Reason != expected { + t.Fatalf("expected reason %q, got %q", expected, payload.Reason) + } +} + +func processHookHelperCommand() []string { + return []string{os.Args[0], "-test.run=TestProcessHook_HelperProcess", "--"} +} + +func processHookHelperEnv(mode, eventLog string) []string { + env := []string{ + "PICOCLAW_HOOK_HELPER=1", + "PICOCLAW_HOOK_MODE=" + mode, + } + if eventLog != "" { + env = append(env, "PICOCLAW_HOOK_EVENT_LOG="+eventLog) + } + return env +} + +func waitForFileContains(t *testing.T, path, substring string) { + t.Helper() + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + data, err := os.ReadFile(path) + if err == nil && strings.Contains(string(data), substring) { + return + } + time.Sleep(20 * time.Millisecond) + } + + data, _ := os.ReadFile(path) + t.Fatalf("timed out waiting for %q in %s; current content: %q", substring, path, string(data)) +} + +func runProcessHookHelper() error { + mode := os.Getenv("PICOCLAW_HOOK_MODE") + eventLog := os.Getenv("PICOCLAW_HOOK_EVENT_LOG") + + scanner := bufio.NewScanner(os.Stdin) + scanner.Buffer(make([]byte, 0, 64*1024), processHookReadBufferSize) + encoder := json.NewEncoder(os.Stdout) + + for scanner.Scan() { + var msg processHookRPCMessage + if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil { + return err + } + + if msg.ID == 0 { + if msg.Method == "hook.event" && eventLog != "" { + var evt map[string]any + if err := json.Unmarshal(msg.Params, &evt); err == nil { + if rawKind, ok := evt["Kind"].(float64); ok { + kind := EventKind(rawKind) + _ = os.WriteFile(eventLog, []byte(kind.String()+"\n"), 0o644) + } + } + } + continue + } + + result, rpcErr := handleProcessHookRequest(mode, msg) + resp := processHookRPCMessage{ + JSONRPC: processHookJSONRPCVersion, + ID: msg.ID, + } + if rpcErr != nil { + resp.Error = rpcErr + } else if result != nil { + body, err := json.Marshal(result) + if err != nil { + return err + } + 
resp.Result = body + } else { + resp.Result = []byte("{}") + } + + if err := encoder.Encode(resp); err != nil { + return err + } + } + + return scanner.Err() +} + +func handleProcessHookRequest(mode string, msg processHookRPCMessage) (any, *processHookRPCError) { + switch msg.Method { + case "hook.hello": + return map[string]any{"ok": true}, nil + case "hook.before_llm": + if mode != "rewrite" { + return map[string]any{"action": HookActionContinue}, nil + } + var req map[string]any + _ = json.Unmarshal(msg.Params, &req) + req["model"] = "process-model" + return map[string]any{ + "action": HookActionModify, + "request": req, + }, nil + case "hook.after_llm": + if mode != "rewrite" { + return map[string]any{"action": HookActionContinue}, nil + } + var resp map[string]any + _ = json.Unmarshal(msg.Params, &resp) + if rawResponse, ok := resp["response"].(map[string]any); ok { + if content, ok := rawResponse["content"].(string); ok { + rawResponse["content"] = content + "|ipc" + } + } + return map[string]any{ + "action": HookActionModify, + "response": resp, + }, nil + case "hook.before_tool": + if mode != "rewrite" { + return map[string]any{"action": HookActionContinue}, nil + } + var call map[string]any + _ = json.Unmarshal(msg.Params, &call) + rawArgs, ok := call["arguments"].(map[string]any) + if !ok || rawArgs == nil { + rawArgs = map[string]any{} + } + rawArgs["text"] = "ipc" + call["arguments"] = rawArgs + return map[string]any{ + "action": HookActionModify, + "call": call, + }, nil + case "hook.after_tool": + if mode != "rewrite" { + return map[string]any{"action": HookActionContinue}, nil + } + var result map[string]any + _ = json.Unmarshal(msg.Params, &result) + if rawResult, ok := result["result"].(map[string]any); ok { + if forLLM, ok := rawResult["for_llm"].(string); ok { + rawResult["for_llm"] = "ipc:" + forLLM + } + } + return map[string]any{ + "action": HookActionModify, + "result": result, + }, nil + case "hook.approve_tool": + if mode == "deny" { + 
return ApprovalDecision{ + Approved: false, + Reason: "blocked by ipc hook", + }, nil + } + return ApprovalDecision{Approved: true}, nil + default: + return nil, &processHookRPCError{ + Code: -32601, + Message: "method not found", + } + } +} diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index 74af542fa..c1ef58ffd 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -3,6 +3,7 @@ package agent import ( "context" "fmt" + "io" "sort" "sync" "time" @@ -30,8 +31,8 @@ const ( ) type HookDecision struct { - Action HookAction - Reason string + Action HookAction `json:"action"` + Reason string `json:"reason,omitempty"` } func (d HookDecision) normalizedAction() HookAction { @@ -42,20 +43,29 @@ func (d HookDecision) normalizedAction() HookAction { } type ApprovalDecision struct { - Approved bool - Reason string + Approved bool `json:"approved"` + Reason string `json:"reason,omitempty"` } +type HookSource uint8 + +const ( + HookSourceInProcess HookSource = iota + HookSourceProcess +) + type HookRegistration struct { Name string Priority int + Source HookSource Hook any } func NamedHook(name string, hook any) HookRegistration { return HookRegistration{ - Name: name, - Hook: hook, + Name: name, + Source: HookSourceInProcess, + Hook: hook, } } @@ -78,14 +88,14 @@ type ToolApprover interface { } type LLMHookRequest struct { - Meta EventMeta - Model string - Messages []providers.Message - Tools []providers.ToolDefinition - Options map[string]any - Channel string - ChatID string - GracefulTerminal bool + Meta EventMeta `json:"meta"` + Model string `json:"model"` + Messages []providers.Message `json:"messages,omitempty"` + Tools []providers.ToolDefinition `json:"tools,omitempty"` + Options map[string]any `json:"options,omitempty"` + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` + GracefulTerminal bool `json:"graceful_terminal,omitempty"` } func (r *LLMHookRequest) Clone() *LLMHookRequest { @@ -100,11 +110,11 @@ func (r 
*LLMHookRequest) Clone() *LLMHookRequest { } type LLMHookResponse struct { - Meta EventMeta - Model string - Response *providers.LLMResponse - Channel string - ChatID string + Meta EventMeta `json:"meta"` + Model string `json:"model"` + Response *providers.LLMResponse `json:"response,omitempty"` + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` } func (r *LLMHookResponse) Clone() *LLMHookResponse { @@ -117,11 +127,11 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse { } type ToolCallHookRequest struct { - Meta EventMeta - Tool string - Arguments map[string]any - Channel string - ChatID string + Meta EventMeta `json:"meta"` + Tool string `json:"tool"` + Arguments map[string]any `json:"arguments,omitempty"` + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` } func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { @@ -134,11 +144,11 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { } type ToolApprovalRequest struct { - Meta EventMeta - Tool string - Arguments map[string]any - Channel string - ChatID string + Meta EventMeta `json:"meta"` + Tool string `json:"tool"` + Arguments map[string]any `json:"arguments,omitempty"` + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` } func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { @@ -151,13 +161,13 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { } type ToolResultHookResponse struct { - Meta EventMeta - Tool string - Arguments map[string]any - Result *tools.ToolResult - Duration time.Duration - Channel string - ChatID string + Meta EventMeta `json:"meta"` + Tool string `json:"tool"` + Arguments map[string]any `json:"arguments,omitempty"` + Result *tools.ToolResult `json:"result,omitempty"` + Duration time.Duration `json:"duration"` + Channel string `json:"channel,omitempty"` + ChatID string `json:"chat_id,omitempty"` } func (r *ToolResultHookResponse) Clone() 
*ToolResultHookResponse { @@ -215,9 +225,25 @@ func (hm *HookManager) Close() { hm.eventBus.Unsubscribe(hm.sub.ID) } <-hm.done + hm.closeAllHooks() }) } +func (hm *HookManager) ConfigureTimeouts(observer, interceptor, approval time.Duration) { + if hm == nil { + return + } + if observer > 0 { + hm.observerTimeout = observer + } + if interceptor > 0 { + hm.interceptorTimeout = interceptor + } + if approval > 0 { + hm.approvalTimeout = approval + } +} + func (hm *HookManager) Mount(reg HookRegistration) error { if hm == nil { return fmt.Errorf("hook manager is nil") @@ -232,6 +258,9 @@ func (hm *HookManager) Mount(reg HookRegistration) error { hm.mu.Lock() defer hm.mu.Unlock() + if existing, ok := hm.hooks[reg.Name]; ok { + closeHookIfPossible(existing.Hook) + } hm.hooks[reg.Name] = reg hm.rebuildOrdered() return nil @@ -245,6 +274,9 @@ func (hm *HookManager) Unmount(name string) { hm.mu.Lock() defer hm.mu.Unlock() + if existing, ok := hm.hooks[name]; ok { + closeHookIfPossible(existing.Hook) + } delete(hm.hooks, name) hm.rebuildOrdered() } @@ -425,6 +457,9 @@ func (hm *HookManager) rebuildOrdered() { hm.ordered = append(hm.ordered, reg) } sort.SliceStable(hm.ordered, func(i, j int) bool { + if hm.ordered[i].Source != hm.ordered[j].Source { + return hm.ordered[i].Source < hm.ordered[j].Source + } if hm.ordered[i].Priority == hm.ordered[j].Priority { return hm.ordered[i].Name < hm.ordered[j].Name } @@ -441,6 +476,17 @@ func (hm *HookManager) snapshotHooks() []HookRegistration { return snapshot } +func (hm *HookManager) closeAllHooks() { + hm.mu.Lock() + defer hm.mu.Unlock() + + for name, reg := range hm.hooks { + closeHookIfPossible(reg.Hook) + delete(hm.hooks, name) + } + hm.ordered = nil +} + func (hm *HookManager) runObserver(name string, observer EventObserver, evt Event) { ctx, cancel := context.WithTimeout(context.Background(), hm.observerTimeout) defer cancel() @@ -749,3 +795,15 @@ func cloneToolResult(result *tools.ToolResult) *tools.ToolResult { } return 
&cloned } + +func closeHookIfPossible(hook any) { + closer, ok := hook.(io.Closer) + if !ok { + return + } + if err := closer.Close(); err != nil { + logger.WarnCF("hooks", "Failed to close hook", map[string]any{ + "error": err.Error(), + }) + } +} diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index 6607b5fe7..e6471e9cc 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -47,6 +47,39 @@ func newHookTestLoop( } } +func TestHookManager_SortsInProcessBeforeProcess(t *testing.T) { + hm := NewHookManager(nil) + defer hm.Close() + + if err := hm.Mount(HookRegistration{ + Name: "process", + Priority: -10, + Source: HookSourceProcess, + Hook: struct{}{}, + }); err != nil { + t.Fatalf("mount process hook: %v", err) + } + if err := hm.Mount(HookRegistration{ + Name: "in-process", + Priority: 100, + Source: HookSourceInProcess, + Hook: struct{}{}, + }); err != nil { + t.Fatalf("mount in-process hook: %v", err) + } + + ordered := hm.snapshotHooks() + if len(ordered) != 2 { + t.Fatalf("expected 2 hooks, got %d", len(ordered)) + } + if ordered[0].Name != "in-process" { + t.Fatalf("expected in-process hook first, got %q", ordered[0].Name) + } + if ordered[1].Name != "process" { + t.Fatalf("expected process hook second, got %q", ordered[1].Name) + } +} + type llmHookTestProvider struct { mu sync.Mutex lastModel string diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index a85abcb60..41dfdff5f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -49,6 +49,7 @@ type AgentLoop struct { transcriber voice.Transcriber cmdRegistry *commands.Registry mcp mcpRuntime + hookRuntime hookRuntime steering *steeringQueue mu sync.RWMutex activeTurnMu sync.RWMutex @@ -122,6 +123,7 @@ func NewAgentLoop( steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), } al.hooks = NewHookManager(eventBus) + configureHookManagerFromConfig(al.hooks, cfg) return al } @@ -259,6 +261,9 @@ func registerSharedTools( func (al *AgentLoop) Run(ctx 
context.Context) error { al.running.Store(true) + if err := al.ensureHooksInitialized(ctx); err != nil { + return err + } if err := al.ensureMCPInitialized(ctx); err != nil { return err } @@ -773,6 +778,9 @@ func (al *AgentLoop) ReloadProviderAndConfig( al.mu.Unlock() + al.hookRuntime.reset(al) + configureHookManagerFromConfig(al.hooks, cfg) + // Close old provider after releasing the lock // This prevents blocking readers while closing if oldProvider, ok := extractProvider(oldRegistry); ok { @@ -987,6 +995,9 @@ func (al *AgentLoop) ProcessDirectWithChannel( ctx context.Context, content, sessionKey, channel, chatID string, ) (string, error) { + if err := al.ensureHooksInitialized(ctx); err != nil { + return "", err + } if err := al.ensureMCPInitialized(ctx); err != nil { return "", err } @@ -1008,6 +1019,13 @@ func (al *AgentLoop) ProcessHeartbeat( ctx context.Context, content, channel, chatID string, ) (string, error) { + if err := al.ensureHooksInitialized(ctx); err != nil { + return "", err + } + if err := al.ensureMCPInitialized(ctx); err != nil { + return "", err + } + agent := al.GetRegistry().GetDefaultAgent() if agent == nil { return "", fmt.Errorf("no default agent for heartbeat") diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 77c2e0c17..55ee45ad1 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -183,6 +183,12 @@ func (al *AgentLoop) Continue(ctx context.Context, sessionKey, channel, chatID s if active := al.GetActiveTurn(); active != nil { return "", fmt.Errorf("turn %s is still active", active.TurnID) } + if err := al.ensureHooksInitialized(ctx); err != nil { + return "", err + } + if err := al.ensureMCPInitialized(ctx); err != nil { + return "", err + } steeringMsgs := al.dequeueSteeringMessages() if len(steeringMsgs) == 0 { diff --git a/pkg/config/config.go b/pkg/config/config.go index a3720b656..a7c44c825 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -82,6 +82,7 @@ type Config struct { Providers 
ProvidersConfig `json:"providers,omitempty"` ModelList []ModelConfig `json:"model_list"` // New model-centric provider configuration Gateway GatewayConfig `json:"gateway"` + Hooks HooksConfig `json:"hooks,omitempty"` Tools ToolsConfig `json:"tools"` Heartbeat HeartbeatConfig `json:"heartbeat"` Devices DevicesConfig `json:"devices"` @@ -90,6 +91,36 @@ type Config struct { BuildInfo BuildInfo `json:"build_info,omitempty"` } +type HooksConfig struct { + Enabled bool `json:"enabled"` + Defaults HookDefaultsConfig `json:"defaults,omitempty"` + Builtins map[string]BuiltinHookConfig `json:"builtins,omitempty"` + Processes map[string]ProcessHookConfig `json:"processes,omitempty"` +} + +type HookDefaultsConfig struct { + ObserverTimeoutMS int `json:"observer_timeout_ms,omitempty"` + InterceptorTimeoutMS int `json:"interceptor_timeout_ms,omitempty"` + ApprovalTimeoutMS int `json:"approval_timeout_ms,omitempty"` +} + +type BuiltinHookConfig struct { + Enabled bool `json:"enabled"` + Priority int `json:"priority,omitempty"` + Config json.RawMessage `json:"config,omitempty"` +} + +type ProcessHookConfig struct { + Enabled bool `json:"enabled"` + Priority int `json:"priority,omitempty"` + Transport string `json:"transport,omitempty"` + Command []string `json:"command,omitempty"` + Dir string `json:"dir,omitempty"` + Env map[string]string `json:"env,omitempty"` + Observe []string `json:"observe,omitempty"` + Intercept []string `json:"intercept,omitempty"` +} + // BuildInfo contains build-time version information type BuildInfo struct { Version string `json:"version"` diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index c5bdbf3c3..caab8a152 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -391,6 +391,22 @@ func TestDefaultConfig_ExecAllowRemoteEnabled(t *testing.T) { } } +func TestDefaultConfig_HooksDefaults(t *testing.T) { + cfg := DefaultConfig() + if !cfg.Hooks.Enabled { + t.Fatal("DefaultConfig().Hooks.Enabled should be true") + } + 
if cfg.Hooks.Defaults.ObserverTimeoutMS != 500 { + t.Fatalf("ObserverTimeoutMS = %d, want 500", cfg.Hooks.Defaults.ObserverTimeoutMS) + } + if cfg.Hooks.Defaults.InterceptorTimeoutMS != 5000 { + t.Fatalf("InterceptorTimeoutMS = %d, want 5000", cfg.Hooks.Defaults.InterceptorTimeoutMS) + } + if cfg.Hooks.Defaults.ApprovalTimeoutMS != 60000 { + t.Fatalf("ApprovalTimeoutMS = %d, want 60000", cfg.Hooks.Defaults.ApprovalTimeoutMS) + } +} + func TestLoadConfig_OpenAIWebSearchDefaultsTrueWhenUnset(t *testing.T) { dir := t.TempDir() configPath := filepath.Join(dir, "config.json") @@ -460,6 +476,88 @@ func TestLoadConfig_WebToolsProxy(t *testing.T) { } } +func TestLoadConfig_HooksProcessConfig(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.json") + configJSON := `{ + "hooks": { + "processes": { + "review-gate": { + "enabled": true, + "transport": "stdio", + "command": ["uvx", "picoclaw-hook-reviewer"], + "dir": "/tmp/hooks", + "env": { + "HOOK_MODE": "rewrite" + }, + "observe": ["turn_start", "turn_end"], + "intercept": ["before_tool", "approve_tool"] + } + }, + "builtins": { + "audit": { + "enabled": true, + "priority": 5, + "config": { + "label": "audit" + } + } + } + } +}` + if err := os.WriteFile(configPath, []byte(configJSON), 0o600); err != nil { + t.Fatalf("os.WriteFile() error: %v", err) + } + + cfg, err := LoadConfig(configPath) + if err != nil { + t.Fatalf("LoadConfig() error: %v", err) + } + + processCfg, ok := cfg.Hooks.Processes["review-gate"] + if !ok { + t.Fatal("expected review-gate process hook") + } + if !processCfg.Enabled { + t.Fatal("expected review-gate process hook to be enabled") + } + if processCfg.Transport != "stdio" { + t.Fatalf("Transport = %q, want stdio", processCfg.Transport) + } + if len(processCfg.Command) != 2 || processCfg.Command[0] != "uvx" { + t.Fatalf("Command = %v", processCfg.Command) + } + if processCfg.Dir != "/tmp/hooks" { + t.Fatalf("Dir = %q, want /tmp/hooks", processCfg.Dir) + } + if 
processCfg.Env["HOOK_MODE"] != "rewrite" { + t.Fatalf("HOOK_MODE = %q, want rewrite", processCfg.Env["HOOK_MODE"]) + } + if len(processCfg.Observe) != 2 || processCfg.Observe[1] != "turn_end" { + t.Fatalf("Observe = %v", processCfg.Observe) + } + if len(processCfg.Intercept) != 2 || processCfg.Intercept[1] != "approve_tool" { + t.Fatalf("Intercept = %v", processCfg.Intercept) + } + + builtinCfg, ok := cfg.Hooks.Builtins["audit"] + if !ok { + t.Fatal("expected audit builtin hook") + } + if !builtinCfg.Enabled { + t.Fatal("expected audit builtin hook to be enabled") + } + if builtinCfg.Priority != 5 { + t.Fatalf("Priority = %d, want 5", builtinCfg.Priority) + } + if !strings.Contains(string(builtinCfg.Config), `"audit"`) { + t.Fatalf("Config = %s", string(builtinCfg.Config)) + } + if cfg.Hooks.Defaults.ApprovalTimeoutMS != 60000 { + t.Fatalf("ApprovalTimeoutMS = %d, want 60000", cfg.Hooks.Defaults.ApprovalTimeoutMS) + } +} + // TestDefaultConfig_DMScope verifies the default dm_scope value // TestDefaultConfig_SummarizationThresholds verifies summarization defaults func TestDefaultConfig_SummarizationThresholds(t *testing.T) { diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 5e6b89a4c..bfb54fb97 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -177,6 +177,14 @@ func DefaultConfig() *Config { AllowFrom: FlexibleStringSlice{}, }, }, + Hooks: HooksConfig{ + Enabled: true, + Defaults: HookDefaultsConfig{ + ObserverTimeoutMS: 500, + InterceptorTimeoutMS: 5000, + ApprovalTimeoutMS: 60000, + }, + }, Providers: ProvidersConfig{ OpenAI: OpenAIProviderConfig{WebSearch: true}, }, From 9978c9550bc03f70e17dbbac5256263cc7fd1fed Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 21 Mar 2026 23:18:29 +0800 Subject: [PATCH 3/3] docs(hooks): inline and translate hook examples --- config/config.example.json | 8 + docs/hooks/README.md | 679 +++++++++++++++++++++++++++++++++++++ docs/hooks/README.zh.md | 679 +++++++++++++++++++++++++++++++++++++ 3 files 
changed, 1366 insertions(+) create mode 100644 docs/hooks/README.md create mode 100644 docs/hooks/README.zh.md diff --git a/config/config.example.json b/config/config.example.json index 20c10e60d..3c149c744 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -511,6 +511,14 @@ "voice": { "echo_transcription": false }, + "hooks": { + "enabled": true, + "defaults": { + "observer_timeout_ms": 500, + "interceptor_timeout_ms": 5000, + "approval_timeout_ms": 60000 + } + }, "gateway": { "host": "127.0.0.1", "port": 18790 diff --git a/docs/hooks/README.md b/docs/hooks/README.md new file mode 100644 index 000000000..ec3bbc46a --- /dev/null +++ b/docs/hooks/README.md @@ -0,0 +1,679 @@ +# Hook System Guide + +This document describes the hook system that is implemented in the current repository, not the older design draft. + +The current implementation supports two mounting modes: + +1. In-process hooks +2. Out-of-process process hooks (`JSON-RPC over stdio`) + +The repository no longer ships standalone example source files. The Go and Python examples below are embedded directly in this document. If you want to use them, copy them into your own local files first. + +## Supported Hook Types + +| Type | Interface | Stage | Can modify data | +| --- | --- | --- | --- | +| Observer | `EventObserver` | EventBus broadcast | No | +| LLM interceptor | `LLMInterceptor` | `before_llm` / `after_llm` | Yes | +| Tool interceptor | `ToolInterceptor` | `before_tool` / `after_tool` | Yes | +| Tool approver | `ToolApprover` | `approve_tool` | No, returns allow/deny | + +The currently exposed synchronous hook points are: + +- `before_llm` +- `after_llm` +- `before_tool` +- `after_tool` +- `approve_tool` + +Everything else is exposed as read-only events. + +## Execution Order + +`HookManager` sorts hooks like this: + +1. In-process hooks first +2. Process hooks second +3. Lower `priority` first within the same source +4. 
Name order as the final tie-breaker + +## Timeouts + +Global defaults live under `hooks.defaults`: + +- `observer_timeout_ms` +- `interceptor_timeout_ms` +- `approval_timeout_ms` + +Note: the current implementation does not support per-process-hook `timeout_ms`. Timeouts are global defaults. + +## Quick Start + +If your first goal is simply to prove that the hook flow works and observe real requests, the easiest path is the Python process-hook example below: + +1. Enable `hooks.enabled` +2. Save the Python example from this document to a local file, for example `/tmp/review_gate.py` +3. Set `PICOCLAW_HOOK_LOG_FILE` +4. Restart the gateway +5. Watch the log file with `tail -f` + +Example: + +```json +{ + "hooks": { + "enabled": true, + "processes": { + "py_review_gate": { + "enabled": true, + "priority": 100, + "transport": "stdio", + "command": [ + "python3", + "/tmp/review_gate.py" + ], + "observe": [ + "tool_exec_start", + "tool_exec_end", + "tool_exec_skipped" + ], + "intercept": [ + "before_tool", + "approve_tool" + ], + "env": { + "PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log" + } + } + } + } +} +``` + +Watch it with: + +```bash +tail -f /tmp/picoclaw-hook-review-gate.log +``` + +If you are developing PicoClaw itself rather than only validating the protocol, continue with the Go in-process example as well. + +## What The Two Examples Are For + +- Go in-process example + Best for validating the host-side hook chain and understanding `MountHook()` plus the synchronous stages +- Python process example + Best for understanding the `JSON-RPC over stdio` protocol and verifying the message flow between PicoClaw and an external process + +Both examples are intentionally safe: they only log, never rewrite, and never deny. + +## Go In-Process Example + +The following is a minimal logging hook for in-process use. It implements: + +1. `EventObserver` +2. `LLMInterceptor` +3. `ToolInterceptor` +4. `ToolApprover` + +It only records activity. 
It does not rewrite requests or reject tools. + +You can save it as your own Go file, for example `pkg/myhooks/example_logger.go`: + +```go +package myhooks + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/logger" +) + +type ExampleLoggerHookOptions struct { + LogFile string `json:"log_file,omitempty"` + LogEvents bool `json:"log_events,omitempty"` +} + +type ExampleLoggerHook struct { + logFile string + logEvents bool + mu sync.Mutex +} + +func NewExampleLoggerHook(opts ExampleLoggerHookOptions) *ExampleLoggerHook { + return &ExampleLoggerHook{ + logFile: strings.TrimSpace(opts.LogFile), + logEvents: opts.LogEvents, + } +} + +func (h *ExampleLoggerHook) OnEvent(ctx context.Context, evt agent.Event) error { + _ = ctx + if h == nil || !h.logEvents { + return nil + } + h.record("event", evt.Meta, map[string]any{ + "event": evt.Kind.String(), + "payload": evt.Payload, + }, nil) + return nil +} + +func (h *ExampleLoggerHook) BeforeLLM( + ctx context.Context, + req *agent.LLMHookRequest, +) (*agent.LLMHookRequest, agent.HookDecision, error) { + _ = ctx + h.record("before_llm", req.Meta, req, agent.HookDecision{Action: agent.HookActionContinue}) + return req, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) AfterLLM( + ctx context.Context, + resp *agent.LLMHookResponse, +) (*agent.LLMHookResponse, agent.HookDecision, error) { + _ = ctx + h.record("after_llm", resp.Meta, resp, agent.HookDecision{Action: agent.HookActionContinue}) + return resp, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) BeforeTool( + ctx context.Context, + call *agent.ToolCallHookRequest, +) (*agent.ToolCallHookRequest, agent.HookDecision, error) { + _ = ctx + h.record("before_tool", call.Meta, call, agent.HookDecision{Action: agent.HookActionContinue}) + return call, 
agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) AfterTool( + ctx context.Context, + result *agent.ToolResultHookResponse, +) (*agent.ToolResultHookResponse, agent.HookDecision, error) { + _ = ctx + h.record("after_tool", result.Meta, result, agent.HookDecision{Action: agent.HookActionContinue}) + return result, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) ApproveTool( + ctx context.Context, + req *agent.ToolApprovalRequest, +) (agent.ApprovalDecision, error) { + _ = ctx + decision := agent.ApprovalDecision{Approved: true} + h.record("approve_tool", req.Meta, req, decision) + return decision, nil +} + +func (h *ExampleLoggerHook) record(stage string, meta agent.EventMeta, payload any, decision any) { + logger.InfoCF("hooks", "Example hook observed", map[string]any{ + "stage": stage, + }) + if h == nil || h.logFile == "" { + return + } + + entry := map[string]any{ + "ts": time.Now().UTC(), + "stage": stage, + "meta": meta, + "payload": payload, + "decision": decision, + } + + body, err := json.Marshal(entry) + if err != nil { + logger.WarnCF("hooks", "Example hook log encode failed", map[string]any{ + "stage": stage, + "error": err.Error(), + }) + return + } + + h.mu.Lock() + defer h.mu.Unlock() + + if dir := filepath.Dir(h.logFile); dir != "" && dir != "." 
{ + if err := os.MkdirAll(dir, 0o755); err != nil { + logger.WarnCF("hooks", "Example hook log mkdir failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + return + } + } + + file, err := os.OpenFile(h.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + logger.WarnCF("hooks", "Example hook log open failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + return + } + defer func() { _ = file.Close() }() + + if _, err := file.Write(append(body, '\n')); err != nil { + logger.WarnCF("hooks", "Example hook log write failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + } +} +``` + +### Mounting It In Code + +If code mounting is enough, call this after `AgentLoop` is initialized: + +```go +hook := myhooks.NewExampleLoggerHook(myhooks.ExampleLoggerHookOptions{ + LogFile: "/tmp/picoclaw-hook-example-logger.log", + LogEvents: true, +}) + +if err := al.MountHook(agent.NamedHook("example-logger", hook)); err != nil { + panic(err) +} +``` + +### If You Also Want Config Mounting + +The hook system supports builtin hooks, but that requires you to compile the factory into your binary. 
In practice, that means you need registration code like this alongside the hook definition above: + +```go +package myhooks + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + if err := agent.RegisterBuiltinHook("example_logger", func( + ctx context.Context, + spec config.BuiltinHookConfig, + ) (any, error) { + _ = ctx + + var opts ExampleLoggerHookOptions + if len(spec.Config) > 0 { + if err := json.Unmarshal(spec.Config, &opts); err != nil { + return nil, fmt.Errorf("decode example_logger config: %w", err) + } + } + return NewExampleLoggerHook(opts), nil + }); err != nil { + panic(err) + } +} +``` + +Only after you register that builtin will the following config work: + +```json +{ + "hooks": { + "enabled": true, + "builtins": { + "example_logger": { + "enabled": true, + "priority": 10, + "config": { + "log_file": "/tmp/picoclaw-hook-example-logger.log", + "log_events": true + } + } + } + } +} +``` + +### How To Observe It + +- If `log_file` is set, each hook call is appended as JSON Lines +- If `log_file` is not set, the hook still writes summaries to the gateway log +- Requests that only hit the LLM path usually show `before_llm` and `after_llm` +- Requests that trigger tools usually also show `before_tool`, `approve_tool`, and `after_tool` +- If `log_events=true`, you will also see `event` + +Typical log lines: + +```json +{"ts":"2026-03-21T14:10:00Z","stage":"before_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"action":"continue"}} +{"ts":"2026-03-21T14:10:00Z","stage":"approve_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"approved":true}} +``` + +If you only see `before_llm` and `after_llm`, that usually means the request did not trigger any tool call, not that the hook failed to mount. 
+ +## Python Process-Hook Example + +The following script is a minimal process-hook example. It uses only the Python standard library and supports: + +1. `hook.hello` +2. `hook.event` +3. `hook.before_tool` +4. `hook.approve_tool` + +It only records activity. It does not rewrite or deny anything. + +Save it to any local path, for example `/tmp/review_gate.py`: + +```python +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import signal +import sys +from datetime import datetime, timezone +from typing import Any + +LOG_EVENTS = os.getenv("PICOCLAW_HOOK_LOG_EVENTS", "1").lower() not in {"0", "false", "no"} +LOG_FILE = os.getenv("PICOCLAW_HOOK_LOG_FILE", "").strip() + + +def append_log(entry: dict[str, Any]) -> None: + if not LOG_FILE: + return + + payload = { + "ts": datetime.now(timezone.utc).isoformat(), + **entry, + } + try: + log_dir = os.path.dirname(LOG_FILE) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + with open(LOG_FILE, "a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, ensure_ascii=True) + "\n") + except OSError as exc: + log_stderr(f"failed to write hook log file {LOG_FILE}: {exc}") + + +def send_response(message_id: int, result: Any | None = None, error: str | None = None) -> None: + payload: dict[str, Any] = { + "jsonrpc": "2.0", + "id": message_id, + } + if error is not None: + payload["error"] = {"code": -32000, "message": error} + else: + payload["result"] = result if result is not None else {} + + append_log({ + "direction": "out", + "id": message_id, + "response": payload.get("result"), + "error": payload.get("error"), + }) + + try: + sys.stdout.write(json.dumps(payload, ensure_ascii=True) + "\n") + sys.stdout.flush() + except BrokenPipeError: + raise SystemExit(0) from None + + +def log_stderr(message: str) -> None: + try: + sys.stderr.write(message + "\n") + sys.stderr.flush() + except BrokenPipeError: + raise SystemExit(0) from None + + +def handle_shutdown_signal(signum: int, 
_frame: Any) -> None: + raise KeyboardInterrupt(f"received signal {signum}") + + +def handle_before_tool(params: dict[str, Any]) -> dict[str, Any]: + _ = params + return {"action": "continue"} + + +def handle_approve_tool(params: dict[str, Any]) -> dict[str, Any]: + _ = params + return {"approved": True} + + +def handle_request(method: str, params: dict[str, Any]) -> dict[str, Any]: + if method == "hook.hello": + return {"ok": True, "name": "python-review-gate"} + if method == "hook.before_tool": + return handle_before_tool(params) + if method == "hook.approve_tool": + return handle_approve_tool(params) + if method == "hook.before_llm": + return {"action": "continue"} + if method == "hook.after_llm": + return {"action": "continue"} + if method == "hook.after_tool": + return {"action": "continue"} + raise KeyError(f"method not found: {method}") + + +def main() -> int: + try: + for raw_line in sys.stdin: + line = raw_line.strip() + if not line: + continue + + try: + message = json.loads(line) + except json.JSONDecodeError as exc: + log_stderr(f"failed to decode request: {exc}") + append_log({ + "direction": "in", + "decode_error": str(exc), + "raw": line, + }) + continue + + method = message.get("method") + message_id = message.get("id", 0) + params = message.get("params") or {} + if not isinstance(params, dict): + params = {} + + append_log({ + "direction": "in", + "id": message_id, + "method": method, + "params": params, + "notification": not bool(message_id), + }) + + if not message_id: + if method == "hook.event" and LOG_EVENTS: + log_stderr(f"observed event: {params.get('Kind')}") + continue + + try: + result = handle_request(str(method or ""), params) + except KeyError as exc: + send_response(int(message_id), error=str(exc)) + continue + except Exception as exc: + send_response(int(message_id), error=f"unexpected error: {exc}") + continue + + send_response(int(message_id), result=result) + except KeyboardInterrupt: + return 0 + + return 0 + + +if __name__ == 
"__main__": + signal.signal(signal.SIGINT, handle_shutdown_signal) + signal.signal(signal.SIGTERM, handle_shutdown_signal) + raise SystemExit(main()) +``` + +### Configuration + +```json +{ + "hooks": { + "enabled": true, + "processes": { + "py_review_gate": { + "enabled": true, + "priority": 100, + "transport": "stdio", + "command": [ + "python3", + "/abs/path/to/review_gate.py" + ], + "observe": [ + "tool_exec_start", + "tool_exec_end", + "tool_exec_skipped" + ], + "intercept": [ + "before_tool", + "approve_tool" + ], + "env": { + "PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log" + } + } + } + } +} +``` + +### Environment Variables + +- `PICOCLAW_HOOK_LOG_EVENTS` + Whether to write `hook.event` summaries to `stderr`, enabled by default +- `PICOCLAW_HOOK_LOG_FILE` + Path to an external log file. When set, the script appends inbound hook requests, notifications, and outbound responses as JSON Lines + +Note: `PICOCLAW_HOOK_LOG_FILE` has no default. If you do not set it, the script does not write any file logs. 
+ +### How To Confirm It Received Hooks + +Watch two places: + +- Gateway logs + Useful for confirming that the host successfully started the process and for seeing event summaries written to `stderr` +- `PICOCLAW_HOOK_LOG_FILE` + Useful for seeing the exact requests the script received and the exact responses it returned + +Typical interpretation: + +- Only `hook.hello` + The process started and completed the handshake, but no business hook request has arrived yet +- `hook.event` + The `observe` configuration is working +- `hook.before_tool` + The `intercept: ["before_tool", ...]` configuration is working +- `hook.approve_tool` + The approval hook path is working + +Because this example never rewrites or denies, the expected responses look like: + +```json +{"direction":"out","id":7,"response":{"action":"continue"},"error":null} +{"direction":"out","id":8,"response":{"approved":true},"error":null} +``` + +A complete sample: + +```json +{"ts":"2026-03-21T14:12:00+00:00","direction":"in","id":1,"method":"hook.hello","params":{"name":"py_review_gate","version":1,"modes":["observe","tool","approve"]},"notification":false} +{"ts":"2026-03-21T14:12:00+00:00","direction":"out","id":1,"response":{"ok":true,"name":"python-review-gate"},"error":null} +{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":0,"method":"hook.event","params":{"Kind":"tool_exec_start"},"notification":true} +{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":7,"method":"hook.before_tool","params":{"tool":"echo_text","arguments":{"text":"hello"}},"notification":false} +{"ts":"2026-03-21T14:12:05+00:00","direction":"out","id":7,"response":{"action":"continue"},"error":null} +``` + +Additional notes: + +- Timestamps are UTC +- `notification=true` means it was a notification such as `hook.event`, which does not expect a response +- `id` increases within a single hook process; if the process restarts, the counter starts over + +## Process-Hook Protocol + +Current process hooks use `JSON-RPC over 
stdio`: + +- PicoClaw starts the external process +- Requests and responses are exchanged as one JSON message per line +- `hook.event` is a notification and does not need a response +- `hook.before_llm`, `hook.after_llm`, `hook.before_tool`, `hook.after_tool`, and `hook.approve_tool` are request/response calls + +The host does not currently accept new RPCs initiated by the process hook. In practice, that means an external hook can only respond to PicoClaw calls; it cannot call back into the host to send channel messages. + +## Configuration Fields + +### `hooks.builtins.<name>` + +- `enabled` +- `priority` +- `config` + +### `hooks.processes.<name>` + +- `enabled` +- `priority` +- `transport` + Currently only `stdio` is supported +- `command` +- `dir` +- `env` +- `observe` +- `intercept` + +## Troubleshooting + +If a hook looks like it is not firing, check these in order: + +1. Whether `hooks.enabled` is `true` +2. Whether the target builtin or process hook is `enabled` +3. Whether the process-hook `command` path is correct +4. Whether you are watching the correct log file +5. Whether the current request actually reached the stage you care about +6. Whether `observe` or `intercept` contains the hook point you want + +A practical minimal troubleshooting pair is: + +- Use the Python process-hook example from this document to validate the external protocol +- Use the Go in-process example from this document to validate the host-side chain + +If the Python side shows `hook.hello` but no business hook requests, the protocol is usually fine; the current request simply did not trigger the stage you expected. 
+ +## Scope And Limits + +The current hook system is best suited for: + +- LLM request rewriting +- Tool argument normalization +- Pre-execution tool approval +- Auditing and observability + +It is not yet well suited for: + +- External hooks actively sending channel messages +- Suspending a turn and waiting for human approval replies +- Full inbound/outbound message interception across the whole platform + +If you want a real human approval workflow, use hooks as the approval entry point and keep the state machine plus channel interaction in a separate `ApprovalManager`. diff --git a/docs/hooks/README.zh.md b/docs/hooks/README.zh.md new file mode 100644 index 000000000..46c7c9392 --- /dev/null +++ b/docs/hooks/README.zh.md @@ -0,0 +1,679 @@ +# Hook 系统使用说明 + +这份文档对应当前仓库里已经实现的 hook 系统,而不是设计草案。 + +当前实现支持两类挂载方式: + +1. 进程内 hook +2. 进程外 process hook(`JSON-RPC over stdio`) + +当前仓库不再内置示例代码文件。下面的 Go / Python 示例都直接写在本文档里;如果你要使用它们,需要先复制到你自己的文件路径。 + +## 支持的 hook 类型 + +| 类型 | 接口 | 作用阶段 | 能否改写 | +| --- | --- | --- | --- | +| 观察型 | `EventObserver` | EventBus 广播事件时 | 否 | +| LLM 拦截型 | `LLMInterceptor` | `before_llm` / `after_llm` | 是 | +| Tool 拦截型 | `ToolInterceptor` | `before_tool` / `after_tool` | 是 | +| Tool 审批型 | `ToolApprover` | `approve_tool` | 否,返回批准/拒绝 | + +当前公开的同步点位只有: + +- `before_llm` +- `after_llm` +- `before_tool` +- `after_tool` +- `approve_tool` + +其余 lifecycle 通过事件形式只读暴露。 + +## 执行顺序 + +HookManager 的排序规则是: + +1. 先执行进程内 hook +2. 再执行 process hook +3. 同一来源内按 `priority` 从小到大 +4. 若 `priority` 相同,再按名字排序 + +## 超时 + +当前配置在 `hooks.defaults` 中统一设置: + +- `observer_timeout_ms` +- `interceptor_timeout_ms` +- `approval_timeout_ms` + +注意:当前实现还没有单个 process hook 自己的 `timeout_ms` 字段,超时配置是全局默认值。 + +## 快速开始 + +如果你的目标只是先把当前 hook 流程跑通并观察到实际请求,最省事的是先用下面的 Python process hook 示例: + +1. 打开 `hooks.enabled` +2. 把下面文档里的 Python 示例保存到本地文件,例如 `/tmp/review_gate.py` +3. 给它配置 `PICOCLAW_HOOK_LOG_FILE` +4. 重启 gateway +5. 
用 `tail -f` 观察日志文件 + +例如: + +```json +{ + "hooks": { + "enabled": true, + "processes": { + "py_review_gate": { + "enabled": true, + "priority": 100, + "transport": "stdio", + "command": [ + "python3", + "/tmp/review_gate.py" + ], + "observe": [ + "tool_exec_start", + "tool_exec_end", + "tool_exec_skipped" + ], + "intercept": [ + "before_tool", + "approve_tool" + ], + "env": { + "PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log" + } + } + } + } +} +``` + +观察方式: + +```bash +tail -f /tmp/picoclaw-hook-review-gate.log +``` + +如果你是在开发 PicoClaw 本体,而不是只想验证协议,那么再看后面的 Go in-process 示例。 + +## 两个示例的定位 + +- Go in-process 示例 + 适合验证宿主内的 hook 链路、理解 `MountHook()` 和各个同步点位 +- Python process 示例 + 适合理解 `JSON-RPC over stdio` 协议、确认宿主和外部进程之间的消息来回是否正常 + +这两个示例都刻意保持为“只记录、不改写、不拒绝”的安全模式。它们的目的不是提供策略能力,而是帮你观察当前 hook 系统。 + +## Go 进程内示例 + +下面这段代码是一个最小的“记录型” in-process hook。它实现了: + +1. `EventObserver` +2. `LLMInterceptor` +3. `ToolInterceptor` +4. `ToolApprover` + +它只记录,不改写请求,也不拒绝工具。 + +你可以把它保存成你自己的 Go 文件,例如 `pkg/myhooks/example_logger.go`: + +```go +package myhooks + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/logger" +) + +type ExampleLoggerHookOptions struct { + LogFile string `json:"log_file,omitempty"` + LogEvents bool `json:"log_events,omitempty"` +} + +type ExampleLoggerHook struct { + logFile string + logEvents bool + mu sync.Mutex +} + +func NewExampleLoggerHook(opts ExampleLoggerHookOptions) *ExampleLoggerHook { + return &ExampleLoggerHook{ + logFile: strings.TrimSpace(opts.LogFile), + logEvents: opts.LogEvents, + } +} + +func (h *ExampleLoggerHook) OnEvent(ctx context.Context, evt agent.Event) error { + _ = ctx + if h == nil || !h.logEvents { + return nil + } + h.record("event", evt.Meta, map[string]any{ + "event": evt.Kind.String(), + "payload": evt.Payload, + }, nil) + return nil +} + +func (h *ExampleLoggerHook) BeforeLLM( + ctx context.Context, 
+ req *agent.LLMHookRequest, +) (*agent.LLMHookRequest, agent.HookDecision, error) { + _ = ctx + h.record("before_llm", req.Meta, req, agent.HookDecision{Action: agent.HookActionContinue}) + return req, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) AfterLLM( + ctx context.Context, + resp *agent.LLMHookResponse, +) (*agent.LLMHookResponse, agent.HookDecision, error) { + _ = ctx + h.record("after_llm", resp.Meta, resp, agent.HookDecision{Action: agent.HookActionContinue}) + return resp, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) BeforeTool( + ctx context.Context, + call *agent.ToolCallHookRequest, +) (*agent.ToolCallHookRequest, agent.HookDecision, error) { + _ = ctx + h.record("before_tool", call.Meta, call, agent.HookDecision{Action: agent.HookActionContinue}) + return call, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) AfterTool( + ctx context.Context, + result *agent.ToolResultHookResponse, +) (*agent.ToolResultHookResponse, agent.HookDecision, error) { + _ = ctx + h.record("after_tool", result.Meta, result, agent.HookDecision{Action: agent.HookActionContinue}) + return result, agent.HookDecision{Action: agent.HookActionContinue}, nil +} + +func (h *ExampleLoggerHook) ApproveTool( + ctx context.Context, + req *agent.ToolApprovalRequest, +) (agent.ApprovalDecision, error) { + _ = ctx + decision := agent.ApprovalDecision{Approved: true} + h.record("approve_tool", req.Meta, req, decision) + return decision, nil +} + +func (h *ExampleLoggerHook) record(stage string, meta agent.EventMeta, payload any, decision any) { + logger.InfoCF("hooks", "Example hook observed", map[string]any{ + "stage": stage, + }) + if h == nil || h.logFile == "" { + return + } + + entry := map[string]any{ + "ts": time.Now().UTC(), + "stage": stage, + "meta": meta, + "payload": payload, + "decision": decision, + } + + body, err := json.Marshal(entry) + if 
err != nil { + logger.WarnCF("hooks", "Example hook log encode failed", map[string]any{ + "stage": stage, + "error": err.Error(), + }) + return + } + + h.mu.Lock() + defer h.mu.Unlock() + + if dir := filepath.Dir(h.logFile); dir != "" && dir != "." { + if err := os.MkdirAll(dir, 0o755); err != nil { + logger.WarnCF("hooks", "Example hook log mkdir failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + return + } + } + + file, err := os.OpenFile(h.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + logger.WarnCF("hooks", "Example hook log open failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + return + } + defer func() { _ = file.Close() }() + + if _, err := file.Write(append(body, '\n')); err != nil { + logger.WarnCF("hooks", "Example hook log write failed", map[string]any{ + "stage": stage, + "path": h.logFile, + "error": err.Error(), + }) + } +} +``` + +### 如何挂载 + +如果你只需要代码挂载,直接在 `AgentLoop` 初始化后调用: + +```go +hook := myhooks.NewExampleLoggerHook(myhooks.ExampleLoggerHookOptions{ + LogFile: "/tmp/picoclaw-hook-example-logger.log", + LogEvents: true, +}) + +if err := al.MountHook(agent.NamedHook("example-logger", hook)); err != nil { + panic(err) +} +``` + +### 如果你还想用配置挂载 + +当前 hook 系统支持 builtin hook,但这要求你自己把 factory 编进二进制。也就是说,下面这段注册代码需要和上面的 hook 定义一起放进你的工程里: + +```go +package myhooks + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/sipeed/picoclaw/pkg/agent" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + if err := agent.RegisterBuiltinHook("example_logger", func( + ctx context.Context, + spec config.BuiltinHookConfig, + ) (any, error) { + _ = ctx + + var opts ExampleLoggerHookOptions + if len(spec.Config) > 0 { + if err := json.Unmarshal(spec.Config, &opts); err != nil { + return nil, fmt.Errorf("decode example_logger config: %w", err) + } + } + return NewExampleLoggerHook(opts), nil + }); err != nil { + panic(err) + } +} +``` 
+ +只有在你自己注册了 builtin 之后,下面的配置才会生效: + +```json +{ + "hooks": { + "enabled": true, + "builtins": { + "example_logger": { + "enabled": true, + "priority": 10, + "config": { + "log_file": "/tmp/picoclaw-hook-example-logger.log", + "log_events": true + } + } + } + } +} +``` + +### 如何观察它是否生效 + +- 如果设置了 `log_file`,它会把每次 hook 调用按 JSON Lines 写入文件 +- 如果没有设置 `log_file`,它仍然会把摘要写到 gateway 日志 +- 普通只走 LLM 的请求,通常会看到 `before_llm` 和 `after_llm` +- 触发工具调用的请求,通常还会看到 `before_tool`、`approve_tool`、`after_tool` +- 如果 `log_events=true`,还会额外看到 `event` + +典型日志: + +```json +{"ts":"2026-03-21T14:10:00Z","stage":"before_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"action":"continue"}} +{"ts":"2026-03-21T14:10:00Z","stage":"approve_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"approved":true}} +``` + +如果你只看到了 `before_llm` / `after_llm`,没有看到 tool 相关阶段,通常不是 hook 没挂上,而是这次请求本身没有触发工具调用。 + +## Python process hook 示例 + +下面这段脚本是一个最小的 `process hook` 示例。它只使用 Python 标准库,支持: + +1. `hook.hello` +2. `hook.event` +3. `hook.before_tool` +4. 
`hook.approve_tool` + +它默认只记录,不改写,也不拒绝。 + +你可以把它保存到任意本地路径,例如 `/tmp/review_gate.py`: + +```python +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import signal +import sys +from datetime import datetime, timezone +from typing import Any + +LOG_EVENTS = os.getenv("PICOCLAW_HOOK_LOG_EVENTS", "1").lower() not in {"0", "false", "no"} +LOG_FILE = os.getenv("PICOCLAW_HOOK_LOG_FILE", "").strip() + + +def append_log(entry: dict[str, Any]) -> None: + if not LOG_FILE: + return + + payload = { + "ts": datetime.now(timezone.utc).isoformat(), + **entry, + } + try: + log_dir = os.path.dirname(LOG_FILE) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + with open(LOG_FILE, "a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, ensure_ascii=True) + "\n") + except OSError as exc: + log_stderr(f"failed to write hook log file {LOG_FILE}: {exc}") + + +def send_response(message_id: int, result: Any | None = None, error: str | None = None) -> None: + payload: dict[str, Any] = { + "jsonrpc": "2.0", + "id": message_id, + } + if error is not None: + payload["error"] = {"code": -32000, "message": error} + else: + payload["result"] = result if result is not None else {} + + append_log({ + "direction": "out", + "id": message_id, + "response": payload.get("result"), + "error": payload.get("error"), + }) + + try: + sys.stdout.write(json.dumps(payload, ensure_ascii=True) + "\n") + sys.stdout.flush() + except BrokenPipeError: + raise SystemExit(0) from None + + +def log_stderr(message: str) -> None: + try: + sys.stderr.write(message + "\n") + sys.stderr.flush() + except BrokenPipeError: + raise SystemExit(0) from None + + +def handle_shutdown_signal(signum: int, _frame: Any) -> None: + raise KeyboardInterrupt(f"received signal {signum}") + + +def handle_before_tool(params: dict[str, Any]) -> dict[str, Any]: + _ = params + return {"action": "continue"} + + +def handle_approve_tool(params: dict[str, Any]) -> dict[str, Any]: + _ = params + 
return {"approved": True} + + +def handle_request(method: str, params: dict[str, Any]) -> dict[str, Any]: + if method == "hook.hello": + return {"ok": True, "name": "python-review-gate"} + if method == "hook.before_tool": + return handle_before_tool(params) + if method == "hook.approve_tool": + return handle_approve_tool(params) + if method == "hook.before_llm": + return {"action": "continue"} + if method == "hook.after_llm": + return {"action": "continue"} + if method == "hook.after_tool": + return {"action": "continue"} + raise KeyError(f"method not found: {method}") + + +def main() -> int: + try: + for raw_line in sys.stdin: + line = raw_line.strip() + if not line: + continue + + try: + message = json.loads(line) + except json.JSONDecodeError as exc: + log_stderr(f"failed to decode request: {exc}") + append_log({ + "direction": "in", + "decode_error": str(exc), + "raw": line, + }) + continue + + method = message.get("method") + message_id = message.get("id", 0) + params = message.get("params") or {} + if not isinstance(params, dict): + params = {} + + append_log({ + "direction": "in", + "id": message_id, + "method": method, + "params": params, + "notification": not bool(message_id), + }) + + if not message_id: + if method == "hook.event" and LOG_EVENTS: + log_stderr(f"observed event: {params.get('Kind')}") + continue + + try: + result = handle_request(str(method or ""), params) + except KeyError as exc: + send_response(int(message_id), error=str(exc)) + continue + except Exception as exc: + send_response(int(message_id), error=f"unexpected error: {exc}") + continue + + send_response(int(message_id), result=result) + except KeyboardInterrupt: + return 0 + + return 0 + + +if __name__ == "__main__": + signal.signal(signal.SIGINT, handle_shutdown_signal) + signal.signal(signal.SIGTERM, handle_shutdown_signal) + raise SystemExit(main()) +``` + +### 如何配置 + +```json +{ + "hooks": { + "enabled": true, + "processes": { + "py_review_gate": { + "enabled": true, + 
"priority": 100, + "transport": "stdio", + "command": [ + "python3", + "/abs/path/to/review_gate.py" + ], + "observe": [ + "tool_exec_start", + "tool_exec_end", + "tool_exec_skipped" + ], + "intercept": [ + "before_tool", + "approve_tool" + ], + "env": { + "PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log" + } + } + } + } +} +``` + +### 环境变量 + +- `PICOCLAW_HOOK_LOG_EVENTS` + 是否把 `hook.event` 写到 `stderr`,默认开启 +- `PICOCLAW_HOOK_LOG_FILE` + 外部日志文件路径。设置后,脚本会把收到的 hook 请求、notification 和返回结果按 JSON Lines 追加到该文件 + +注意:`PICOCLAW_HOOK_LOG_FILE` 没有默认值。不设置时,脚本不会自动落盘日志。 + +### 如何确认它收到了 hook + +推荐同时看两个地方: + +- gateway 日志 + 用来观察宿主是否成功启动了外部进程,以及脚本写到 `stderr` 的事件摘要 +- `PICOCLAW_HOOK_LOG_FILE` + 用来观察脚本实际收到了什么请求、返回了什么响应 + +典型判断方式: + +- 只看到 `hook.hello` + 说明进程启动并完成握手了,但还没有新的业务 hook 请求真正打进来 +- 看到 `hook.event` + 说明 `observe` 配置生效了 +- 看到 `hook.before_tool` + 说明 `intercept: ["before_tool", ...]` 生效了 +- 看到 `hook.approve_tool` + 说明审批 hook 生效了 + +这份示例脚本不会改写任何参数,也不会拒绝工具,所以你应该看到的典型返回是: + +```json +{"direction":"out","id":7,"response":{"action":"continue"},"error":null} +{"direction":"out","id":8,"response":{"approved":true},"error":null} +``` + +一组完整样例: + +```json +{"ts":"2026-03-21T14:12:00+00:00","direction":"in","id":1,"method":"hook.hello","params":{"name":"py_review_gate","version":1,"modes":["observe","tool","approve"]},"notification":false} +{"ts":"2026-03-21T14:12:00+00:00","direction":"out","id":1,"response":{"ok":true,"name":"python-review-gate"},"error":null} +{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":0,"method":"hook.event","params":{"Kind":"tool_exec_start"},"notification":true} +{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":7,"method":"hook.before_tool","params":{"tool":"echo_text","arguments":{"text":"hello"}},"notification":false} +{"ts":"2026-03-21T14:12:05+00:00","direction":"out","id":7,"response":{"action":"continue"},"error":null} +``` + +补充说明: + +- 时间戳是 UTC,不是本地时区 +- `notification=true` 表示这是 `hook.event` 这类不需要响应的通知 +- `id` 
会随着当前进程内的请求递增;如果 hook 进程重启,计数会重新开始 + +## Process Hook 协议约定 + +当前 process hook 使用 `JSON-RPC over stdio`: + +- PicoClaw 启动外部进程 +- 请求和响应都按“一行一个 JSON 消息”传输 +- `hook.event` 是 notification,不需要响应 +- `hook.before_llm` / `hook.after_llm` / `hook.before_tool` / `hook.after_tool` / `hook.approve_tool` 是 request/response + +当前宿主不会接受 process hook 主动发起的新 RPC。也就是说,外部 hook 现在只能“响应 PicoClaw 的调用”,不能反向调用宿主去发送 channel 消息。 + +## 配置字段 + +### `hooks.builtins.<name>` + +- `enabled` +- `priority` +- `config` + +### `hooks.processes.<name>` + +- `enabled` +- `priority` +- `transport` + 当前只支持 `stdio` +- `command` +- `dir` +- `env` +- `observe` +- `intercept` + +## 排查建议 + +当你觉得“hook 没触发”时,优先按这个顺序排查: + +1. `hooks.enabled` 是否为 `true` +2. 对应的 builtin/process hook 是否 `enabled` +3. process hook 的 `command` 路径是否正确 +4. 你看的是否是正确的日志文件 +5. 当前请求是否真的走到了对应阶段 +6. `observe` / `intercept` 是否包含了你想看的点位 + +一个很实用的最小排查组合是: + +- 先用文档里的 Python process 示例确认外部协议没问题 +- 再用文档里的 Go in-process 示例确认宿主内的 hook 链路没问题 + +如果前者有 `hook.hello` 但没有业务请求,通常不是协议挂了,而是当前这次请求没有真正触发对应的 hook 点位。 + +## 适用边界 + +当前 hook 系统最适合做这些事: + +- LLM 请求改写 +- 工具参数规范化 +- 工具执行前审批 +- 审计和观测 + +当前还不适合直接承载这些需求: + +- 外部 hook 主动发 channel 消息 +- 挂起 turn 并等待人工审批回复 +- inbound/outbound 全链路消息拦截 + +如果你要做人审流转,推荐把 hook 作为审批入口,把审批状态机和 channel 交互放到独立的 `ApprovalManager`。