Merge branch 'main' into feat/skill-channel-commands

# Conflicts:
#	pkg/agent/loop.go
This commit is contained in:
afjcjsbx
2026-03-22 20:51:16 +01:00
62 changed files with 16762 additions and 708 deletions
+164
View File
@@ -0,0 +1,164 @@
# Context
## What this document covers
This document makes explicit the boundaries of context management in the agent loop:
- what fills the context window and how space is divided
- what is stored in session history vs. built at request time
- when and how context compression happens
- how token budgets are estimated
These are existing concepts. This document clarifies their boundaries rather than introducing new ones.
---
## Context window regions
The context window is the model's total input capacity. Four regions fill it:
| Region | Assembled by | Stored in session? |
|---|---|---|
| System prompt | `BuildMessages()` — static + dynamic parts | No |
| Summary | `SetSummary()` stores it; `BuildMessages()` injects it | Separate from history |
| Session history | User / assistant / tool messages | Yes |
| Tool definitions | Provider adapter injects at call time | No |
`MaxTokens` (the output generation limit) must also be reserved from the total budget.
The available space for history is therefore:
```
history_budget = ContextWindow - system_prompt - summary - tool_definitions - MaxTokens
```
---
## ContextWindow vs MaxTokens
These serve different purposes:
- **MaxTokens** — maximum tokens the LLM may generate in one response. Sent as the `max_tokens` request parameter.
- **ContextWindow** — the model's total input context capacity.
These were previously set to the same value, which caused the summarization threshold to fire either far too early (at the default 32K) or not at all (when a user raised `max_tokens`).
Current default when not explicitly configured: `ContextWindow = MaxTokens * 4`.
---
## Session history
Session history stores only conversation messages:
- `user` — user input
- `assistant` — LLM response (may include `ToolCalls`)
- `tool` — tool execution results
Session history does **not** contain:
- System prompts — assembled at request time by `BuildMessages`
- Summary content — stored separately via `SetSummary`, injected by `BuildMessages`
This distinction matters: any code that operates on session history — compression, boundary detection, token estimation — must not assume a system message is present.
---
## Turn
A **Turn** is one complete cycle:
> user message -> LLM iterations (possibly including tool calls) -> final assistant response
This definition comes from the agent loop design (#1316). In session history, Turn boundaries are identified by `user`-role messages.
Turn is the atomic unit for compression. Cutting inside a Turn can orphan tool-call sequences — an assistant message with `ToolCalls` separated from its corresponding `tool` results. Compressing at Turn boundaries avoids this by construction.
`parseTurnBoundaries(history)` returns the starting index of each Turn.
`findSafeBoundary(history, targetIndex)` snaps a target cut point to the nearest Turn boundary.
---
## Compression paths
Three compression paths exist, in order of preference:
### 1. Async summarization
`maybeSummarize` runs after each Turn completes.
Triggers when message count exceeds a threshold, or when estimated history tokens exceed a percentage of `ContextWindow`. If triggered, a background goroutine calls the LLM to produce a summary of the oldest messages. The summary is stored via `SetSummary`; `BuildMessages` injects it into the system prompt on the next call.
Cut point uses `findSafeBoundary` so no Turn is split.
### 2. Proactive budget check
`isOverContextBudget` runs before each LLM call.
Uses the full budget formula: `message_tokens + tool_def_tokens + MaxTokens > ContextWindow`. If over budget, triggers `forceCompression` and rebuilds messages before calling the LLM.
This prevents wasted (and billed) LLM calls that would otherwise fail with a context-window error.
### 3. Emergency compression (reactive)
`forceCompression` runs when the LLM returns a context-window error despite the proactive check.
Drops the oldest ~50% of Turns. If the history is a single Turn with no safe split point (e.g. one user message followed by a massive tool response), falls back to keeping only the most recent user message — breaking Turn atomicity as a last resort to avoid a context-exceeded loop.
Stores a compression note in the session summary (not in history messages) so `BuildMessages` can include it in the next system prompt.
This is the fallback for when the token estimate undershoots reality.
---
## Token estimation
Estimation uses a heuristic of ~2.5 characters per token (`chars * 2 / 5`).
`estimateMessageTokens` counts:
- `Content` (rune count, for multibyte correctness)
- `ReasoningContent` (extended thinking / chain-of-thought)
- `ToolCalls` — ID, type, function name, arguments
- `ToolCallID` (tool result metadata)
- Per-message overhead (role label, JSON structure)
- `Media` items — flat per-item token estimate, added directly to the final count (not through the character heuristic, since actual cost depends on resolution and provider-specific image tokenization)
`estimateToolDefsTokens` counts tool definition overhead: name, description, JSON schema of parameters.
These are deliberately heuristic. The proactive check handles the common case; the reactive path catches estimation errors.
---
## Interface boundaries
Context budget functions (`parseTurnBoundaries`, `findSafeBoundary`, `estimateMessageTokens`, `isOverContextBudget`) are **pure functions**. They take `[]providers.Message` and integer parameters. They have no dependency on `AgentLoop` or any other runtime struct.
`BuildMessages` is the sole assembler of the final message array sent to the LLM. Budget functions inform compression decisions but do not construct messages.
`forceCompression` and `summarizeSession` mutate session state (history and summary). `BuildMessages` reads that state to construct context. The flow is:
```
budget check --> compression decision --> mutate session --> BuildMessages reads session --> LLM call
```
---
## Known gaps
These are recognized limitations in the current implementation, documented here for visibility:
- **Summarization trigger does not use the full budget formula.** `maybeSummarize` compares estimated history tokens against a percentage of `ContextWindow`. It does not account for system prompt size, tool definition overhead, or `MaxTokens` reserve. The proactive check covers the critical path (preventing 400 errors), but the summarization trigger could be aligned with the same budget model for more accurate early compression.
- **Token estimation is heuristic.** It does not account for provider-specific tokenization, exact system prompt size (assembled separately), or variable image token costs. The two-path design (proactive + reactive) is intended to tolerate this imprecision.
- **Reactive retry does not preserve media.** When the reactive path rebuilds context after compression, it currently passes empty values for media references. This is a pre-existing issue in the main loop, not introduced by the budget system.
---
## What this document does not cover
- How `AGENT.md` frontmatter configures context parameters — that is part of the Agent definition work
- How the context builder assembles context in the new architecture — that is upcoming work
- How compression events surface through the event system — that is part of the event model (#1316)
- Subagent context isolation — that is a separate track
+476
View File
@@ -0,0 +1,476 @@
# PicoClaw Hook 系统设计(基于 `refactor/agent`
## 背景
本设计围绕两个议题展开:
- `#1316`:把 agent loop 重构为事件驱动、可中断、可追加、可观测
- `#1796`:在 EventBus 稳定后,把 hooks 设计为 EventBus 的 consumer,而不是重新发明一套事件模型
当前分支已经完成了第一步里的“事件系统基础”,但还没有真正的 hook 挂载层。因此这里的目标不是重新设计 event,而是在已有实现上补出一层可扩展、可拦截、可外挂的 HookManager。
## 外部项目对比
### OpenClaw
OpenClaw 的扩展能力分成三层:
- Internal hooks:目录发现,运行在 Gateway 进程内
- Plugin hooks:插件在运行时注册 hook,也在进程内
- Webhooks:外部系统通过 HTTP 触发 Gateway 动作,属于进程外
值得借鉴的点:
- 有“项目内挂载”和“项目外挂载”两种路径
- hook 是配置驱动,可启停
- 外部入口有明确的安全边界和映射层
不建议直接照搬的点:
- OpenClaw 的 hooks / plugin hooks / webhooks 是三套路由,PicoClaw 当前体量下会偏重
- HTTP webhook 更适合“事件进入系统”,不适合作为“可同步拦截 agent loop”的基础机制
### pi-mono
pi-mono 的核心思路更接近当前分支:
- 扩展统一为 extension API
- 事件分为观察型和可变更型
- 某些阶段允许 `transform` / `block` / `replace`
- 扩展代码主要是进程内执行
- RPC mode 把 UI 交互桥接到进程外客户端
值得借鉴的点:
- 不把“观察”和“拦截”混成一个接口
- 允许返回结构化动作,而不是只有回调
- 进程外通信只暴露必要协议,不把整个内部对象图泄露出去
## 当前分支现状
### 已有能力
当前分支已经具备 hook 系统的地基:
- `pkg/agent/events.go` 定义了稳定的 `EventKind``EventMeta` 和 payload
- `pkg/agent/eventbus.go` 提供了非阻塞 fan-out 的 `EventBus`
- `pkg/agent/loop.go` 中的 `runTurn()` 已在 turn、llm、tool、interrupt、follow-up、summary 等节点发射事件
- `pkg/agent/steering.go` 已支持 steering、graceful interrupt、hard abort
- `pkg/agent/turn.go` 已维护 turn phase、恢复点、active turn、abort 状态
### 现有缺口
当前分支还缺四件事:
- 没有 HookManager,只有 EventBus
- 没有 Before/After LLM、Before/After Tool 这种同步拦截点
- 没有审批型 hook
- 子 agent 仍走 `pkg/tools/SubagentManager + RunToolLoop`,没有接入 `pkg/agent` 的 turn tree 和事件流
### 一个关键现实
`#1316` 文案里提到“只读并行、写入串行”的工具执行策略,但当前 `runTurn()` 实现已经先收敛成“顺序执行 + 每个工具后检查 steering / interrupt”。因此 hook 设计不应依赖未来的并行模型,而应该先兼容当前顺序执行,再为以后增加 `ReadOnlyIndicator` 留口子。
## 设计原则
- Hook 必须建立在 `pkg/agent` 的 EventBus 和 turn 上下文之上
- EventBus 负责广播,HookManager 负责拦截,两者职责分离
- 项目内挂载要简单,项目外挂载必须走 IPC
- 观察型 hook 不能阻塞 loop;拦截型 hook 必须有超时
- 先覆盖主 turn,不把 sub-turn 一次做满
- 不新增第二套用户事件命名系统,优先复用 `EventKind.String()`
## 总体架构
分成三层:
1. `EventBus`
负责广播只读事件,现有实现直接复用
2. `HookManager`
负责管理 hook、排序、超时、错误隔离,并在 `runTurn()` 的明确检查点执行同步拦截
3. `HookMount`
负责两种挂载方式:
- 进程内 Go hook
- 进程外 IPC hook
换句话说:
- EventBus 是“发生了什么”
- HookManager 是“谁能介入”
- HookMount 是“这些 hook 从哪里来”
## Hook 分类
不建议把所有 hook 都设计成 `OnEvent(evt)`
建议拆成两类。
### 1. 观察型
只消费事件,不修改流程:
```go
type EventObserver interface {
OnEvent(ctx context.Context, evt agent.Event) error
}
```
这类 hook 直接订阅 EventBus 即可。
适用场景:
- 审计日志
- 指标上报
- 调试 trace
- 将事件转发给外部 UI / TUI / Web 面板
### 2. 拦截型
只在少数明确节点触发,允许返回动作:
```go
type LLMInterceptor interface {
BeforeLLM(ctx context.Context, req *LLMRequest) HookDecision[*LLMRequest]
AfterLLM(ctx context.Context, resp *LLMResponse) HookDecision[*LLMResponse]
}
type ToolInterceptor interface {
BeforeTool(ctx context.Context, call *ToolCall) HookDecision[*ToolCall]
AfterTool(ctx context.Context, result *ToolResultView) HookDecision[*ToolResultView]
}
type ToolApprover interface {
ApproveTool(ctx context.Context, req *ToolApprovalRequest) ApprovalDecision
}
```
这里的 `HookDecision` 统一支持:
- `continue`
- `modify`
- `deny_tool`
- `abort_turn`
- `hard_abort`
## 对外暴露的最小 hook 面
V1 不需要把所有 EventKind 都变成可拦截点。
建议只开放这些同步 hook
- `before_llm`
- `after_llm`
- `before_tool`
- `after_tool`
- `approve_tool`
其余节点继续作为只读事件暴露:
- `turn_start`
- `turn_end`
- `llm_request`
- `llm_response`
- `tool_exec_start`
- `tool_exec_end`
- `tool_exec_skipped`
- `steering_injected`
- `follow_up_queued`
- `interrupt_received`
- `context_compress`
- `session_summarize`
- `error`
`subturn_*` 在 V1 中保留名字,但不承诺一定触发,直到子 turn 迁移完成。
## 项目内挂载
内部挂载必须尽量低摩擦。
建议提供两种等价方式,底层都走 HookManager。
### 方式 A:代码显式挂载
```go
al.MountHook(hooks.Named("audit", &AuditHook{}))
```
适用于:
- 仓内内建 hook
- 单元测试
- feature flag 控制
### 方式 B:内建 registry
```go
func init() {
hooks.RegisterBuiltin("audit", func() hooks.Hook {
return &AuditHook{}
})
}
```
启动时根据配置启用:
```json
{
"hooks": {
"builtins": {
"audit": { "enabled": true }
}
}
}
```
这比 OpenClaw 的目录扫描更轻,也更贴合 Go 项目。
## 项目外挂载
这是本设计的硬要求。
建议 V1 采用:
- `JSON-RPC over stdio`
原因:
- 跨平台最简单
- 不依赖额外端口
- 非常适合“由 PicoClaw 启动一个外部 hook 进程”
- 比 HTTP webhook 更适合同步拦截
### 外部 hook 进程模型
PicoClaw 启动外部进程,并在其 stdin/stdout 上跑协议。
配置示例:
```json
{
"hooks": {
"processes": {
"review-gate": {
"enabled": true,
"transport": "stdio",
"command": ["uvx", "picoclaw-hook-reviewer"],
"observe": ["turn_start", "turn_end", "tool_exec_end"],
"intercept": ["before_tool", "approve_tool"],
"timeout_ms": 5000
}
}
}
}
```
### 协议边界
不要把内部 Go 结构体直接暴露给 IPC。
建议定义稳定的协议对象:
- `HookHandshake`
- `HookEventNotification`
- `BeforeLLMRequest`
- `AfterLLMRequest`
- `BeforeToolRequest`
- `AfterToolRequest`
- `ApproveToolRequest`
- `HookDecision`
其中:
- 观察型事件用 notificationfire-and-forget
- 拦截型事件用 request/response,同步等待
### 为什么是 stdio,而不是直接用 HTTP webhook
因为两者用途不同:
- HTTP webhook 更适合“外部系统向 PicoClaw 投递事件”
- stdio/RPC 更适合“PicoClaw 在 turn 内同步询问外部 hook 是否改写 / 放行 / 拒绝”
如果未来需要 OpenClaw 式 webhook,可以作为独立入口层,再把外部事件转成 inbound message 或 steering,而不是直接替代 hook IPC。
## Hook 执行顺序
建议统一排序规则:
- 先内建 in-process hook
- 再外部 IPC hook
- 同组内按 `priority` 从小到大执行
原因:
- 内建 hook 延迟更低,适合做基础规范化
- 外部 hook 更适合做审批、审计、组织级策略
## 超时与错误策略
### 观察型
- 默认超时:`500ms`
- 超时或报错:记录日志,继续主流程
### 拦截型
- `before_llm` / `after_llm` / `before_tool` / `after_tool`:默认 `5s`
- `approve_tool`:默认 `60s`
超时行为:
- 普通拦截:`continue`
- 审批:`deny`
这点应直接沿用 `#1316` 的安全倾向。
## 与当前分支的对接点
### 直接复用
- 事件定义:`pkg/agent/events.go`
- 事件广播:`pkg/agent/eventbus.go`
- 活跃 turn / interrupt / rollback`pkg/agent/turn.go`
- 事件发射点:`pkg/agent/loop.go`
### 需要新增
- `pkg/agent/hooks.go`
- Hook 接口
- HookDecision / ApprovalDecision
- HookManager
- `pkg/agent/hook_mount.go`
- 内建 hook 注册
- 外部进程 hook 注册
- `pkg/agent/hook_ipc.go`
- stdio JSON-RPC bridge
- `pkg/agent/hook_types.go`
- IPC 稳定载荷
### 需要改造
- `pkg/agent/loop.go`
- 在 LLM 和 tool 关键路径前后插入 HookManager 调用
- `pkg/tools/base.go`
- 可选新增 `ReadOnlyIndicator`
- `pkg/tools/spawn.go`
- `pkg/tools/subagent.go`
- 先保留现状
- 等 sub-turn 迁移后再接入 `subturn_*` hook
## 一个更贴合当前分支的数据流
### 观察链路
```text
runTurn() -> emitEvent() -> EventBus -> observers
```
### 拦截链路
```text
runTurn()
-> HookManager.BeforeLLM()
-> Provider.Chat()
-> HookManager.AfterLLM()
-> HookManager.BeforeTool()
-> HookManager.ApproveTool()
-> tool.Execute()
-> HookManager.AfterTool()
```
也就是说:
- observer 不改变现有 `emitEvent()`
- interceptor 直接插在 `runTurn()` 热路径
## 用户可见配置
建议新增:
```json
{
"hooks": {
"enabled": true,
"builtins": {},
"processes": {},
"defaults": {
"observer_timeout_ms": 500,
"interceptor_timeout_ms": 5000,
"approval_timeout_ms": 60000
}
}
}
```
V1 不做复杂自动发现。
原因:
- 当前分支重点是把地基打稳
- 目录扫描、安装器、脚手架可以后置
- 先让仓内和仓外都能挂上去,比“管理体验完整”更重要
## 推荐的 V1 范围
### 必做
- HookManager
- in-process 挂载
- stdio IPC 挂载
- observer hooks
- `before_tool` / `after_tool` / `approve_tool`
- `before_llm` / `after_llm`
### 可后置
- hook CLI 管理命令
- hook 自动发现
- Unix socket / named pipe transport
- sub-turn hook 生命周期
- read-only 并行分组
- webhook 到 inbound message 的映射入口
## 分阶段落地
### Phase 1
- 引入 HookManager
- 支持 in-process observer + interceptor
- 先只接主 turn
### Phase 2
- 引入 `stdio` 外部 hook 进程桥
- 支持组织级审批 / 审计 / 参数改写
### Phase 3
-`SubagentManager` 迁移到 `runTurn/sub-turn`
- 接通 `subturn_spawn` / `subturn_end` / `subturn_result_delivered`
### Phase 4
- 视需求补 `ReadOnlyIndicator`
- 在主 turn 和 sub-turn 上统一只读并行策略
## 最终结论
最适合 PicoClaw 当前分支的方案,不是直接复制 OpenClaw 的 hooks,也不是完整照搬 pi-mono 的 extension system,而是:
- 以现有 `EventBus` 为只读观察面
- 以新增 `HookManager` 为同步拦截面
- 项目内通过 Go 对象直接挂载
- 项目外通过 `stdio JSON-RPC` 进程通信挂载
这样做有三个好处:
-`#1796` 一致,hooks 只是 EventBus 之上的消费层
- 和当前 `refactor/agent` 实现一致,不需要推翻已有事件系统
- 同时满足“仓内简单挂载”和“仓外进程通信挂载”两个硬需求
+306
View File
@@ -0,0 +1,306 @@
# Steering — Implementation Specification
## Problem
When the agent is running (executing a chain of tool calls), the user has no way to redirect it. They must wait for the full cycle to complete before sending a new message. This creates a poor experience when the agent takes a wrong direction — the user watches it waste time on tools that are no longer relevant.
## Solution
Steering introduces a **message queue** that external callers can push into at any time. The agent loop polls this queue at well-defined checkpoints. When a steering message is found, the agent:
1. Stops executing further tools in the current batch
2. Injects the user's message into the conversation context
3. Calls the LLM again with the updated context
The user's intent reaches the model **as soon as the current tool finishes**, not after the entire turn completes.
## Architecture Overview
```mermaid
graph TD
subgraph External Callers
TG[Telegram]
DC[Discord]
SL[Slack]
end
subgraph AgentLoop
BUS[MessageBus]
DRAIN[drainBusToSteering goroutine]
SQ[steeringQueue]
RLI[runLLMIteration]
TE[Tool Execution Loop]
LLM[LLM Call]
end
TG -->|PublishInbound| BUS
DC -->|PublishInbound| BUS
SL -->|PublishInbound| BUS
BUS -->|ConsumeInbound while busy| DRAIN
DRAIN -->|Steer| SQ
RLI -->|1. initial poll| SQ
TE -->|2. poll after each tool| SQ
SQ -->|pendingMessages| RLI
RLI -->|inject into context| LLM
```
### Bus drain mechanism
Channels (Telegram, Discord, etc.) publish messages to the `MessageBus` via `PublishInbound`. Without additional wiring, these messages would sit in the bus buffer until the current `processMessage` finishes — meaning steering would never work for real users.
The solution: when `Run()` starts processing a message, it spawns a **drain goroutine** (`drainBusToSteering`) that keeps consuming from the bus and calling `Steer()`. When `processMessage` returns, the drain is canceled and normal consumption resumes.
```mermaid
sequenceDiagram
participant Bus
participant Run
participant Drain
participant AgentLoop
Run->>Bus: ConsumeInbound() → msg
Run->>Drain: spawn drainBusToSteering(ctx)
Run->>Run: processMessage(msg)
Note over Drain: running concurrently
Bus-->>Drain: ConsumeInbound() → newMsg
Drain->>AgentLoop: al.transcribeAudioInMessage(ctx, newMsg)
Drain->>AgentLoop: Steer(providers.Message{Content: newMsg.Content})
Run->>Run: processMessage returns
Run->>Drain: cancel context
Note over Drain: exits
```
## Data Structures
### steeringQueue
A thread-safe FIFO queue, private to the `agent` package.
| Field | Type | Description |
|-------|------|-------------|
| `mu` | `sync.Mutex` | Protects all access to `queue` and `mode` |
| `queue` | `[]providers.Message` | Pending steering messages |
| `mode` | `SteeringMode` | Dequeue strategy |
**Methods:**
| Method | Description |
|--------|-------------|
| `push(msg) error` | Appends a message to the queue. Returns an error if the queue is full (`MaxQueueSize`) |
| `dequeue() []Message` | Removes and returns messages according to `mode`. Returns `nil` if empty |
| `len() int` | Returns the current queue length |
| `setMode(mode)` | Updates the dequeue strategy |
| `getMode() SteeringMode` | Returns the current mode |
### SteeringMode
| Value | Constant | Behavior |
|-------|----------|----------|
| `"one-at-a-time"` | `SteeringOneAtATime` | `dequeue()` returns only the **first** message. Remaining messages stay in the queue for subsequent polls. |
| `"all"` | `SteeringAll` | `dequeue()` drains the **entire** queue and returns all messages at once. |
Default: `"one-at-a-time"`.
### processOptions extension
A new field was added to `processOptions`:
| Field | Type | Description |
|-------|------|-------------|
| `SkipInitialSteeringPoll` | `bool` | When `true`, the initial steering poll at loop start is skipped. Used by `Continue()` to avoid double-dequeuing. |
## Public API on AgentLoop
| Method | Signature | Description |
|--------|-----------|-------------|
| `Steer` | `Steer(msg providers.Message) error` | Enqueues a steering message. Returns an error if the queue is full or not initialized. Thread-safe, can be called from any goroutine. |
| `SteeringMode` | `SteeringMode() SteeringMode` | Returns the current dequeue mode. |
| `SetSteeringMode` | `SetSteeringMode(mode SteeringMode)` | Changes the dequeue mode at runtime. |
| `Continue` | `Continue(ctx, sessionKey, channel, chatID) (string, error)` | Resumes an idle agent using pending steering messages. Returns `""` if queue is empty. |
## Integration into the Agent Loop
### Where steering is wired
The steering queue lives as a field on `AgentLoop`:
```
AgentLoop
├── bus
├── cfg
├── registry
├── steering *steeringQueue ← new
├── ...
```
It is initialized in `NewAgentLoop` from `cfg.Agents.Defaults.SteeringMode`.
### Detailed flow through runLLMIteration
```mermaid
sequenceDiagram
participant User
participant AgentLoop
participant runLLMIteration
participant ToolExecution
participant LLM
User->>AgentLoop: Steer(message)
Note over AgentLoop: steeringQueue.push(message)
Note over runLLMIteration: ── iteration starts ──
runLLMIteration->>AgentLoop: dequeueSteeringMessages()<br/>[initial poll]
AgentLoop-->>runLLMIteration: [] (empty, or messages)
alt pendingMessages not empty
runLLMIteration->>runLLMIteration: inject into messages[]<br/>save to session
end
runLLMIteration->>LLM: Chat(messages, tools)
LLM-->>runLLMIteration: response with toolCalls[0..N]
loop for each tool call (sequential)
ToolExecution->>ToolExecution: execute tool[i]
ToolExecution->>ToolExecution: process result,<br/>append to messages[]
ToolExecution->>AgentLoop: dequeueSteeringMessages()
AgentLoop-->>ToolExecution: steeringMessages
alt steering found
opt remaining tools > 0
Note over ToolExecution: Mark tool[i+1..N-1] as<br/>"Skipped due to queued user message."
end
Note over ToolExecution: steeringAfterTools = steeringMessages
Note over ToolExecution: break out of tool loop
end
end
alt steeringAfterTools not empty
ToolExecution-->>runLLMIteration: pendingMessages = steeringAfterTools
Note over runLLMIteration: next iteration will inject<br/>these before calling LLM
end
Note over runLLMIteration: ── loop back to iteration start ──
```
### Polling checkpoints
| # | Location | When | Purpose |
|---|----------|------|---------|
| 1 | Top of `runLLMIteration`, before first LLM call | Once, at loop entry | Catch messages enqueued while the agent was still setting up context |
| 2 | After every tool completes (including the first and the last) | Immediately after each tool's result is processed | Interrupt the batch as early as possible — if steering is found and there are remaining tools, they are all skipped |
### What happens to skipped tools
When steering interrupts a tool batch after tool `[i]` completes, all tools from `[i+1]` to `[N-1]` are **not executed**. Instead, a tool result message is generated for each:
```json
{
"role": "tool",
"content": "Skipped due to queued user message.",
"tool_call_id": "<original_call_id>"
}
```
These results are:
- Appended to the conversation `messages[]`
- Saved to the session via `AddFullMessage`
This ensures the LLM knows which of its requested actions were not performed.
### Loop condition change
The iteration loop condition was changed from:
```go
for iteration < agent.MaxIterations
```
to:
```go
for iteration < agent.MaxIterations || len(pendingMessages) > 0
```
This allows **one extra iteration** when steering arrives right at the max iteration boundary, ensuring the steering message is always processed.
### Tool execution: parallel → sequential
**Before steering:** all tool calls in a batch were executed in parallel using `sync.WaitGroup`.
**After steering:** tool calls execute **sequentially**. This is required because steering must be polled between individual tool completions. A parallel execution model would not allow interrupting mid-batch.
> **Trade-off:** This introduces latency when the LLM requests multiple independent tools in a single turn. In practice, most batches contain 1-2 tools, so the impact is minimal. The benefit of being able to interrupt outweighs the cost.
### Why skip remaining tools (instead of letting them finish)
Two strategies were considered when a steering message is detected mid-batch:
1. **Skip remaining tools** (chosen) — stop executing, mark the rest as skipped, inject steering
2. **Finish all tools, then inject** — let everything run, append steering afterwards
Strategy 2 was rejected for three reasons:
**Irreversible side effects.** Tools can send emails, write files, spawn subagents, or call external APIs. If the user says "stop" or "change direction", those actions have already happened and cannot be undone.
| Tool batch | Steering | Skip (1) | Finish (2) |
|---|---|---|---|
| `[search, send_email]` | "don't send it" | Email not sent | Email sent |
| `[query, write_file, spawn]` | "wrong database" | Only query runs | File + subagent wasted |
| `[fetch₁, fetch₂, fetch₃, write]` | topic change | 1 fetch | 3 fetches + write, all discarded |
**Wasted latency.** Tools like web fetches and API calls take seconds each. In a 3-tool batch averaging 3-4s per tool, the user would wait 10+ seconds for work that gets thrown away.
**The LLM retains full awareness.** Skipped tools receive an explicit `"Skipped due to queued user message."` result, so the model knows what was not done and can decide whether to re-execute with the new context or take a different path.
## The Continue() method
`Continue` handles the case where the agent is **idle** (its last message was from the assistant) and the user has enqueued steering messages in the meantime.
```mermaid
flowchart TD
A[Continue called] --> B{dequeueSteeringMessages}
B -->|empty| C["return ('', nil)"]
B -->|messages found| D[Combine message contents]
D --> E["runAgentLoop with<br/>SkipInitialSteeringPoll: true"]
E --> F[Return response]
```
**Why `SkipInitialSteeringPoll: true`?** Because `Continue` already dequeued the messages itself. Without this flag, `runLLMIteration` would poll again at the start and find nothing (the queue is already empty), or worse, double-process if new messages arrived in the meantime.
## Configuration
```json
{
"agents": {
"defaults": {
"steering_mode": "one-at-a-time"
}
}
}
```
| Field | Type | Default | Env var |
|-------|------|---------|---------|
| `steering_mode` | `string` | `"one-at-a-time"` | `PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE` |
## Design decisions and trade-offs
| Decision | Rationale |
|----------|-----------|
| Sequential tool execution | Required for per-tool steering polls. Parallel execution cannot be interrupted mid-batch. |
| Polling-based (not channel/signal) | Keeps the implementation simple. No need for `select` or signal channels. The polling cost is negligible (mutex lock + slice length check). |
| `one-at-a-time` as default | Gives the model a chance to react to each steering message individually. More predictable behavior than dumping all messages at once. |
| Skipped tools get explicit error results | The LLM protocol requires a tool result for every tool call in the assistant message. Omitting them would cause API errors. The skip message also informs the model about what was not done. |
| `Continue()` uses `SkipInitialSteeringPoll` | Prevents race conditions and double-dequeuing when resuming an idle agent. |
| Queue stored on `AgentLoop`, not `AgentInstance` | Steering is a loop-level concern (it affects the iteration flow), not a per-agent concern. All agents share the same steering queue since `processMessage` is sequential. |
| Bus drain goroutine in `Run()` | Channels (Telegram, Discord, etc.) publish to the bus via `PublishInbound`. Without the drain, messages would queue in the bus channel buffer and only be consumed after `processMessage` returns — defeating the purpose of steering. The drain goroutine bridges the gap by consuming new bus messages and calling `Steer()` while the agent is busy. |
| Audio transcription before steering | The drain goroutine calls `al.transcribeAudioInMessage(ctx, msg)` before steering, so voice messages are converted to text before the agent sees them. If transcription fails, the error is silently discarded and the original message is steered as-is. |
| `MaxQueueSize = 10` | Prevents unbounded memory growth if a user sends many messages while the agent is busy. Excess messages are dropped with a warning. |
+679
View File
@@ -0,0 +1,679 @@
# Hook System Guide
This document describes the hook system that is implemented in the current repository, not the older design draft.
The current implementation supports two mounting modes:
1. In-process hooks
2. Out-of-process process hooks (`JSON-RPC over stdio`)
The repository no longer ships standalone example source files. The Go and Python examples below are embedded directly in this document. If you want to use them, copy them into your own local files first.
## Supported Hook Types
| Type | Interface | Stage | Can modify data |
| --- | --- | --- | --- |
| Observer | `EventObserver` | EventBus broadcast | No |
| LLM interceptor | `LLMInterceptor` | `before_llm` / `after_llm` | Yes |
| Tool interceptor | `ToolInterceptor` | `before_tool` / `after_tool` | Yes |
| Tool approver | `ToolApprover` | `approve_tool` | No, returns allow/deny |
The currently exposed synchronous hook points are:
- `before_llm`
- `after_llm`
- `before_tool`
- `after_tool`
- `approve_tool`
Everything else is exposed as read-only events.
## Execution Order
`HookManager` sorts hooks like this:
1. In-process hooks first
2. Process hooks second
3. Lower `priority` first within the same source
4. Name order as the final tie-breaker
## Timeouts
Global defaults live under `hooks.defaults`:
- `observer_timeout_ms`
- `interceptor_timeout_ms`
- `approval_timeout_ms`
Note: the current implementation does not support per-process-hook `timeout_ms`. Timeouts are global defaults.
## Quick Start
If your first goal is simply to prove that the hook flow works and observe real requests, the easiest path is the Python process-hook example below:
1. Enable `hooks.enabled`
2. Save the Python example from this document to a local file, for example `/tmp/review_gate.py`
3. Set `PICOCLAW_HOOK_LOG_FILE`
4. Restart the gateway
5. Watch the log file with `tail -f`
Example:
```json
{
"hooks": {
"enabled": true,
"processes": {
"py_review_gate": {
"enabled": true,
"priority": 100,
"transport": "stdio",
"command": [
"python3",
"/tmp/review_gate.py"
],
"observe": [
"tool_exec_start",
"tool_exec_end",
"tool_exec_skipped"
],
"intercept": [
"before_tool",
"approve_tool"
],
"env": {
"PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log"
}
}
}
}
}
```
Watch it with:
```bash
tail -f /tmp/picoclaw-hook-review-gate.log
```
If you are developing PicoClaw itself rather than only validating the protocol, continue with the Go in-process example as well.
## What The Two Examples Are For
- Go in-process example
Best for validating the host-side hook chain and understanding `MountHook()` plus the synchronous stages
- Python process example
Best for understanding the `JSON-RPC over stdio` protocol and verifying the message flow between PicoClaw and an external process
Both examples are intentionally safe: they only log, never rewrite, and never deny.
## Go In-Process Example
The following is a minimal logging hook for in-process use. It implements:
1. `EventObserver`
2. `LLMInterceptor`
3. `ToolInterceptor`
4. `ToolApprover`
It only records activity. It does not rewrite requests or reject tools.
You can save it as your own Go file, for example `pkg/myhooks/example_logger.go`:
```go
package myhooks
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/agent"
"github.com/sipeed/picoclaw/pkg/logger"
)
type ExampleLoggerHookOptions struct {
LogFile string `json:"log_file,omitempty"`
LogEvents bool `json:"log_events,omitempty"`
}
type ExampleLoggerHook struct {
logFile string
logEvents bool
mu sync.Mutex
}
func NewExampleLoggerHook(opts ExampleLoggerHookOptions) *ExampleLoggerHook {
return &ExampleLoggerHook{
logFile: strings.TrimSpace(opts.LogFile),
logEvents: opts.LogEvents,
}
}
func (h *ExampleLoggerHook) OnEvent(ctx context.Context, evt agent.Event) error {
_ = ctx
if h == nil || !h.logEvents {
return nil
}
h.record("event", evt.Meta, map[string]any{
"event": evt.Kind.String(),
"payload": evt.Payload,
}, nil)
return nil
}
func (h *ExampleLoggerHook) BeforeLLM(
ctx context.Context,
req *agent.LLMHookRequest,
) (*agent.LLMHookRequest, agent.HookDecision, error) {
_ = ctx
h.record("before_llm", req.Meta, req, agent.HookDecision{Action: agent.HookActionContinue})
return req, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) AfterLLM(
ctx context.Context,
resp *agent.LLMHookResponse,
) (*agent.LLMHookResponse, agent.HookDecision, error) {
_ = ctx
h.record("after_llm", resp.Meta, resp, agent.HookDecision{Action: agent.HookActionContinue})
return resp, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) BeforeTool(
ctx context.Context,
call *agent.ToolCallHookRequest,
) (*agent.ToolCallHookRequest, agent.HookDecision, error) {
_ = ctx
h.record("before_tool", call.Meta, call, agent.HookDecision{Action: agent.HookActionContinue})
return call, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) AfterTool(
ctx context.Context,
result *agent.ToolResultHookResponse,
) (*agent.ToolResultHookResponse, agent.HookDecision, error) {
_ = ctx
h.record("after_tool", result.Meta, result, agent.HookDecision{Action: agent.HookActionContinue})
return result, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) ApproveTool(
ctx context.Context,
req *agent.ToolApprovalRequest,
) (agent.ApprovalDecision, error) {
_ = ctx
decision := agent.ApprovalDecision{Approved: true}
h.record("approve_tool", req.Meta, req, decision)
return decision, nil
}
func (h *ExampleLoggerHook) record(stage string, meta agent.EventMeta, payload any, decision any) {
logger.InfoCF("hooks", "Example hook observed", map[string]any{
"stage": stage,
})
if h == nil || h.logFile == "" {
return
}
entry := map[string]any{
"ts": time.Now().UTC(),
"stage": stage,
"meta": meta,
"payload": payload,
"decision": decision,
}
body, err := json.Marshal(entry)
if err != nil {
logger.WarnCF("hooks", "Example hook log encode failed", map[string]any{
"stage": stage,
"error": err.Error(),
})
return
}
h.mu.Lock()
defer h.mu.Unlock()
if dir := filepath.Dir(h.logFile); dir != "" && dir != "." {
if err := os.MkdirAll(dir, 0o755); err != nil {
logger.WarnCF("hooks", "Example hook log mkdir failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
return
}
}
file, err := os.OpenFile(h.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
logger.WarnCF("hooks", "Example hook log open failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
return
}
defer func() { _ = file.Close() }()
if _, err := file.Write(append(body, '\n')); err != nil {
logger.WarnCF("hooks", "Example hook log write failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
}
}
```
### Mounting It In Code
If code mounting is enough, call this after `AgentLoop` is initialized:
```go
hook := myhooks.NewExampleLoggerHook(myhooks.ExampleLoggerHookOptions{
LogFile: "/tmp/picoclaw-hook-example-logger.log",
LogEvents: true,
})
if err := al.MountHook(agent.NamedHook("example-logger", hook)); err != nil {
panic(err)
}
```
### If You Also Want Config Mounting
The hook system supports builtin hooks, but that requires you to compile the factory into your binary. In practice, that means you need registration code like this alongside the hook definition above:
```go
package myhooks
import (
"context"
"encoding/json"
"fmt"
"github.com/sipeed/picoclaw/pkg/agent"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
if err := agent.RegisterBuiltinHook("example_logger", func(
ctx context.Context,
spec config.BuiltinHookConfig,
) (any, error) {
_ = ctx
var opts ExampleLoggerHookOptions
if len(spec.Config) > 0 {
if err := json.Unmarshal(spec.Config, &opts); err != nil {
return nil, fmt.Errorf("decode example_logger config: %w", err)
}
}
return NewExampleLoggerHook(opts), nil
}); err != nil {
panic(err)
}
}
```
Only after you register that builtin will the following config work:
```json
{
"hooks": {
"enabled": true,
"builtins": {
"example_logger": {
"enabled": true,
"priority": 10,
"config": {
"log_file": "/tmp/picoclaw-hook-example-logger.log",
"log_events": true
}
}
}
}
}
```
### How To Observe It
- If `log_file` is set, each hook call is appended as JSON Lines
- If `log_file` is not set, the hook still writes summaries to the gateway log
- Requests that only hit the LLM path usually show `before_llm` and `after_llm`
- Requests that trigger tools usually also show `before_tool`, `approve_tool`, and `after_tool`
- If `log_events=true`, you will also see `event`
Typical log lines:
```json
{"ts":"2026-03-21T14:10:00Z","stage":"before_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"action":"continue"}}
{"ts":"2026-03-21T14:10:00Z","stage":"approve_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"approved":true}}
```
If you only see `before_llm` and `after_llm`, that usually means the request did not trigger any tool call, not that the hook failed to mount.
## Python Process-Hook Example
The following script is a minimal process-hook example. It uses only the Python standard library and supports:
1. `hook.hello`
2. `hook.event`
3. `hook.before_tool`
4. `hook.approve_tool`
It only records activity. It does not rewrite or deny anything.
Save it to any local path, for example `/tmp/review_gate.py`:
```python
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import signal
import sys
from datetime import datetime, timezone
from typing import Any
LOG_EVENTS = os.getenv("PICOCLAW_HOOK_LOG_EVENTS", "1").lower() not in {"0", "false", "no"}
LOG_FILE = os.getenv("PICOCLAW_HOOK_LOG_FILE", "").strip()
def append_log(entry: dict[str, Any]) -> None:
if not LOG_FILE:
return
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
**entry,
}
try:
log_dir = os.path.dirname(LOG_FILE)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
with open(LOG_FILE, "a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, ensure_ascii=True) + "\n")
except OSError as exc:
log_stderr(f"failed to write hook log file {LOG_FILE}: {exc}")
def send_response(message_id: int, result: Any | None = None, error: str | None = None) -> None:
payload: dict[str, Any] = {
"jsonrpc": "2.0",
"id": message_id,
}
if error is not None:
payload["error"] = {"code": -32000, "message": error}
else:
payload["result"] = result if result is not None else {}
append_log({
"direction": "out",
"id": message_id,
"response": payload.get("result"),
"error": payload.get("error"),
})
try:
sys.stdout.write(json.dumps(payload, ensure_ascii=True) + "\n")
sys.stdout.flush()
except BrokenPipeError:
raise SystemExit(0) from None
def log_stderr(message: str) -> None:
try:
sys.stderr.write(message + "\n")
sys.stderr.flush()
except BrokenPipeError:
raise SystemExit(0) from None
def handle_shutdown_signal(signum: int, _frame: Any) -> None:
raise KeyboardInterrupt(f"received signal {signum}")
def handle_before_tool(params: dict[str, Any]) -> dict[str, Any]:
_ = params
return {"action": "continue"}
def handle_approve_tool(params: dict[str, Any]) -> dict[str, Any]:
_ = params
return {"approved": True}
def handle_request(method: str, params: dict[str, Any]) -> dict[str, Any]:
if method == "hook.hello":
return {"ok": True, "name": "python-review-gate"}
if method == "hook.before_tool":
return handle_before_tool(params)
if method == "hook.approve_tool":
return handle_approve_tool(params)
if method == "hook.before_llm":
return {"action": "continue"}
if method == "hook.after_llm":
return {"action": "continue"}
if method == "hook.after_tool":
return {"action": "continue"}
raise KeyError(f"method not found: {method}")
def main() -> int:
try:
for raw_line in sys.stdin:
line = raw_line.strip()
if not line:
continue
try:
message = json.loads(line)
except json.JSONDecodeError as exc:
log_stderr(f"failed to decode request: {exc}")
append_log({
"direction": "in",
"decode_error": str(exc),
"raw": line,
})
continue
method = message.get("method")
message_id = message.get("id", 0)
params = message.get("params") or {}
if not isinstance(params, dict):
params = {}
append_log({
"direction": "in",
"id": message_id,
"method": method,
"params": params,
"notification": not bool(message_id),
})
if not message_id:
if method == "hook.event" and LOG_EVENTS:
log_stderr(f"observed event: {params.get('Kind')}")
continue
try:
result = handle_request(str(method or ""), params)
except KeyError as exc:
send_response(int(message_id), error=str(exc))
continue
except Exception as exc:
send_response(int(message_id), error=f"unexpected error: {exc}")
continue
send_response(int(message_id), result=result)
except KeyboardInterrupt:
return 0
return 0
if __name__ == "__main__":
signal.signal(signal.SIGINT, handle_shutdown_signal)
signal.signal(signal.SIGTERM, handle_shutdown_signal)
raise SystemExit(main())
```
### Configuration
```json
{
"hooks": {
"enabled": true,
"processes": {
"py_review_gate": {
"enabled": true,
"priority": 100,
"transport": "stdio",
"command": [
"python3",
"/abs/path/to/review_gate.py"
],
"observe": [
"tool_exec_start",
"tool_exec_end",
"tool_exec_skipped"
],
"intercept": [
"before_tool",
"approve_tool"
],
"env": {
"PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log"
}
}
}
}
}
```
### Environment Variables
- `PICOCLAW_HOOK_LOG_EVENTS`
Whether to write `hook.event` summaries to `stderr`, enabled by default
- `PICOCLAW_HOOK_LOG_FILE`
Path to an external log file. When set, the script appends inbound hook requests, notifications, and outbound responses as JSON Lines
Note: `PICOCLAW_HOOK_LOG_FILE` has no default. If you do not set it, the script does not write any file logs.
### How To Confirm It Received Hooks
Watch two places:
- Gateway logs
Useful for confirming that the host successfully started the process and for seeing event summaries written to `stderr`
- `PICOCLAW_HOOK_LOG_FILE`
Useful for seeing the exact requests the script received and the exact responses it returned
Typical interpretation:
- Only `hook.hello`
The process started and completed the handshake, but no business hook request has arrived yet
- `hook.event`
The `observe` configuration is working
- `hook.before_tool`
The `intercept: ["before_tool", ...]` configuration is working
- `hook.approve_tool`
The approval hook path is working
Because this example never rewrites or denies, the expected responses look like:
```json
{"direction":"out","id":7,"response":{"action":"continue"},"error":null}
{"direction":"out","id":8,"response":{"approved":true},"error":null}
```
A complete sample:
```json
{"ts":"2026-03-21T14:12:00+00:00","direction":"in","id":1,"method":"hook.hello","params":{"name":"py_review_gate","version":1,"modes":["observe","tool","approve"]},"notification":false}
{"ts":"2026-03-21T14:12:00+00:00","direction":"out","id":1,"response":{"ok":true,"name":"python-review-gate"},"error":null}
{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":0,"method":"hook.event","params":{"Kind":"tool_exec_start"},"notification":true}
{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":7,"method":"hook.before_tool","params":{"tool":"echo_text","arguments":{"text":"hello"}},"notification":false}
{"ts":"2026-03-21T14:12:05+00:00","direction":"out","id":7,"response":{"action":"continue"},"error":null}
```
Additional notes:
- Timestamps are UTC
- `notification=true` means it was a notification such as `hook.event`, which does not expect a response
- `id` increases within a single hook process; if the process restarts, the counter starts over
## Process-Hook Protocol
Current process hooks use `JSON-RPC over stdio`:
- PicoClaw starts the external process
- Requests and responses are exchanged as one JSON message per line
- `hook.event` is a notification and does not need a response
- `hook.before_llm`, `hook.after_llm`, `hook.before_tool`, `hook.after_tool`, and `hook.approve_tool` are request/response calls
The host does not currently accept new RPCs initiated by the process hook. In practice, that means an external hook can only respond to PicoClaw calls; it cannot call back into the host to send channel messages.
## Configuration Fields
### `hooks.builtins.<name>`
- `enabled`
- `priority`
- `config`
### `hooks.processes.<name>`
- `enabled`
- `priority`
- `transport`
Currently only `stdio` is supported
- `command`
- `dir`
- `env`
- `observe`
- `intercept`
## Troubleshooting
If a hook looks like it is not firing, check these in order:
1. `hooks.enabled`
2. Whether the target builtin or process hook is `enabled`
3. Whether the process-hook `command` path is correct
4. Whether you are watching the correct log file
5. Whether the current request actually reached the stage you care about
6. Whether `observe` or `intercept` contains the hook point you want
A practical minimal troubleshooting pair is:
- Use the Python process-hook example from this document to validate the external protocol
- Use the Go in-process example from this document to validate the host-side chain
If the Python side shows `hook.hello` but no business hook requests, the protocol is usually fine; the current request simply did not trigger the stage you expected.
## Scope And Limits
The current hook system is best suited for:
- LLM request rewriting
- Tool argument normalization
- Pre-execution tool approval
- Auditing and observability
It is not yet well suited for:
- External hooks actively sending channel messages
- Suspending a turn and waiting for human approval replies
- Full inbound/outbound message interception across the whole platform
If you want a real human approval workflow, use hooks as the approval entry point and keep the state machine plus channel interaction in a separate `ApprovalManager`.
+679
View File
@@ -0,0 +1,679 @@
# Hook 系统使用说明
这份文档对应当前仓库里已经实现的 hook 系统,而不是设计草案。
当前实现支持两类挂载方式:
1. 进程内 hook
2. 进程外 process hook`JSON-RPC over stdio`
当前仓库不再内置示例代码文件。下面的 Go / Python 示例都直接写在本文档里;如果你要使用它们,需要先复制到你自己的文件路径。
## 支持的 hook 类型
| 类型 | 接口 | 作用阶段 | 能否改写 |
| --- | --- | --- | --- |
| 观察型 | `EventObserver` | EventBus 广播事件时 | 否 |
| LLM 拦截型 | `LLMInterceptor` | `before_llm` / `after_llm` | 是 |
| Tool 拦截型 | `ToolInterceptor` | `before_tool` / `after_tool` | 是 |
| Tool 审批型 | `ToolApprover` | `approve_tool` | 否,返回批准/拒绝 |
当前公开的同步点位只有:
- `before_llm`
- `after_llm`
- `before_tool`
- `after_tool`
- `approve_tool`
其余 lifecycle 通过事件形式只读暴露。
## 执行顺序
HookManager 的排序规则是:
1. 先执行进程内 hook
2. 再执行 process hook
3. 同一来源内按 `priority` 从小到大
4.`priority` 相同,再按名字排序
## 超时
当前配置在 `hooks.defaults` 中统一设置:
- `observer_timeout_ms`
- `interceptor_timeout_ms`
- `approval_timeout_ms`
注意:当前实现还没有单个 process hook 自己的 `timeout_ms` 字段,超时配置是全局默认值。
## 快速开始
如果你的目标只是先把当前 hook 流程跑通并观察到实际请求,最省事的是先用下面的 Python process hook 示例:
1. 打开 `hooks.enabled`
2. 把下面文档里的 Python 示例保存到本地文件,例如 `/tmp/review_gate.py`
3. 给它配置 `PICOCLAW_HOOK_LOG_FILE`
4. 重启 gateway
5.`tail -f` 观察日志文件
例如:
```json
{
"hooks": {
"enabled": true,
"processes": {
"py_review_gate": {
"enabled": true,
"priority": 100,
"transport": "stdio",
"command": [
"python3",
"/tmp/review_gate.py"
],
"observe": [
"tool_exec_start",
"tool_exec_end",
"tool_exec_skipped"
],
"intercept": [
"before_tool",
"approve_tool"
],
"env": {
"PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log"
}
}
}
}
}
```
观察方式:
```bash
tail -f /tmp/picoclaw-hook-review-gate.log
```
如果你是在开发 PicoClaw 本体,而不是只想验证协议,那么再看后面的 Go in-process 示例。
## 两个示例的定位
- Go in-process 示例
适合验证宿主内的 hook 链路、理解 `MountHook()` 和各个同步点位
- Python process 示例
适合理解 `JSON-RPC over stdio` 协议、确认宿主和外部进程之间的消息来回是否正常
这两个示例都刻意保持为“只记录、不改写、不拒绝”的安全模式。它们的目的不是提供策略能力,而是帮你观察当前 hook 系统。
## Go 进程内示例
下面这段代码是一个最小的“记录型” in-process hook。它实现了:
1. `EventObserver`
2. `LLMInterceptor`
3. `ToolInterceptor`
4. `ToolApprover`
它只记录,不改写请求,也不拒绝工具。
你可以把它保存成你自己的 Go 文件,例如 `pkg/myhooks/example_logger.go`
```go
package myhooks
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/agent"
"github.com/sipeed/picoclaw/pkg/logger"
)
type ExampleLoggerHookOptions struct {
LogFile string `json:"log_file,omitempty"`
LogEvents bool `json:"log_events,omitempty"`
}
type ExampleLoggerHook struct {
logFile string
logEvents bool
mu sync.Mutex
}
func NewExampleLoggerHook(opts ExampleLoggerHookOptions) *ExampleLoggerHook {
return &ExampleLoggerHook{
logFile: strings.TrimSpace(opts.LogFile),
logEvents: opts.LogEvents,
}
}
func (h *ExampleLoggerHook) OnEvent(ctx context.Context, evt agent.Event) error {
_ = ctx
if h == nil || !h.logEvents {
return nil
}
h.record("event", evt.Meta, map[string]any{
"event": evt.Kind.String(),
"payload": evt.Payload,
}, nil)
return nil
}
func (h *ExampleLoggerHook) BeforeLLM(
ctx context.Context,
req *agent.LLMHookRequest,
) (*agent.LLMHookRequest, agent.HookDecision, error) {
_ = ctx
h.record("before_llm", req.Meta, req, agent.HookDecision{Action: agent.HookActionContinue})
return req, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) AfterLLM(
ctx context.Context,
resp *agent.LLMHookResponse,
) (*agent.LLMHookResponse, agent.HookDecision, error) {
_ = ctx
h.record("after_llm", resp.Meta, resp, agent.HookDecision{Action: agent.HookActionContinue})
return resp, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) BeforeTool(
ctx context.Context,
call *agent.ToolCallHookRequest,
) (*agent.ToolCallHookRequest, agent.HookDecision, error) {
_ = ctx
h.record("before_tool", call.Meta, call, agent.HookDecision{Action: agent.HookActionContinue})
return call, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) AfterTool(
ctx context.Context,
result *agent.ToolResultHookResponse,
) (*agent.ToolResultHookResponse, agent.HookDecision, error) {
_ = ctx
h.record("after_tool", result.Meta, result, agent.HookDecision{Action: agent.HookActionContinue})
return result, agent.HookDecision{Action: agent.HookActionContinue}, nil
}
func (h *ExampleLoggerHook) ApproveTool(
ctx context.Context,
req *agent.ToolApprovalRequest,
) (agent.ApprovalDecision, error) {
_ = ctx
decision := agent.ApprovalDecision{Approved: true}
h.record("approve_tool", req.Meta, req, decision)
return decision, nil
}
func (h *ExampleLoggerHook) record(stage string, meta agent.EventMeta, payload any, decision any) {
logger.InfoCF("hooks", "Example hook observed", map[string]any{
"stage": stage,
})
if h == nil || h.logFile == "" {
return
}
entry := map[string]any{
"ts": time.Now().UTC(),
"stage": stage,
"meta": meta,
"payload": payload,
"decision": decision,
}
body, err := json.Marshal(entry)
if err != nil {
logger.WarnCF("hooks", "Example hook log encode failed", map[string]any{
"stage": stage,
"error": err.Error(),
})
return
}
h.mu.Lock()
defer h.mu.Unlock()
if dir := filepath.Dir(h.logFile); dir != "" && dir != "." {
if err := os.MkdirAll(dir, 0o755); err != nil {
logger.WarnCF("hooks", "Example hook log mkdir failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
return
}
}
file, err := os.OpenFile(h.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
logger.WarnCF("hooks", "Example hook log open failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
return
}
defer func() { _ = file.Close() }()
if _, err := file.Write(append(body, '\n')); err != nil {
logger.WarnCF("hooks", "Example hook log write failed", map[string]any{
"stage": stage,
"path": h.logFile,
"error": err.Error(),
})
}
}
```
### 如何挂载
如果你只需要代码挂载,直接在 `AgentLoop` 初始化后调用:
```go
hook := myhooks.NewExampleLoggerHook(myhooks.ExampleLoggerHookOptions{
LogFile: "/tmp/picoclaw-hook-example-logger.log",
LogEvents: true,
})
if err := al.MountHook(agent.NamedHook("example-logger", hook)); err != nil {
panic(err)
}
```
### 如果你还想用配置挂载
当前 hook 系统支持 builtin hook,但这要求你自己把 factory 编进二进制。也就是说,下面这段注册代码需要和上面的 hook 定义一起放进你的工程里:
```go
package myhooks
import (
"context"
"encoding/json"
"fmt"
"github.com/sipeed/picoclaw/pkg/agent"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
if err := agent.RegisterBuiltinHook("example_logger", func(
ctx context.Context,
spec config.BuiltinHookConfig,
) (any, error) {
_ = ctx
var opts ExampleLoggerHookOptions
if len(spec.Config) > 0 {
if err := json.Unmarshal(spec.Config, &opts); err != nil {
return nil, fmt.Errorf("decode example_logger config: %w", err)
}
}
return NewExampleLoggerHook(opts), nil
}); err != nil {
panic(err)
}
}
```
只有在你自己注册了 builtin 之后,下面的配置才会生效:
```json
{
"hooks": {
"enabled": true,
"builtins": {
"example_logger": {
"enabled": true,
"priority": 10,
"config": {
"log_file": "/tmp/picoclaw-hook-example-logger.log",
"log_events": true
}
}
}
}
}
```
### 如何观察它是否生效
- 如果设置了 `log_file`,它会把每次 hook 调用按 JSON Lines 写入文件
- 如果没有设置 `log_file`,它仍然会把摘要写到 gateway 日志
- 普通只走 LLM 的请求,通常会看到 `before_llm``after_llm`
- 触发工具调用的请求,通常还会看到 `before_tool``approve_tool``after_tool`
- 如果 `log_events=true`,还会额外看到 `event`
典型日志:
```json
{"ts":"2026-03-21T14:10:00Z","stage":"before_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"action":"continue"}}
{"ts":"2026-03-21T14:10:00Z","stage":"approve_tool","meta":{"session_key":"session-1"},"payload":{"tool":"echo_text","arguments":{"text":"hello"}},"decision":{"approved":true}}
```
如果你只看到了 `before_llm` / `after_llm`,没有看到 tool 相关阶段,通常不是 hook 没挂上,而是这次请求本身没有触发工具调用。
## Python process hook 示例
下面这段脚本是一个最小的 `process hook` 示例。它只使用 Python 标准库,支持:
1. `hook.hello`
2. `hook.event`
3. `hook.before_tool`
4. `hook.approve_tool`
它默认只记录,不改写,也不拒绝。
你可以把它保存到任意本地路径,例如 `/tmp/review_gate.py`
```python
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import signal
import sys
from datetime import datetime, timezone
from typing import Any
LOG_EVENTS = os.getenv("PICOCLAW_HOOK_LOG_EVENTS", "1").lower() not in {"0", "false", "no"}
LOG_FILE = os.getenv("PICOCLAW_HOOK_LOG_FILE", "").strip()
def append_log(entry: dict[str, Any]) -> None:
if not LOG_FILE:
return
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
**entry,
}
try:
log_dir = os.path.dirname(LOG_FILE)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
with open(LOG_FILE, "a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, ensure_ascii=True) + "\n")
except OSError as exc:
log_stderr(f"failed to write hook log file {LOG_FILE}: {exc}")
def send_response(message_id: int, result: Any | None = None, error: str | None = None) -> None:
payload: dict[str, Any] = {
"jsonrpc": "2.0",
"id": message_id,
}
if error is not None:
payload["error"] = {"code": -32000, "message": error}
else:
payload["result"] = result if result is not None else {}
append_log({
"direction": "out",
"id": message_id,
"response": payload.get("result"),
"error": payload.get("error"),
})
try:
sys.stdout.write(json.dumps(payload, ensure_ascii=True) + "\n")
sys.stdout.flush()
except BrokenPipeError:
raise SystemExit(0) from None
def log_stderr(message: str) -> None:
try:
sys.stderr.write(message + "\n")
sys.stderr.flush()
except BrokenPipeError:
raise SystemExit(0) from None
def handle_shutdown_signal(signum: int, _frame: Any) -> None:
raise KeyboardInterrupt(f"received signal {signum}")
def handle_before_tool(params: dict[str, Any]) -> dict[str, Any]:
_ = params
return {"action": "continue"}
def handle_approve_tool(params: dict[str, Any]) -> dict[str, Any]:
_ = params
return {"approved": True}
def handle_request(method: str, params: dict[str, Any]) -> dict[str, Any]:
if method == "hook.hello":
return {"ok": True, "name": "python-review-gate"}
if method == "hook.before_tool":
return handle_before_tool(params)
if method == "hook.approve_tool":
return handle_approve_tool(params)
if method == "hook.before_llm":
return {"action": "continue"}
if method == "hook.after_llm":
return {"action": "continue"}
if method == "hook.after_tool":
return {"action": "continue"}
raise KeyError(f"method not found: {method}")
def main() -> int:
try:
for raw_line in sys.stdin:
line = raw_line.strip()
if not line:
continue
try:
message = json.loads(line)
except json.JSONDecodeError as exc:
log_stderr(f"failed to decode request: {exc}")
append_log({
"direction": "in",
"decode_error": str(exc),
"raw": line,
})
continue
method = message.get("method")
message_id = message.get("id", 0)
params = message.get("params") or {}
if not isinstance(params, dict):
params = {}
append_log({
"direction": "in",
"id": message_id,
"method": method,
"params": params,
"notification": not bool(message_id),
})
if not message_id:
if method == "hook.event" and LOG_EVENTS:
log_stderr(f"observed event: {params.get('Kind')}")
continue
try:
result = handle_request(str(method or ""), params)
except KeyError as exc:
send_response(int(message_id), error=str(exc))
continue
except Exception as exc:
send_response(int(message_id), error=f"unexpected error: {exc}")
continue
send_response(int(message_id), result=result)
except KeyboardInterrupt:
return 0
return 0
if __name__ == "__main__":
signal.signal(signal.SIGINT, handle_shutdown_signal)
signal.signal(signal.SIGTERM, handle_shutdown_signal)
raise SystemExit(main())
```
### 如何配置
```json
{
"hooks": {
"enabled": true,
"processes": {
"py_review_gate": {
"enabled": true,
"priority": 100,
"transport": "stdio",
"command": [
"python3",
"/abs/path/to/review_gate.py"
],
"observe": [
"tool_exec_start",
"tool_exec_end",
"tool_exec_skipped"
],
"intercept": [
"before_tool",
"approve_tool"
],
"env": {
"PICOCLAW_HOOK_LOG_FILE": "/tmp/picoclaw-hook-review-gate.log"
}
}
}
}
}
```
### 环境变量
- `PICOCLAW_HOOK_LOG_EVENTS`
是否把 `hook.event` 写到 `stderr`,默认开启
- `PICOCLAW_HOOK_LOG_FILE`
外部日志文件路径。设置后,脚本会把收到的 hook 请求、notification 和返回结果按 JSON Lines 追加到该文件
注意:`PICOCLAW_HOOK_LOG_FILE` 没有默认值。不设置时,脚本不会自动落盘日志。
### 如何确认它收到了 hook
推荐同时看两个地方:
- gateway 日志
用来观察宿主是否成功启动了外部进程,以及脚本写到 `stderr` 的事件摘要
- `PICOCLAW_HOOK_LOG_FILE`
用来观察脚本实际收到了什么请求、返回了什么响应
典型判断方式:
- 只看到 `hook.hello`
说明进程启动并完成握手了,但还没有新的业务 hook 请求真正打进来
- 看到 `hook.event`
说明 `observe` 配置生效了
- 看到 `hook.before_tool`
说明 `intercept: ["before_tool", ...]` 生效了
- 看到 `hook.approve_tool`
说明审批 hook 生效了
这份示例脚本不会改写任何参数,也不会拒绝工具,所以你应该看到的典型返回是:
```json
{"direction":"out","id":7,"response":{"action":"continue"},"error":null}
{"direction":"out","id":8,"response":{"approved":true},"error":null}
```
一组完整样例:
```json
{"ts":"2026-03-21T14:12:00+00:00","direction":"in","id":1,"method":"hook.hello","params":{"name":"py_review_gate","version":1,"modes":["observe","tool","approve"]},"notification":false}
{"ts":"2026-03-21T14:12:00+00:00","direction":"out","id":1,"response":{"ok":true,"name":"python-review-gate"},"error":null}
{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":0,"method":"hook.event","params":{"Kind":"tool_exec_start"},"notification":true}
{"ts":"2026-03-21T14:12:05+00:00","direction":"in","id":7,"method":"hook.before_tool","params":{"tool":"echo_text","arguments":{"text":"hello"}},"notification":false}
{"ts":"2026-03-21T14:12:05+00:00","direction":"out","id":7,"response":{"action":"continue"},"error":null}
```
补充说明:
- 时间戳是 UTC,不是本地时区
- `notification=true` 表示这是 `hook.event` 这类不需要响应的通知
- `id` 会随着当前进程内的请求递增;如果 hook 进程重启,计数会重新开始
## Process Hook 协议约定
当前 process hook 使用 `JSON-RPC over stdio`
- PicoClaw 启动外部进程
- 请求和响应都按“一行一个 JSON 消息”传输
- `hook.event` 是 notification,不需要响应
- `hook.before_llm` / `hook.after_llm` / `hook.before_tool` / `hook.after_tool` / `hook.approve_tool` 是 request/response
当前宿主不会接受 process hook 主动发起的新 RPC。也就是说,外部 hook 现在只能“响应 PicoClaw 的调用”,不能反向调用宿主去发送 channel 消息。
## 配置字段
### `hooks.builtins.<name>`
- `enabled`
- `priority`
- `config`
### `hooks.processes.<name>`
- `enabled`
- `priority`
- `transport`
当前只支持 `stdio`
- `command`
- `dir`
- `env`
- `observe`
- `intercept`
## 排查建议
当你觉得“hook 没触发”时,优先按这个顺序排查:
1. `hooks.enabled` 是否为 `true`
2. 对应的 builtin/process hook 是否 `enabled`
3. process hook 的 `command` 路径是否正确
4. 你看的是否是正确的日志文件
5. 当前请求是否真的走到了对应阶段
6. `observe` / `intercept` 是否包含了你想看的点位
一个很实用的最小排查组合是:
- 先用文档里的 Python process 示例确认外部协议没问题
- 再用文档里的 Go in-process 示例确认宿主内的 hook 链路没问题
如果前者有 `hook.hello` 但没有业务请求,通常不是协议挂了,而是当前这次请求没有真正触发对应的 hook 点位。
## 适用边界
当前 hook 系统最适合做这些事:
- LLM 请求改写
- 工具参数规范化
- 工具执行前审批
- 审计和观测
当前还不适合直接承载这些需求:
- 外部 hook 主动发 channel 消息
- 挂起 turn 并等待人工审批回复
- inbound/outbound 全链路消息拦截
如果你要做人审流转,推荐把 hook 作为审批入口,把审批状态机和 channel 交互放到独立的 `ApprovalManager`
+199
View File
@@ -0,0 +1,199 @@
# Steering
Steering allows injecting messages into an already-running agent loop, interrupting it between tool calls without waiting for the entire cycle to complete.
## How it works
When the agent is executing a sequence of tool calls (e.g. the model requested 3 tools in a single turn), steering checks the queue **after each tool** completes. If it finds queued messages:
1. The remaining tools are **skipped** and receive `"Skipped due to queued user message."` as their result
2. The steering messages are **injected into the conversation context**
3. The model is called again with the updated context, including the user's steering message
```
User ──► Steer("change approach")
Agent Loop ▼
├─ tool[0] ✔ (executed)
├─ [polling] → steering found!
├─ tool[1] ✘ (skipped)
├─ tool[2] ✘ (skipped)
└─ new LLM turn with steering message
```
## Scoped queues
Steering is now isolated per resolved session scope, not stored in a single
global queue.
- The active turn writes and reads from its own scope key (usually the routed session key such as `agent:<agent_id>:...`)
- `Steer()` still works outside an active turn through a legacy fallback queue
- `Continue()` first dequeues messages for the requested session scope, then falls back to the legacy queue for backwards compatibility
This prevents a message arriving from another chat, DM peer, or routed agent
session from being injected into the wrong conversation.
## Configuration
In `config.json`, under `agents.defaults`:
```json
{
"agents": {
"defaults": {
"steering_mode": "one-at-a-time"
}
}
}
```
### Modes
| Value | Behavior |
|-------|----------|
| `"one-at-a-time"` | **(default)** Dequeues only one message per polling cycle. If there are 3 messages in the queue, they are processed one at a time across 3 successive iterations. |
| `"all"` | Drains the entire queue in a single poll. All pending messages are injected into the context together. |
The environment variable `PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE` can be used as an alternative.
## Go API
### Steer — Send a steering message
```go
err := agentLoop.Steer(providers.Message{
Role: "user",
Content: "change direction, focus on X instead",
})
if err != nil {
// Queue is full (MaxQueueSize=10) or not initialized
}
```
The message is enqueued in a thread-safe manner. Returns an error if the queue is full or not initialized. It will be picked up at the next polling point (after the current tool finishes).
### SteeringMode / SetSteeringMode
```go
// Read the current mode
mode := agentLoop.SteeringMode() // SteeringOneAtATime | SteeringAll
// Change it at runtime
agentLoop.SetSteeringMode(agent.SteeringAll)
```
### Continue — Resume an idle agent
When the agent is idle (it has finished processing and its last message was from the assistant), `Continue` checks if there are steering messages in the queue and uses them to start a new cycle:
```go
response, err := agentLoop.Continue(ctx, sessionKey, channel, chatID)
if err != nil {
// Error (e.g. "no default agent available")
}
if response == "" {
// No steering messages in queue, the agent stays idle
}
```
`Continue` internally uses `SkipInitialSteeringPoll: true` to avoid double-dequeuing the same messages (since it already extracted them and passes them directly as input).
`Continue` also resolves the target agent from the provided session key, so
agent-scoped sessions continue on the correct agent instead of always using
the default one.
## Polling points in the loop
Steering is checked at the following points in the agent cycle:
1. **At loop start** — before the first LLM call, to catch messages enqueued during setup
2. **After every tool completes** — including the first and the last. If steering is found and there are remaining tools, they are all skipped immediately
3. **After a direct LLM response** — if a new steering message arrived while the model was generating a non-tool response, the loop continues instead of returning a stale answer
4. **Right before the turn is finalized** — if steering arrived at the very end of the turn, the agent immediately starts a continuation turn instead of leaving the message orphaned in the queue
## Why remaining tools are skipped
When a steering message is detected, all remaining tools in the batch are skipped rather than executed. The alternative — let all tools finish and inject the steering message afterwards — was considered and rejected. Here is why.
### Preventing unwanted side effects
Tools can have **irreversible side effects**. If the user says "no, wait" while the agent is mid-batch, executing the remaining tools means those side effects happen anyway:
| Tool batch | Steering message | With skip | Without skip |
|---|---|---|---|
| `[web_search, send_email]` | "don't send it" | Email **not** sent | Email sent, damage done |
| `[query_db, write_file, spawn_agent]` | "use another database" | Only the query runs | File written + subagent spawned, all wasted |
| `[search₁, search₂, search₃, write_file]` | user changes topic entirely | 1 search | 3 searches + file write, all irrelevant |
### Avoiding wasted time
Tools that take seconds (web fetches, API calls, database queries) would all run to completion before the agent sees the user's correction. In a batch of 3 tools each taking 3-4 seconds, that's 10+ seconds of work that will be discarded.
With skipping, the agent reacts as soon as the current tool finishes — typically within a few seconds instead of waiting for the entire batch.
### The LLM gets full context
Skipped tools receive an explicit error result (`"Skipped due to queued user message."`), so the model knows exactly which actions were not performed. It can then decide whether to re-execute them with the new context, or take a different path entirely.
### Trade-off: sequential execution
Skipping requires tools to run **sequentially** (the previous implementation ran them in parallel). This introduces latency when the LLM requests multiple independent tools in a single turn. In practice, most batches contain 1-2 tools, so the impact is minimal compared to the benefit of being able to stop unwanted actions.
## Skipped tool result format
When steering interrupts a batch, each tool that was not executed receives a `tool` result with:
```
Content: "Skipped due to queued user message."
```
This is saved to the session via `AddFullMessage` and sent to the model, so it is aware that some requested actions were not performed.
## Full flow example
```
1. User: "search for info on X, write a file, and send me a message"
2. LLM responds with 3 tool calls: [web_search, write_file, message]
3. web_search is executed → result saved
4. [polling] → User called Steer("no, search for Y instead")
5. write_file is skipped → "Skipped due to queued user message."
message is skipped → "Skipped due to queued user message."
6. Message "search for Y instead" injected into context
7. LLM receives the full updated context and responds accordingly
```
## Automatic bus drain
When the agent loop (`Run()`) starts processing a message, it spawns a background goroutine that keeps consuming new inbound messages from the bus. These messages are automatically redirected into the steering queue via `Steer()`. This means:
- Users on any channel (Telegram, Discord, etc.) don't need to do anything special — their messages are automatically captured as steering when the agent is busy
- Audio messages are transcribed before being steered, so the agent receives text. If transcription fails, the original (non-transcribed) message is steered as-is
- Only messages that resolve to the **same steering scope** as the active turn are redirected. Messages for other chats/sessions are requeued onto the inbound bus so they can be processed normally
- `system` inbound messages are not treated as steering input
- When `processMessage` finishes, the drain goroutine is canceled and normal message consumption resumes
## Steering with media
Steering messages can include `Media` refs, just like normal inbound user
messages.
- The original `media://` refs are preserved in session history via `AddFullMessage`
- Before the next provider call, steering messages go through the normal media resolution pipeline
- Image refs are converted to data URLs for multimodal providers; non-image refs are resolved the same way as standard inbound media
This applies both to in-turn steering and to idle-session continuation through
`Continue()`.
## Notes
- Steering **does not interrupt** a tool that is currently executing. It waits for the current tool to finish, then checks the queue.
- With `one-at-a-time` mode, if multiple messages are enqueued rapidly, they will be processed one per iteration. This gives the model the opportunity to react to each message individually.
- With `all` mode, all pending messages are combined into a single injection. Useful when you want the agent to receive all the context at once.
- The steering queue has a maximum capacity of 10 messages (`MaxQueueSize`). `Steer()` returns an error when the queue is full. In the bus drain path, the error is logged as a warning and the message is effectively dropped.
- Manual `Steer()` calls made outside an active turn still go to the legacy fallback queue, so older integrations keep working.
+279
View File
@@ -0,0 +1,279 @@
# 🔄 SubTurn Mechanism
> Back to [README](../README.md)
## Overview
The `SubTurn` mechanism is a core feature in PicoClaw that allows tools to spawn isolated, nested agent loops to handle complex sub-tasks.
By using a SubTurn, an agent can break down a problem and run a separate LLM invocation in an independent, ephemeral session. This ensures that intermediate reasoning, background tasks, or sub-agent outputs do not pollute the main conversation history.
## Core Capabilities
- **Context Isolation**: Each SubTurn uses an `ephemeralSessionStore`. Its message history does not leak into the parent task and is destroyed upon completion. The ephemeral session holds at most **50 messages**; older messages are automatically truncated when this limit is reached.
- **Depth & Concurrency Limits**: Prevents infinite loops and resource exhaustion.
- **Maximum Depth**: Up to 3 nested levels.
- **Maximum Concurrency**: Up to 5 concurrent sub-turns per parent turn (managed via a semaphore with a 30-second timeout).
- **Context Protection**: Supports soft context limits (`MaxContextRunes`). It proactively truncates old messages (while preserving system prompts and recent context) before hitting the provider's hard context window limit.
- **Error Recovery**: Automatically detects and recovers from provider context length exceeded errors and truncation errors by compressing history and retrying.
## Configuration (`SubTurnConfig`)
When spawning a SubTurn, you must provide a `SubTurnConfig`:
| Field | Type | Description |
| :--- | :--- | :--- |
| `Model` | `string` | The LLM model to use for the sub-turn (e.g., `gpt-4o-mini`). **Required.** |
| `Tools` | `[]tools.Tool` | Tools granted to the sub-turn. If empty, it inherits the parent's tools. |
| `SystemPrompt` | `string` | The task description for the sub-turn. Sent as the first user message to the LLM (not as a system prompt override). |
| `ActualSystemPrompt` | `string` | Optional explicit system prompt to replace the agent's default. Leave empty to inherit the parent agent's system prompt. |
| `MaxTokens` | `int` | Maximum tokens for the generated response. |
| `Async` | `bool` | Controls the result delivery mode (Synchronous vs. Asynchronous). |
| `Critical` | `bool` | If `true`, the sub-turn continues running even if the parent finishes gracefully. |
| `Timeout` | `time.Duration` | Maximum execution time (default: 5 minutes). |
| `MaxContextRunes`| `int` | Soft context limit. `0` = auto-calculate (75% of model's context window, recommended), `-1` = no limit (disable soft truncation, rely only on hard context error recovery), `>0` = use specified rune limit. |
> **Note:** The `Async` flag does **not** make the call non-blocking. It only controls whether the result is also delivered to the parent's `pendingResults` channel. Both modes block the caller until the sub-turn completes. For true non-blocking execution, the caller must spawn the sub-turn in a separate goroutine.
## Execution Modes
### Synchronous (`Async: false`)
This is the standard mode where the caller needs the result immediately to proceed.
- The caller blocks until the sub-turn completes.
- The result is **only** returned directly via the function return value.
- It is **not** delivered to the parent's pending results channel.
**Example:**
```go
cfg := agent.SubTurnConfig{
Model: "gpt-4o-mini",
SystemPrompt: "Analyze the provided codebase...",
Async: false,
}
result, err := agent.SpawnSubTurn(ctx, cfg)
// Process result immediately
```
### Asynchronous (`Async: true`)
Used for "fire-and-forget" operations or parallel processing where the parent turn collects results later.
- The result is delivered to the parent turn's `pendingResults` channel.
- The result is **also** returned via the function return value (for consistency).
- The parent's Agent Loop will poll this channel in subsequent iterations and automatically inject the results into the ongoing conversation context as `[SubTurn Result]`.
**Example:**
```go
cfg := agent.SubTurnConfig{
Model: "gpt-4o-mini",
SystemPrompt: "Run a background security scan...",
Async: true,
}
result, err := agent.SpawnSubTurn(ctx, cfg)
// The result will also be injected into the parent loop later via channel
```
## Error Recovery and Retries
SubTurns implement automatic retry mechanisms for transient errors:
| Error Type | Max Retries | Recovery Action |
|:-----------|:------------|:----------------|
| Context Length Exceeded | 2 | Force compress history and retry |
| Response Truncated (`finish_reason="truncated"`) | 2 | Inject recovery prompt and retry |
### Truncation Recovery
When the LLM response is truncated (`finish_reason="truncated"`), SubTurn automatically:
1. Detects the truncation from `turnState.lastFinishReason`
2. Injects a recovery prompt: "Your previous response was truncated due to length. Please provide a shorter, complete response..."
3. Retries up to 2 times
### Context Error Recovery
When the provider returns a context length error (e.g., `context_length_exceeded`):
1. Force compresses the message history (drops oldest 50% of conversation)
2. Retries with the compressed context
3. Up to 2 retries before failing
## Lifecycle and Cancellation
SubTurns operate within an independent context but maintain a structural link to their parent `turnState`.
### Graceful Parent Finish
When the parent task finishes naturally (`Finish(false)`):
- **Non-critical** sub-turns receive a signal to exit gracefully without throwing an error.
- **Critical** (`Critical: true`) sub-turns continue running in the background. Once finished, their results are emitted as **Orphan Results** so the data is not lost.
### Hard Abort
When the parent task is forcefully aborted (e.g., user interrupts with `/stop`):
- A cascading cancellation is triggered, instantly terminating all child and grandchild sub-turns.
- The root turn's session history rolls back to the snapshot taken at turn start (`initialHistoryLength`), preventing dirty context. SubTurns are not affected by this rollback as they use ephemeral sessions that are discarded anyway.
## Agent Loop Integration
### Bus Draining During Processing
When a message enters the `Run()` loop, the agent starts a `drainBusToSteering` goroutine before calling `processMessage`. This goroutine runs concurrently with the entire processing lifecycle and continuously consumes any new inbound messages from the bus, redirecting them into the **steering queue** instead of dropping them.
This ensures that if a user sends a follow-up message while the agent is processing (including during SubTurn execution), the message is not lost — it will be picked up between tool call iterations via `dequeueSteeringMessages`.
The drain goroutine stops automatically when `processMessage` returns (via a cancellable context).
### Pending Result Polling
The agent loop polls for async SubTurn results at two points per iteration:
1. **Before the LLM call**: injects any arrived results as `[SubTurn Result]` messages into the conversation context.
2. **After all tool executions**: polls again during the tool loop to catch results that arrived during tool execution.
3. **After the final iteration**: one last poll before the turn ends to avoid losing late-arriving results.
### Turn State Tracking
All active root turns are registered in `AgentLoop.activeTurnStates` (`sync.Map`, keyed by session key). This allows `HardAbort` and `/subagents` observability commands to find and operate on active turns.
## Event Bus Integration
SubTurns emit specific events to the PicoClaw `EventBus` for observability and debugging:
| Event Kind | When Emitted | Payload |
|:------|:-------------|:--------|
| `subturn_spawn` | Sub-turn successfully initialized | `SubTurnSpawnPayload{AgentID, Label, ParentTurnID}` |
| `subturn_end` | Sub-turn finishes (success or error) | `SubTurnEndPayload{AgentID, Status}` |
| `subturn_result_delivered` | Async result successfully delivered to parent | `SubTurnResultDeliveredPayload{TargetChannel, TargetChatID, ContentLen}` |
| `subturn_orphan` | Result cannot be delivered (parent finished or channel full) | `SubTurnOrphanPayload{ParentTurnID, ChildTurnID, Reason}` |
## API Reference
### SpawnSubTurn (Public Entry Point)
```go
func SpawnSubTurn(ctx context.Context, cfg SubTurnConfig) (*tools.ToolResult, error)
```
This is the exported package-level entry point for agent-internal code (e.g., tests, direct invocations). It retrieves `AgentLoop` and `turnState` from context and delegates to the internal `spawnSubTurn`.
**Requirements:**
- `AgentLoop` must be injected into context via `WithAgentLoop()`
- Parent `turnState` must exist in context (automatically set when called from tools)
**Returns:**
- `*tools.ToolResult`: Contains `ForLLM` field with the sub-turn's output
- `error`: One of the defined error types or context errors
### AgentLoopSpawner (Interface Implementation)
```go
type AgentLoopSpawner struct { al *AgentLoop }
func (s *AgentLoopSpawner) SpawnSubTurn(ctx context.Context, cfg tools.SubTurnConfig) (*tools.ToolResult, error)
```
This implements the `tools.SubTurnSpawner` interface for use by tools that need to spawn sub-turns without a direct import of the `agent` package (avoiding circular dependencies). It converts `tools.SubTurnConfig``agent.SubTurnConfig` before delegating to the internal `spawnSubTurn`.
### NewSubTurnSpawner
```go
func NewSubTurnSpawner(al *AgentLoop) *AgentLoopSpawner
```
Creates a new spawner instance for the given AgentLoop. Pass the returned value to `SpawnTool.SetSpawner()` or `SubagentTool.SetSpawner()` during tool registration.
### Continue
```go
func (al *AgentLoop) Continue(ctx context.Context, sessionKey string) error
```
Resumes an idle agent turn by injecting any queued steering messages as a new LLM iteration. Used when the agent is waiting and a deferred steering message needs to be processed without a new inbound message arriving.
## Context Propagation
SubTurn relies on context values for proper operation:
| Context Key | Purpose |
|:------------|:--------|
| `agentLoopKey` | Stores `*AgentLoop` for tool access and SubTurn spawning |
| `turnStateKey` | Stores `*turnState` for hierarchy tracking and result delivery |
### Injecting Dependencies
```go
// Before calling tools that may spawn SubTurns
ctx = WithAgentLoop(ctx, agentLoop)
ctx = withTurnState(ctx, turnState)
```
### Independent Child Context
**Important**: The child SubTurn uses an **independent context** derived from `context.Background()`, not from the parent context. This design choice:
- Allows critical SubTurns to continue after parent cancellation
- Prevents parent timeout from affecting child execution
- Child has its own timeout for self-protection (`Timeout` config or 5 minutes default)
## Error Types
| Error | Condition |
|:------|:----------|
| `ErrDepthLimitExceeded` | SubTurn depth exceeds 3 levels |
| `ErrInvalidSubTurnConfig` | Required field `Model` is empty |
| `ErrConcurrencyTimeout` | All 5 concurrency slots occupied for 30+ seconds |
| Context errors | Parent context cancelled during semaphore acquisition |
## Thread Safety
SubTurns are designed for concurrent execution:
- **Parent-child relationships**: Managed under mutex (`parentTS.mu.Lock()`)
- **Active turn tracking**: Uses `sync.Map` for concurrent access to `activeTurnStates`
- **ID generation**: Uses `atomic.Int64` for unique SubTurn IDs (format: `subturn-N`, globally monotonic per `AgentLoop` instance)
- **Result delivery**: Reads parent state under lock, releases before channel send (small race window acceptable)
## Orphan Results
An orphan result occurs when:
1. Parent turn finishes before the SubTurn completes
2. The `pendingResults` channel is full (buffer size: 16)
When a result becomes orphan:
- `SubTurnOrphanResultEvent` is emitted to EventBus
- The result is **NOT** delivered to the LLM context
- External systems can listen to this event for custom handling
### Preventing Orphan Results
- Use `Critical: true` for important SubTurns that must complete
- Monitor `SubTurnOrphanResultEvent` for observability
- Consider the 16-buffer limit when spawning many async SubTurns
## Tool Inheritance
### When `cfg.Tools` is empty:
- SubTurn inherits **all** tools from the parent agent
- Tools are registered in a new `ToolRegistry` instance
- Tool TTL is managed independently from parent
### When `cfg.Tools` is specified:
- Only the specified tools are available to the SubTurn
- Parent tools are **NOT** merged
- Use this to restrict SubTurn capabilities for security or focus
**Example - Restricted SubTurn:**
```go
cfg := agent.SubTurnConfig{
Model: "gpt-4o-mini",
Tools: []tools.Tool{readOnlyTool}, // Only read-only access
SystemPrompt: "Analyze the file structure...",
}
```
## Reference
| Constant | Value |
|:---------|:------|
| `maxSubTurnDepth` | 3 |
| `maxConcurrentSubTurns` | 5 |
| `concurrencyTimeout` | 30s |
| `defaultSubTurnTimeout` | 5m |
| `maxEphemeralHistorySize` | 50 messages |
| `pendingResults` buffer | 16 |
| `MaxContextRunes` default | 75% of model context window |