From 7868c5811aeb11f55638e17eb4b94d949a1812cb Mon Sep 17 00:00:00 2001 From: Administrator <1280842908@qq.com> Date: Sun, 22 Mar 2026 20:35:14 +0800 Subject: [PATCH] fix(agent): fix subturn panic result, hard abort rollback, and drain bus exit - spawnSubTurn: set result=nil on panic instead of constructing a non-nil ToolResult - HardAbort: roll back session history to initialHistoryLength after Finish() - drainBusToSteering: switch to non-blocking reads after first message so function returns promptly when the inbound channel is empty - remove obsolete documentation files --- flow_diagrams.md | 396 ----------------------- hybrid_implementation_guide.md | 563 --------------------------------- loop_conflict_analysis.md | 271 ---------------- pkg/agent/loop.go | 36 ++- pkg/agent/steering.go | 8 + pkg/agent/subturn.go | 9 +- 6 files changed, 36 insertions(+), 1247 deletions(-) delete mode 100644 flow_diagrams.md delete mode 100644 hybrid_implementation_guide.md delete mode 100644 loop_conflict_analysis.md diff --git a/flow_diagrams.md b/flow_diagrams.md deleted file mode 100644 index 0cd19b886..000000000 --- a/flow_diagrams.md +++ /dev/null @@ -1,396 +0,0 @@ -# Agent Loop 流程图对比 - -## 1. Incoming (refactor/agent) 流程 - -### 整体架构 -``` -User Message - ↓ -Message Bus (串行队列) - ↓ -processMessage() - ↓ -runAgentLoop() - ↓ -newTurnState() → 创建 turnState - ↓ -runTurn() - ↓ -registerActiveTurn(ts) ← 设置 al.activeTurn = ts (单例) - ↓ -[Turn 执行循环] - ↓ -clearActiveTurn(ts) ← 清除 al.activeTurn = nil -``` - -### runTurn() 详细流程 -``` -┌─────────────────────────────────────────┐ -│ runTurn(ctx, turnState) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 1. 注册 activeTurn (单例) │ -│ al.registerActiveTurn(ts) │ -│ defer al.clearActiveTurn(ts) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 2. 发送 TurnStart 事件 │ -│ al.emitEvent(EventKindTurnStart) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 3. 加载 Session History & Summary │ -│ history = Sessions.GetHistory() │ -│ summary = Sessions.GetSummary() │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 4. 构建消息 │ -│ messages = BuildMessages(...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 5. 检查 Context Budget │ -│ if isOverContextBudget() { │ -│ forceCompression() │ -│ emitEvent(ContextCompress) │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 6. 保存用户消息到 Session │ -│ Sessions.AddMessage("user", ...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 7. Turn Loop (迭代执行) │ -│ for iteration < MaxIterations { │ -│ ┌─────────────────────────────┐ │ -│ │ 7.1 调用 LLM │ │ -│ │ callLLM() │ │ -│ │ emitEvent(LLMStart) │ │ -│ └─────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────┐ │ -│ │ 7.2 处理 Tool Calls │ │ -│ │ for each toolCall { │ │ -│ │ emitEvent(ToolStart)│ │ -│ │ executeTool() │ │ -│ │ emitEvent(ToolEnd) │ │ -│ │ } │ │ -│ └─────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────┐ │ -│ │ 7.3 检查中断 │ │ -│ │ if gracefulInterrupt { │ │ -│ │ break │ │ -│ │ } │ │ -│ └─────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────┐ │ -│ │ 7.4 处理 Steering Messages │ │ -│ │ pollSteering() │ │ -│ └─────────────────────────────┘ │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 8. 保存最终响应到 Session │ -│ Sessions.AddMessage("assistant", ...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 9. 发送 TurnEnd 事件 │ -│ al.emitEvent(EventKindTurnEnd) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 10. 返回 turnResult │ -│ {finalContent, status, followUps} │ -└─────────────────────────────────────────┘ -``` - -### 关键特点 -- ✅ **事件驱动**: 每个阶段都发送事件到 EventBus -- ✅ **Hook 集成**: 在 before_llm, after_llm, before_tool, after_tool 触发 Hook -- ✅ **单 Turn**: 使用 `activeTurn` 单例,同一时间只有一个 turn -- ❌ **无并发**: 不支持多个 session 同时执行 turn - ---- - -## 2. HEAD (feat/subturn-poc) 流程 - -### 整体架构 -``` -User Message - ↓ -Message Bus - ↓ -processMessage() - ↓ -runAgentLoop() - ↓ -检查 Context 中是否有 turnState - ├─ 有 → 复用 (SubTurn 场景) - └─ 无 → 创建新的 rootTS - ↓ - 存储到 activeTurnStates[sessionKey] - ↓ - runLLMIteration() - ↓ - [并发 SubTurn 支持] -``` - -### runAgentLoop() 详细流程 -``` -┌─────────────────────────────────────────┐ -│ runAgentLoop(ctx, agent, opts) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 1. 检查是否在 SubTurn 中 │ -│ existingTS = turnStateFromContext() │ -│ if existingTS != nil { │ -│ rootTS = existingTS (复用) │ -│ isRootTurn = false │ -│ } else { │ -│ rootTS = new turnState │ -│ isRootTurn = true │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 2. 注册 Turn State (支持并发) │ -│ if isRootTurn { │ -│ al.activeTurnStates.Store( │ -│ sessionKey, rootTS) │ -│ defer activeTurnStates.Delete() │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 3. 记录 Last Channel │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 4. 构建消息 │ -│ messages = BuildMessages(...) │ -│ messages = resolveMediaRefs(...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 5. 覆盖 System Prompt (如果需要) │ -│ if opts.SystemPromptOverride != "" { │ -│ // 用于 SubTurn 的特殊 prompt │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 6. 保存用户消息 │ -│ if !opts.SkipAddUserMessage { │ -│ Sessions.AddMessage(...) │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 7. 执行 LLM 迭代 │ -│ finalContent, iteration, err = │ -│ runLLMIteration(ctx, agent, ...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 8. 轮询 SubTurn 结果 (如果是根 turn) │ -│ if isRootTurn { │ -│ results = │ -│ dequeuePendingSubTurnResults()│ -│ // 将结果注入到最终响应 │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 9. 处理空响应 │ -│ if finalContent == "" { │ -│ finalContent = DefaultResponse │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 10. 保存助手响应 │ -│ Sessions.AddMessage("assistant"...) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 11. 发送响应 (如果需要) │ -│ if opts.SendResponse { │ -│ bus.PublishOutbound(...) │ -│ } │ -└─────────────────────────────────────────┘ -``` - -### SubTurn 执行流程 -``` -┌─────────────────────────────────────────┐ -│ Tool: spawn │ -│ args: {task: "...", label: "..."} │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ SpawnTool.Execute() │ -│ if spawner != nil { │ -│ // 直接 SubTurn 路径 │ -│ } else { │ -│ // SubagentManager 路径 │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ spawner.SpawnSubTurn() │ -│ ┌─────────────────────────────────┐ │ -│ │ 1. 生成 SubTurn ID │ │ -│ │ subTurnID = atomic.Add() │ │ -│ └─────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────┐ │ -│ │ 2. 创建 SubTurn Context │ │ -│ │ subCtx = withTurnState(...) │ │ -│ │ // 继承父 turnState │ │ -│ └─────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────┐ │ -│ │ 3. 获取并发信号量 │ │ -│ │ <-rootTS.concurrencySem │ │ -│ │ defer release │ │ -│ └─────────────────────────────────┘ │ -│ ↓ │ -│ ┌─────────────────────────────────┐ │ -│ │ 4. 启动 Goroutine │ │ -│ │ go func() { │ │ -│ │ result = runAgentLoop( │ │ -│ │ subCtx, ...) │ │ -│ │ // 将结果发送到 channel │ │ -│ │ rootTS.pendingResults <- │ │ -│ │ }() │ │ -│ └─────────────────────────────────┘ │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 父 Turn 继续执行 │ -│ - 不等待 SubTurn 完成 │ -│ - SubTurn 异步执行 │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 父 Turn 轮询 SubTurn 结果 │ -│ results = dequeuePendingSubTurnResults│ -│ for each result { │ -│ // 注入到响应或下一次迭代 │ -│ } │ -└─────────────────────────────────────────┘ -``` - -### SubTurn 层级结构 -``` -Root Turn (Session A) - ├─ turnState (depth=0) - │ ├─ turnID: "session-a" - │ ├─ pendingResults: chan - │ └─ concurrencySem: chan (限制并发数) - │ - ├─ SubTurn 1 (depth=1) - │ ├─ turnState (继承父 context) - │ ├─ parentTurnID: "session-a" - │ └─ 独立的 goroutine - │ - ├─ SubTurn 2 (depth=1) - │ ├─ turnState (继承父 context) - │ ├─ parentTurnID: "session-a" - │ └─ 独立的 goroutine - │ - └─ SubTurn 3 (depth=1) - └─ SubTurn 3.1 (depth=2) ← 嵌套 SubTurn - └─ ... - -Root Turn (Session B) - 并发执行 - ├─ turnState (depth=0) - └─ ... -``` - -### 关键特点 -- ✅ **并发支持**: `activeTurnStates` map 支持多个 session 并发 -- ✅ **SubTurn 层级**: 通过 context 传递 turnState,支持嵌套 -- ✅ **并发控制**: `concurrencySem` 限制 SubTurn 并发数 -- ✅ **异步执行**: SubTurn 在独立 goroutine 中执行 -- ✅ **结果回传**: 通过 `pendingResults` channel 传递结果 -- ❌ **无事件系统**: 没有 EventBus 和 Hook 集成 - ---- - -## 3. 对比总结 - -| 特性 | Incoming (refactor/agent) | HEAD (feat/subturn-poc) | -|------|---------------------------|-------------------------| -| **并发模型** | 单 Turn (串行) | 多 Turn (并发) | -| **Turn 管理** | `activeTurn` (单例) | `activeTurnStates` (map) | -| **事件系统** | ✅ EventBus | ❌ 无 | -| **Hook 系统** | ✅ HookManager | ❌ 无 | -| **SubTurn** | ❓ 未实现或不同方式 | ✅ 完整实现 | -| **并发 Session** | ❌ 不支持 | ✅ 支持 | -| **嵌套 SubTurn** | ❌ 不支持 | ✅ 支持 | -| **架构复杂度** | 简单 | 复杂 | -| **可扩展性** | 高 (Hook) | 低 | -| **调试难度** | 低 | 高 (并发) | - ---- - -## 4. 混合方案流程 - -结合两者优点的混合方案: - -``` -┌─────────────────────────────────────────┐ -│ runAgentLoop(ctx, agent, opts) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 1. 检查 SubTurn Context │ -│ existingTS = turnStateFromContext() │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 2. 创建/复用 turnState │ -│ ts = newTurnState(agent, opts, ...) │ -│ if isRootTurn { │ -│ activeTurnStates.Store(key, ts) │ -│ } │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 3. 执行 Turn (带事件和 Hook) │ -│ result = runTurn(ctx, ts) │ -│ ├─ emitEvent(TurnStart) │ -│ ├─ Hook: before_llm │ -│ ├─ callLLM() │ -│ ├─ Hook: after_llm │ -│ ├─ Hook: before_tool │ -│ ├─ executeTool() │ -│ │ └─ 如果是 spawn → SpawnSubTurn │ -│ ├─ Hook: after_tool │ -│ └─ emitEvent(TurnEnd) │ -└─────────────────────────────────────────┘ - ↓ -┌─────────────────────────────────────────┐ -│ 4. 处理 SubTurn 结果 │ -│ if isRootTurn { │ -│ pollSubTurnResults() │ -│ } │ -└─────────────────────────────────────────┘ -``` - -### 混合方案优势 -- ✅ 保留并发能力 (`activeTurnStates`) -- ✅ 获得事件系统 (`EventBus`) -- ✅ 获得扩展能力 (`HookManager`) -- ✅ 支持 SubTurn 并发 -- ✅ 支持多 Session 并发 diff --git a/hybrid_implementation_guide.md b/hybrid_implementation_guide.md deleted file mode 100644 index ba1208baf..000000000 --- a/hybrid_implementation_guide.md +++ /dev/null @@ -1,563 +0,0 @@ -# 混合方案落地指南 - -## 目标 - -结合 Incoming 的事件驱动架构和 HEAD 的并发能力,实现: -- ✅ 保留 `activeTurnStates` map(支持并发 Session) -- ✅ 采用 `EventBus` 和 `HookManager`(事件驱动 + 扩展性) -- ✅ 保留 SubTurn 并发支持 -- ✅ 统一使用 `runTurn` 函数(简化代码) - ---- - -## 实施步骤 - -### 步骤 1: 合并 AgentLoop 结构体 (30 分钟) - -**目标**: 结合两边的字段 - -```go -type AgentLoop struct { - // ===== Incoming 的字段 (保留) ===== - bus *bus.MessageBus - cfg *config.Config - registry *AgentRegistry - state *state.Manager - eventBus *EventBus // ✅ 新增:事件系统 - hooks *HookManager // ✅ 新增:Hook 系统 - running atomic.Bool - summarizing sync.Map - fallback *providers.FallbackChain - channelManager *channels.Manager - mediaStore media.MediaStore - transcriber voice.Transcriber - cmdRegistry *commands.Registry - mcp mcpRuntime - hookRuntime hookRuntime // ✅ 新增:Hook 运行时 - steering *steeringQueue - mu sync.RWMutex - - // ===== HEAD 的字段 (保留) ===== - activeTurnStates sync.Map // ✅ 保留:支持并发 Session - subTurnCounter atomic.Int64 // ✅ 保留:SubTurn ID 生成 - - // ===== Incoming 的字段 (调整) ===== - turnSeq atomic.Uint64 // ✅ 保留:全局 Turn 序列号 - activeRequests sync.WaitGroup // ✅ 保留:请求跟踪 - - reloadFunc func() error -} -``` - -**操作**: -1. 找到 AgentLoop 结构体定义(38-77 行的冲突) -2. 采用上面的合并版本 -3. 删除 Incoming 的 `activeTurn *turnState` 和 `activeTurnMu`(不需要了) - ---- - -### 步骤 2: 合并 processOptions 结构体 (10 分钟) - -**目标**: 采用 Incoming 的版本,移除 HEAD 的 `SkipAddUserMessage` - -```go -type processOptions struct { - SessionKey string - Channel string - ChatID string - SenderID string - SenderDisplayName string - UserMessage string - SystemPromptOverride string - Media []string - InitialSteeringMessages []providers.Message // ✅ Incoming 的方式 - DefaultResponse string - EnableSummary bool - SendResponse bool - NoHistory bool - SkipInitialSteeringPoll bool -} - -type continuationTarget struct { - SessionKey string - Channel string - ChatID string -} -``` - -**操作**: -1. 找到 processOptions 结构体(92-112 行的冲突) -2. 采用上面的版本 -3. 添加 `continuationTarget` 结构体 - ---- - -### 步骤 3: 更新 turnState 结构体 (20 分钟) - -**目标**: 在 Incoming 的 turnState 基础上添加 SubTurn 支持 - -需要检查 `turn.go` 或 `turn_state.go` 文件,确保 turnState 有这些字段: - -```go -type turnState struct { - mu sync.RWMutex - - // ===== Incoming 的字段 (保留) ===== - agent *AgentInstance - opts processOptions - scope turnEventScope - - turnID string - agentID string - sessionKey string - channel string - chatID string - userMessage string - media []string - - phase TurnPhase - iteration int - startedAt time.Time - finalContent string - followUps []bus.InboundMessage - - gracefulInterrupt bool - gracefulInterruptHint string - gracefulTerminalUsed bool - hardAbort bool - providerCancel context.CancelFunc - turnCancel context.CancelFunc - - restorePointHistory []providers.Message - restorePointSummary string - persistedMessages []providers.Message - - // ===== HEAD 的字段 (新增:SubTurn 支持) ===== - depth int // ✅ SubTurn 深度 - parentTurnID string // ✅ 父 Turn ID - childTurnIDs []string // ✅ 子 Turn IDs - pendingResults chan *tools.ToolResult // ✅ SubTurn 结果 channel - concurrencySem chan struct{} // ✅ 并发信号量 - isFinished atomic.Bool // ✅ 是否已完成 -} -``` - -**操作**: -1. 查找 `turnState` 结构体定义 -2. 如果有冲突,采用 Incoming 的基础版本 -3. 添加 SubTurn 相关字段(depth, parentTurnID 等) - ---- - -### 步骤 4: 重写 runAgentLoop 函数 (1 小时) - -**目标**: 简化为调用 runTurn,但保留 SubTurn 检测 - -```go -func (al *AgentLoop) runAgentLoop( - ctx context.Context, - agent *AgentInstance, - opts processOptions, -) (string, error) { - // 1. 检查是否在 SubTurn 中 - existingTS := turnStateFromContext(ctx) - var ts *turnState - var isRootTurn bool - - if existingTS != nil { - // 在 SubTurn 中 - 创建子 turnState - ts = newSubTurnState(agent, opts, existingTS, al.newTurnEventScope(agent.ID, opts.SessionKey)) - isRootTurn = false - } else { - // 根 Turn - 创建新的 turnState - ts = newTurnState(agent, opts, al.newTurnEventScope(agent.ID, opts.SessionKey)) - isRootTurn = true - - // 注册到 activeTurnStates(支持并发) - al.activeTurnStates.Store(opts.SessionKey, ts) - defer al.activeTurnStates.Delete(opts.SessionKey) - } - - // 2. 记录 last channel - if opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) { - channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID) - if err := al.RecordLastChannel(channelKey); err != nil { - logger.WarnCF("agent", "Failed to record last channel", - map[string]any{"error": err.Error()}) - } - } - - // 3. 执行 Turn(带事件和 Hook) - result, err := al.runTurn(ctx, ts) - if err != nil { - return "", err - } - if result.status == TurnEndStatusAborted { - return "", nil - } - - // 4. 处理 SubTurn 结果(仅根 Turn) - if isRootTurn && ts.pendingResults != nil { - finalResults := al.drainPendingSubTurnResults(ts) - for _, r := range finalResults { - if r != nil && r.ForLLM != "" { - result.finalContent += fmt.Sprintf("\n\n[SubTurn Result] %s", r.ForLLM) - } - } - } - - // 5. 处理 follow-up 消息 - for _, followUp := range result.followUps { - if pubErr := al.bus.PublishInbound(ctx, followUp); pubErr != nil { - logger.WarnCF("agent", "Failed to publish follow-up after turn", - map[string]any{"turn_id": ts.turnID, "error": pubErr.Error()}) - } - } - - // 6. 发送响应 - if opts.SendResponse && result.finalContent != "" { - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ - Channel: opts.Channel, - ChatID: opts.ChatID, - Content: result.finalContent, - }) - } - - return result.finalContent, nil -} -``` - -**操作**: -1. 找到 runAgentLoop 函数(1439-1581 行的冲突) -2. 替换为上面的简化版本 -3. 保留 SubTurn 检测逻辑(`turnStateFromContext`) -4. 保留 `activeTurnStates` 注册逻辑 - ---- - -### 步骤 5: 采用 Incoming 的 runTurn 函数 (30 分钟) - -**目标**: 使用 Incoming 的 runTurn,但添加 SubTurn 结果轮询 - -```go -func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, error) { - turnCtx, turnCancel := context.WithCancel(ctx) - defer turnCancel() - ts.setTurnCancel(turnCancel) - - // ===== 不使用单例 activeTurn,因为我们有 activeTurnStates ===== - // al.registerActiveTurn(ts) ← 删除这行 - // defer al.clearActiveTurn(ts) ← 删除这行 - - turnStatus := TurnEndStatusCompleted - defer func() { - al.emitEvent( - EventKindTurnEnd, - ts.eventMeta("runTurn", "turn.end"), - TurnEndPayload{ - Status: turnStatus, - Iterations: ts.currentIteration(), - Duration: time.Since(ts.startedAt), - FinalContentLen: ts.finalContentLen(), - }, - ) - }() - - al.emitEvent( - EventKindTurnStart, - ts.eventMeta("runTurn", "turn.start"), - TurnStartPayload{ - Channel: ts.channel, - ChatID: ts.chatID, - UserMessage: ts.userMessage, - MediaCount: len(ts.media), - }, - ) - - // ... 保留 Incoming 的其余逻辑 ... - - // ===== 在 Turn Loop 中添加 SubTurn 结果轮询 ===== -turnLoop: - for ts.currentIteration() < ts.agent.MaxIterations || len(pendingMessages) > 0 { - // ... LLM 调用 ... - // ... Tool 执行 ... - - // ✅ 新增:轮询 SubTurn 结果 - if ts.pendingResults != nil { - subTurnResults := al.pollSubTurnResults(ts) - for _, result := range subTurnResults { - if result.ForLLM != "" { - // 将 SubTurn 结果作为 steering message 注入 - pendingMessages = append(pendingMessages, providers.Message{ - Role: "user", - Content: fmt.Sprintf("[SubTurn Result] %s", result.ForLLM), - }) - } - } - } - - // ... 继续迭代 ... - } - - // ... 返回结果 ... -} -``` - -**操作**: -1. 找到 runTurn 函数(1672-1689 行开始的冲突) -2. 采用 Incoming 的完整实现 -3. 删除 `registerActiveTurn` 和 `clearActiveTurn` 调用 -4. 在 Turn Loop 中添加 SubTurn 结果轮询逻辑 - ---- - -### 步骤 6: 实现辅助函数 (30 分钟) - -需要实现以下辅助函数: - -#### 6.1 newSubTurnState -```go -func newSubTurnState( - agent *AgentInstance, - opts processOptions, - parent *turnState, - scope turnEventScope, -) *turnState { - ts := newTurnState(agent, opts, scope) - - // 设置 SubTurn 关系 - ts.depth = parent.depth + 1 - ts.parentTurnID = parent.turnID - ts.pendingResults = parent.pendingResults // 共享结果 channel - ts.concurrencySem = parent.concurrencySem // 共享信号量 - - // 记录父子关系 - parent.mu.Lock() - parent.childTurnIDs = append(parent.childTurnIDs, ts.turnID) - parent.mu.Unlock() - - return ts -} -``` - -#### 6.2 pollSubTurnResults -```go -func (al *AgentLoop) pollSubTurnResults(ts *turnState) []*tools.ToolResult { - if ts.pendingResults == nil { - return nil - } - - var results []*tools.ToolResult - for { - select { - case result := <-ts.pendingResults: - results = append(results, result) - default: - return results - } - } -} -``` - -#### 6.3 drainPendingSubTurnResults -```go -func (al *AgentLoop) drainPendingSubTurnResults(ts *turnState) []*tools.ToolResult { - if ts.pendingResults == nil { - return nil - } - - // 等待一小段时间,确保所有 SubTurn 结果都到达 - time.Sleep(100 * time.Millisecond) - - return al.pollSubTurnResults(ts) -} -``` - -#### 6.4 更新 GetActiveTurn -```go -func (al *AgentLoop) GetActiveTurn(sessionKey string) *ActiveTurnInfo { - val, ok := al.activeTurnStates.Load(sessionKey) - if !ok { - return nil - } - - ts, ok := val.(*turnState) - if !ok { - return nil - } - - info := ts.snapshot() - return &info -} -``` - ---- - -### 步骤 7: 更新 SpawnSubTurn 实现 (30 分钟) - -确保 spawn tool 能正确创建 SubTurn: - -```go -func (spawner *subTurnSpawner) SpawnSubTurn( - ctx context.Context, - config SubTurnConfig, -) (*tools.ToolResult, error) { - // 1. 获取父 turnState - parentTS := turnStateFromContext(ctx) - if parentTS == nil { - return nil, fmt.Errorf("no parent turn state in context") - } - - // 2. 检查深度限制 - maxDepth := spawner.loop.getSubTurnConfig().maxDepth - if parentTS.depth >= maxDepth { - return tools.ErrorResult(fmt.Sprintf( - "SubTurn depth limit reached (%d)", maxDepth)), nil - } - - // 3. 获取并发信号量 - select { - case <-parentTS.concurrencySem: - defer func() { parentTS.concurrencySem <- struct{}{} }() - case <-ctx.Done(): - return tools.ErrorResult("SubTurn cancelled"), nil - } - - // 4. 生成 SubTurn ID - subTurnID := spawner.loop.subTurnCounter.Add(1) - turnID := fmt.Sprintf("%s-sub-%d", parentTS.turnID, subTurnID) - - // 5. 创建 SubTurn context - subCtx := withTurnState(ctx, parentTS) // 继承父 context - - // 6. 启动 SubTurn goroutine - go func() { - opts := processOptions{ - SessionKey: parentTS.sessionKey, - Channel: parentTS.channel, - ChatID: parentTS.chatID, - UserMessage: config.SystemPrompt, - SystemPromptOverride: config.SystemPrompt, - NoHistory: true, // SubTurn 不加载历史 - SendResponse: false, // SubTurn 不发送响应 - } - - result, err := spawner.loop.runAgentLoop(subCtx, spawner.agent, opts) - - // 7. 发送结果到父 Turn - toolResult := &tools.ToolResult{ - ForLLM: result, - Error: err, - } - - select { - case parentTS.pendingResults <- toolResult: - case <-subCtx.Done(): - } - }() - - // 8. 立即返回(异步执行) - return tools.AsyncResult(fmt.Sprintf("SubTurn %d started", subTurnID)), nil -} -``` - ---- - -### 步骤 8: 解决其他小冲突 (1 小时) - -处理剩余的 7 个冲突点: - -1. **变量命名冲突** (2179-2183 行等) - - 统一使用 `ts.channel`, `ts.chatID` 而不是 `opts.Channel` - -2. **Tool feedback** (2469-2494 行) - - 采用 HEAD 的实现(发送 tool feedback 到 chat) - -3. **其他小差异** - - 逐个检查,优先采用 Incoming 的实现 - - 确保 EventBus 事件正确触发 - ---- - -## 验证步骤 - -### 1. 编译验证 -```bash -go build ./pkg/agent/ -``` - -### 2. 单元测试 -```bash -go test ./pkg/agent/ -v -``` - -### 3. 功能测试 - -创建测试用例验证: - -```go -func TestMixedArchitecture_ConcurrentSessions(t *testing.T) { - // 测试多个 session 并发执行 - var wg sync.WaitGroup - for i := 0; i < 5; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - sessionKey := fmt.Sprintf("session-%d", id) - // 执行 agent loop - }(i) - } - wg.Wait() -} - -func TestMixedArchitecture_SubTurnExecution(t *testing.T) { - // 测试 SubTurn 执行 - // 1. 启动主 Turn - // 2. 调用 spawn tool - // 3. 验证 SubTurn 结果返回 -} - -func TestMixedArchitecture_EventBusIntegration(t *testing.T) { - // 测试事件系统 - // 1. 订阅事件 - // 2. 执行 Turn - // 3. 验证事件触发 -} -``` - ---- - -## 预期结果 - -完成后,系统应该: - -✅ 支持多个 Session 并发执行 -✅ 支持 SubTurn 并发和嵌套 -✅ 所有操作都触发 EventBus 事件 -✅ Hook 系统正常工作 -✅ 代码结构清晰,易于维护 - ---- - -## 时间估算 - -- 步骤 1-2: 结构体合并 (40 分钟) -- 步骤 3: turnState 更新 (20 分钟) -- 步骤 4: runAgentLoop 重写 (1 小时) -- 步骤 5: runTurn 调整 (30 分钟) -- 步骤 6: 辅助函数 (30 分钟) -- 步骤 7: SpawnSubTurn (30 分钟) -- 步骤 8: 其他冲突 (1 小时) -- 测试验证 (1 小时) - -**总计: 约 5-6 小时** - ---- - -## 风险和注意事项 - -1. **Context 传递**: 确保 SubTurn 的 context 正确继承父 context -2. **Channel 关闭**: 确保 `pendingResults` channel 在合适的时机关闭 -3. **并发安全**: 所有对 turnState 的访问都要加锁 -4. **事件顺序**: 确保事件按正确顺序触发 -5. **测试覆盖**: 重点测试并发场景和 SubTurn 场景 diff --git a/loop_conflict_analysis.md b/loop_conflict_analysis.md deleted file mode 100644 index 486e19054..000000000 --- a/loop_conflict_analysis.md +++ /dev/null @@ -1,271 +0,0 @@ -# loop.go 冲突详细分析 - -## 概述 - -loop.go 有 11 处冲突,涉及核心架构差异: -- **HEAD (feat/subturn-poc)**: 基于 context 的 SubTurn 层级管理,使用 `activeTurnStates` map 支持并发 -- **Incoming (refactor/agent)**: 事件驱动架构,使用 `EventBus`、`HookManager`,单个 `activeTurn` **不支持并发 turn** - -## 关键发现:Incoming 的并发限制 - -**重要**: Incoming 分支的 `activeTurn` 设计**不支持并发 turn 执行**! - -```go -// Incoming 的实现 -func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, error) { - al.registerActiveTurn(ts) // 设置 al.activeTurn = ts - defer al.clearActiveTurn(ts) // 清除 al.activeTurn = nil - // ... -} - -func (al *AgentLoop) registerActiveTurn(ts *turnState) { - al.activeTurnMu.Lock() - defer al.activeTurnMu.Unlock() - al.activeTurn = ts // 单例!后面的会覆盖前面的 -} -``` - -**问题**: -1. 如果两个 session 同时调用 `runAgentLoop`,第二个会覆盖第一个的 `activeTurn` -2. `GetActiveTurn()` 只能返回最后一个注册的 turn -3. 中断操作 (`InterruptGraceful`, `InterruptHard`) 只能影响当前的 `activeTurn` - -**HEAD 的优势**: -```go -// HEAD 的实现 -activeTurnStates sync.Map // 支持多个并发 turn -// key: sessionKey, value: *turnState - -// 每个 session 有独立的 turnState -al.activeTurnStates.Store(opts.SessionKey, rootTS) -``` - -## 架构决策的影响 - -如果采用 Incoming 的架构(方案 B),我们会**失去并发 turn 的能力**! - -### 选项分析 - -**选项 1: 完全采用 Incoming(会失去并发)** -- ✅ 获得事件驱动架构 -- ✅ 获得 Hook 系统 -- ❌ **失去并发 turn 支持** -- ❌ **失去 SubTurn 并发支持** -- ❌ 多个 session 无法同时处理 - -**选项 2: 混合方案(推荐)** -- ✅ 保留 HEAD 的 `activeTurnStates sync.Map` -- ✅ 采用 Incoming 的 `EventBus` 和 `HookManager` -- ✅ 保持并发能力 -- ⚠️ 需要调整 `GetActiveTurn()` 等 API - -**选项 3: 改造 Incoming 支持并发** -- 将 `activeTurn *turnState` 改为 `activeTurns sync.Map` -- 修改所有相关方法支持 sessionKey 参数 -- 工作量大,但架构更清晰 - -## 推荐方案:选项 2(混合方案) - -### AgentLoop 结构体设计 - -```go -type AgentLoop struct { - // Incoming 的字段 - bus *bus.MessageBus - cfg *config.Config - registry *AgentRegistry - state *state.Manager - eventBus *EventBus // ✅ 保留 - hooks *HookManager // ✅ 保留 - hookRuntime hookRuntime // ✅ 保留 - running atomic.Bool - summarizing sync.Map - fallback *providers.FallbackChain - channelManager *channels.Manager - mediaStore media.MediaStore - transcriber voice.Transcriber - cmdRegistry *commands.Registry - mcp mcpRuntime - steering *steeringQueue - mu sync.RWMutex - - // HEAD 的并发支持(保留) - activeTurnStates sync.Map // ✅ 保留:支持并发 turn - subTurnCounter atomic.Int64 // ✅ 保留:SubTurn ID 生成 - - // Incoming 的字段(调整) - turnSeq atomic.Uint64 // ✅ 保留:全局 turn 序列号 - activeRequests sync.WaitGroup // ✅ 保留:请求跟踪 - - reloadFunc func() error -} -``` - -### 关键方法调整 - -1. **GetActiveTurn()**: 需要接受 sessionKey 参数 -2. **InterruptGraceful/Hard()**: 需要接受 sessionKey 参数 -3. **runAgentLoop()**: 使用 `activeTurnStates` 而不是单个 `activeTurn` - -## 冲突详情 - -### 冲突 1: AgentLoop 结构体 (38-77 行) - -**HEAD 新增字段**: -```go -activeTurnStates sync.Map // key: sessionKey (string), value: *turnState -subTurnCounter atomic.Int64 // Counter for generating unique SubTurn IDs -``` - -**Incoming 新增字段**: -```go -eventBus *EventBus -hooks *HookManager -hookRuntime hookRuntime -activeTurnMu sync.RWMutex -activeTurn *turnState -turnSeq atomic.Uint64 -activeRequests sync.WaitGroup -``` - -**关键差异**: -- HEAD: 使用 `sync.Map` 管理多个并发 turn (`activeTurnStates`) -- Incoming: 使用单个 `activeTurn` + 锁 (`activeTurnMu`) -- HEAD: SubTurn 计数器 (`subTurnCounter`) -- Incoming: Turn 序列号 (`turnSeq`) -- Incoming: 新增事件系统 (`eventBus`, `hooks`, `hookRuntime`) - -**解决方案**: 采用 Incoming 的结构,但需要考虑如何在新架构中实现 SubTurn 的并发管理。 - ---- - -### 冲突 2: processOptions 结构体 (92-112 行) - -**HEAD**: -```go -SkipAddUserMessage bool // If true, skip adding UserMessage to session history -``` - -**Incoming**: -```go -InitialSteeringMessages []providers.Message - -// 新增结构体 -type continuationTarget struct { - SessionKey string - Channel string - ChatID string -} -``` - -**关键差异**: -- HEAD: 使用 `SkipAddUserMessage` 标志 -- Incoming: 使用 `InitialSteeringMessages` 数组 + 新的 `continuationTarget` 结构体 - -**解决方案**: 采用 Incoming 的实现,`InitialSteeringMessages` 提供更灵活的 steering 消息处理。 - ---- - -### 冲突 3: runAgentLoop 函数 (1439-1581 行) - -这是最大的冲突,涉及核心执行逻辑。 - -**HEAD 的实现**: -1. 检查是否在 SubTurn 中 (`turnStateFromContext`) -2. 如果是 SubTurn,复用现有 turnState -3. 如果是根 turn,创建新的 rootTS -4. 使用 `activeTurnStates.Store` 注册 turn -5. 调用 `runLLMIteration` 执行 LLM 循环 - -**Incoming 的实现**: -1. 记录 last channel -2. 调用 `newTurnState` 创建 turn state -3. 调用 `al.runTurn(ctx, ts)` 执行 turn -4. 处理 follow-up 消息 -5. 发布响应 - -**关键差异**: -- HEAD: 复杂的 SubTurn 层级管理,支持嵌套 -- Incoming: 简化的 turn 管理,通过 `newTurnState` 和 `runTurn` -- HEAD: 使用 `runLLMIteration` 函数 -- Incoming: 使用 `runTurn` 函数 -- Incoming: 新增 follow-up 消息处理机制 - -**解决方案**: 采用 Incoming 的简化架构,但需要在 `runTurn` 中添加 SubTurn 支持。 - ---- - -### 冲突 4: runLLMIteration vs runTurn (1672-1689 行) - -**HEAD**: 有独立的 `runLLMIteration` 函数 -**Incoming**: 使用 `runTurn` 函数 - -需要查看具体实现来决定如何合并。 - ---- - -### 冲突 5-11: 其他冲突点 - -剩余冲突主要涉及: -- 工具执行逻辑 -- Steering 消息处理 -- 中断处理 -- 变量命名差异(`agent` vs `ts.agent`) - -## 架构决策 - -根据方案 B(采用重构架构),需要: - -1. **采用 Incoming 的 AgentLoop 结构** - - 使用 `eventBus`, `hooks`, `hookRuntime` - - 使用单个 `activeTurn` + `activeTurnMu` - - 保留 `turnSeq` - -2. **SubTurn 支持策略** - - 选项 A: 在 `turnState` 中添加父子关系字段 - - 选项 B: 使用 context 传递 SubTurn 信息 - - 选项 C: 在 EventBus 中管理 SubTurn 层级 - -3. **函数迁移顺序** - - 先采用 Incoming 的结构体定义 - - 更新 `newTurnState` 函数 - - 采用 `runTurn` 函数 - - 在 `runTurn` 中集成 SubTurn 逻辑 - -## 推荐实施步骤 - -### 步骤 1: 结构体定义 (30 分钟) -- 采用 Incoming 的 `AgentLoop` 结构体 -- 采用 Incoming 的 `processOptions` 结构体 -- 添加 `continuationTarget` 结构体 - -### 步骤 2: 辅助函数 (30 分钟) -- 更新 `NewAgentLoop` 初始化函数 -- 确保 EventBus、Hook 正确初始化 - -### 步骤 3: runAgentLoop 函数 (1-2 小时) -- 采用 Incoming 的简化实现 -- 保留 channel 记录逻辑 -- 调用 `newTurnState` 和 `runTurn` -- 处理 follow-up 消息 - -### 步骤 4: runTurn 函数 (2-3 小时) -- 采用 Incoming 的 `runTurn` 实现 -- 在其中添加 SubTurn 检测和处理逻辑 -- 集成 SubTurn 结果回传机制 - -### 步骤 5: 其他冲突点 (1-2 小时) -- 逐个解决剩余 7 个冲突 -- 确保变量命名一致 -- 更新工具执行和 steering 逻辑 - -## 风险和注意事项 - -1. **SubTurn 语义变化**: 新架构中 SubTurn 的实现方式可能不同 -2. **并发安全**: 从 `sync.Map` 迁移到单个 `activeTurn` + 锁 -3. **事件系统集成**: 需要确保 SubTurn 事件正确触发 -4. **测试覆盖**: 原有 SubTurn 测试需要更新 - -## 下一步 - -建议先实现步骤 1-2(结构体定义和初始化),然后再处理复杂的执行逻辑。 diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index f7cc381c9..840aa8fa1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -509,21 +509,39 @@ func (al *AgentLoop) Run(ctx context.Context) error { return nil } -// drainBusToSteering continuously consumes inbound messages and redirects -// messages from the active scope into the steering queue. Messages from other -// scopes are requeued so they can be processed normally after the active turn. +// drainBusToSteering consumes inbound messages and redirects messages from the +// active scope into the steering queue. Messages from other scopes are requeued +// so they can be processed normally after the active turn. It drains all +// immediately available messages, blocking for the first one until ctx is done. func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, activeAgentID string) { + blocking := true for { var msg bus.InboundMessage - select { - case <-ctx.Done(): - return - case m, ok := <-al.bus.InboundChan(): - if !ok { + + if blocking { + // Block waiting for the first available message or ctx cancellation. + select { + case <-ctx.Done(): + return + case m, ok := <-al.bus.InboundChan(): + if !ok { + return + } + msg = m + } + } else { + // Non-blocking: drain any remaining queued messages, return when empty. + select { + case m, ok := <-al.bus.InboundChan(): + if !ok { + return + } + msg = m + default: return } - msg = m } + blocking = false msgScope, _, scopeOK := al.resolveSteeringTarget(msg) if !scopeOK || msgScope != activeScope { diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 12533beaf..ad6613e8c 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -460,6 +460,14 @@ func (al *AgentLoop) HardAbort(sessionKey string) error { // Use isHardAbort=true for hard abort to immediately cancel all children. ts.Finish(true) + // Roll back session history to the state before the turn started. + if ts.session != nil { + history := ts.session.GetHistory(sessionKey) + if ts.initialHistoryLength < len(history) { + ts.session.SetHistory(sessionKey, history[:ts.initialHistoryLength]) + } + } + return nil } diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index 72eb2e53a..f5ba412ab 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -428,19 +428,12 @@ func spawnSubTurn( defer func() { if r := recover(); r != nil { err = fmt.Errorf("subturn panicked: %v", r) + result = nil logger.ErrorCF("subturn", "SubTurn panicked", map[string]any{ "child_id": childID, "parent_id": parentTS.turnID, "panic": r, }) - - // Ensure result is not nil to prevent panic during event emission - if result == nil { - result = &tools.ToolResult{ - Err: err, - ForLLM: fmt.Sprintf("SubTurn panicked: %v", r), - } - } } // Result Delivery Strategy (Async vs Sync)