fix(agent): fix subturn panic result, hard abort rollback, and drain bus exit

- spawnSubTurn: set result=nil on panic instead of constructing a non-nil ToolResult
- HardAbort: roll back session history to initialHistoryLength after Finish()
- drainBusToSteering: switch to non-blocking reads after first message so function
  returns promptly when the inbound channel is empty
- remove obsolete documentation files
This commit is contained in:
Administrator
2026-03-22 20:35:14 +08:00
parent 7ba8682ac5
commit 7868c5811a
6 changed files with 36 additions and 1247 deletions
-396
View File
@@ -1,396 +0,0 @@
# Agent Loop 流程图对比
## 1. Incoming (refactor/agent) 流程
### 整体架构
```
User Message
Message Bus (串行队列)
processMessage()
runAgentLoop()
newTurnState() → 创建 turnState
runTurn()
registerActiveTurn(ts) ← 设置 al.activeTurn = ts (单例)
[Turn 执行循环]
clearActiveTurn(ts) ← 清除 al.activeTurn = nil
```
### runTurn() 详细流程
```
┌─────────────────────────────────────────┐
│ runTurn(ctx, turnState) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 1. 注册 activeTurn (单例) │
│ al.registerActiveTurn(ts) │
│ defer al.clearActiveTurn(ts) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 2. 发送 TurnStart 事件 │
│ al.emitEvent(EventKindTurnStart) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 3. 加载 Session History & Summary │
│ history = Sessions.GetHistory() │
│ summary = Sessions.GetSummary() │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 4. 构建消息 │
│ messages = BuildMessages(...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 5. 检查 Context Budget │
│ if isOverContextBudget() { │
│ forceCompression() │
│ emitEvent(ContextCompress) │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 6. 保存用户消息到 Session │
│ Sessions.AddMessage("user", ...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 7. Turn Loop (迭代执行) │
│ for iteration < MaxIterations { │
│ ┌─────────────────────────────┐ │
│ │ 7.1 调用 LLM │ │
│ │ callLLM() │ │
│ │ emitEvent(LLMStart) │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────┐ │
│ │ 7.2 处理 Tool Calls │ │
│ │ for each toolCall { │ │
│ │ emitEvent(ToolStart)│ │
│ │ executeTool() │ │
│ │ emitEvent(ToolEnd) │ │
│ │ } │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────┐ │
│ │ 7.3 检查中断 │ │
│ │ if gracefulInterrupt { │ │
│ │ break │ │
│ │ } │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────┐ │
│ │ 7.4 处理 Steering Messages │ │
│ │ pollSteering() │ │
│ └─────────────────────────────┘ │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 8. 保存最终响应到 Session │
│ Sessions.AddMessage("assistant", ...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 9. 发送 TurnEnd 事件 │
│ al.emitEvent(EventKindTurnEnd) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 10. 返回 turnResult │
│ {finalContent, status, followUps} │
└─────────────────────────────────────────┘
```
### 关键特点
-**事件驱动**: 每个阶段都发送事件到 EventBus
-**Hook 集成**: 在 before_llm, after_llm, before_tool, after_tool 触发 Hook
-**单 Turn**: 使用 `activeTurn` 单例,同一时间只有一个 turn
-**无并发**: 不支持多个 session 同时执行 turn
---
## 2. HEAD (feat/subturn-poc) 流程
### 整体架构
```
User Message
Message Bus
processMessage()
runAgentLoop()
检查 Context 中是否有 turnState
├─ 有 → 复用 (SubTurn 场景)
└─ 无 → 创建新的 rootTS
存储到 activeTurnStates[sessionKey]
runLLMIteration()
[并发 SubTurn 支持]
```
### runAgentLoop() 详细流程
```
┌─────────────────────────────────────────┐
│ runAgentLoop(ctx, agent, opts) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 1. 检查是否在 SubTurn 中 │
│ existingTS = turnStateFromContext() │
│ if existingTS != nil { │
│ rootTS = existingTS (复用) │
│ isRootTurn = false │
│ } else { │
│ rootTS = new turnState │
│ isRootTurn = true │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 2. 注册 Turn State (支持并发) │
│ if isRootTurn { │
│ al.activeTurnStates.Store( │
│ sessionKey, rootTS) │
│ defer activeTurnStates.Delete() │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 3. 记录 Last Channel │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 4. 构建消息 │
│ messages = BuildMessages(...) │
│ messages = resolveMediaRefs(...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 5. 覆盖 System Prompt (如果需要) │
│ if opts.SystemPromptOverride != "" { │
│ // 用于 SubTurn 的特殊 prompt │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 6. 保存用户消息 │
│ if !opts.SkipAddUserMessage { │
│ Sessions.AddMessage(...) │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 7. 执行 LLM 迭代 │
│ finalContent, iteration, err = │
│ runLLMIteration(ctx, agent, ...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 8. 轮询 SubTurn 结果 (如果是根 turn) │
│ if isRootTurn { │
│ results = │
│ dequeuePendingSubTurnResults()│
│ // 将结果注入到最终响应 │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 9. 处理空响应 │
│ if finalContent == "" { │
│ finalContent = DefaultResponse │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 10. 保存助手响应 │
│ Sessions.AddMessage("assistant"...) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 11. 发送响应 (如果需要) │
│ if opts.SendResponse { │
│ bus.PublishOutbound(...) │
│ } │
└─────────────────────────────────────────┘
```
### SubTurn 执行流程
```
┌─────────────────────────────────────────┐
│ Tool: spawn │
│ args: {task: "...", label: "..."} │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ SpawnTool.Execute() │
│ if spawner != nil { │
│ // 直接 SubTurn 路径 │
│ } else { │
│ // SubagentManager 路径 │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ spawner.SpawnSubTurn() │
│ ┌─────────────────────────────────┐ │
│ │ 1. 生成 SubTurn ID │ │
│ │ subTurnID = atomic.Add() │ │
│ └─────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ 2. 创建 SubTurn Context │ │
│ │ subCtx = withTurnState(...) │ │
│ │ // 继承父 turnState │ │
│ └─────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ 3. 获取并发信号量 │ │
│ │ <-rootTS.concurrencySem │ │
│ │ defer release │ │
│ └─────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────┐ │
│ │ 4. 启动 Goroutine │ │
│ │ go func() { │ │
│ │ result = runAgentLoop( │ │
│ │ subCtx, ...) │ │
│ │ // 将结果发送到 channel │ │
│ │ rootTS.pendingResults <- │ │
│ │ }() │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 父 Turn 继续执行 │
│ - 不等待 SubTurn 完成 │
│ - SubTurn 异步执行 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 父 Turn 轮询 SubTurn 结果 │
│ results = dequeuePendingSubTurnResults│
│ for each result { │
│ // 注入到响应或下一次迭代 │
│ } │
└─────────────────────────────────────────┘
```
### SubTurn 层级结构
```
Root Turn (Session A)
├─ turnState (depth=0)
│ ├─ turnID: "session-a"
│ ├─ pendingResults: chan
│ └─ concurrencySem: chan (限制并发数)
├─ SubTurn 1 (depth=1)
│ ├─ turnState (继承父 context)
│ ├─ parentTurnID: "session-a"
│ └─ 独立的 goroutine
├─ SubTurn 2 (depth=1)
│ ├─ turnState (继承父 context)
│ ├─ parentTurnID: "session-a"
│ └─ 独立的 goroutine
└─ SubTurn 3 (depth=1)
└─ SubTurn 3.1 (depth=2) ← 嵌套 SubTurn
└─ ...
Root Turn (Session B) - 并发执行
├─ turnState (depth=0)
└─ ...
```
### 关键特点
-**并发支持**: `activeTurnStates` map 支持多个 session 并发
-**SubTurn 层级**: 通过 context 传递 turnState,支持嵌套
-**并发控制**: `concurrencySem` 限制 SubTurn 并发数
-**异步执行**: SubTurn 在独立 goroutine 中执行
-**结果回传**: 通过 `pendingResults` channel 传递结果
-**无事件系统**: 没有 EventBus 和 Hook 集成
---
## 3. 对比总结
| 特性 | Incoming (refactor/agent) | HEAD (feat/subturn-poc) |
|------|---------------------------|-------------------------|
| **并发模型** | 单 Turn (串行) | 多 Turn (并发) |
| **Turn 管理** | `activeTurn` (单例) | `activeTurnStates` (map) |
| **事件系统** | ✅ EventBus | ❌ 无 |
| **Hook 系统** | ✅ HookManager | ❌ 无 |
| **SubTurn** | ❓ 未实现或不同方式 | ✅ 完整实现 |
| **并发 Session** | ❌ 不支持 | ✅ 支持 |
| **嵌套 SubTurn** | ❌ 不支持 | ✅ 支持 |
| **架构复杂度** | 简单 | 复杂 |
| **可扩展性** | 高 (Hook) | 低 |
| **调试难度** | 低 | 高 (并发) |
---
## 4. 混合方案流程
结合两者优点的混合方案:
```
┌─────────────────────────────────────────┐
│ runAgentLoop(ctx, agent, opts) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 1. 检查 SubTurn Context │
│ existingTS = turnStateFromContext() │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 2. 创建/复用 turnState │
│ ts = newTurnState(agent, opts, ...) │
│ if isRootTurn { │
│ activeTurnStates.Store(key, ts) │
│ } │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 3. 执行 Turn (带事件和 Hook) │
│ result = runTurn(ctx, ts) │
│ ├─ emitEvent(TurnStart) │
│ ├─ Hook: before_llm │
│ ├─ callLLM() │
│ ├─ Hook: after_llm │
│ ├─ Hook: before_tool │
│ ├─ executeTool() │
│ │ └─ 如果是 spawn → SpawnSubTurn │
│ ├─ Hook: after_tool │
│ └─ emitEvent(TurnEnd) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 4. 处理 SubTurn 结果 │
│ if isRootTurn { │
│ pollSubTurnResults() │
│ } │
└─────────────────────────────────────────┘
```
### 混合方案优势
- ✅ 保留并发能力 (`activeTurnStates`)
- ✅ 获得事件系统 (`EventBus`)
- ✅ 获得扩展能力 (`HookManager`)
- ✅ 支持 SubTurn 并发
- ✅ 支持多 Session 并发
-563
View File
@@ -1,563 +0,0 @@
# 混合方案落地指南
## 目标
结合 Incoming 的事件驱动架构和 HEAD 的并发能力,实现:
- ✅ 保留 `activeTurnStates` map(支持并发 Session
- ✅ 采用 `EventBus``HookManager`(事件驱动 + 扩展性)
- ✅ 保留 SubTurn 并发支持
- ✅ 统一使用 `runTurn` 函数(简化代码)
---
## 实施步骤
### 步骤 1: 合并 AgentLoop 结构体 (30 分钟)
**目标**: 结合两边的字段
```go
type AgentLoop struct {
// ===== Incoming 的字段 (保留) =====
bus *bus.MessageBus
cfg *config.Config
registry *AgentRegistry
state *state.Manager
eventBus *EventBus // ✅ 新增:事件系统
hooks *HookManager // ✅ 新增:Hook 系统
running atomic.Bool
summarizing sync.Map
fallback *providers.FallbackChain
channelManager *channels.Manager
mediaStore media.MediaStore
transcriber voice.Transcriber
cmdRegistry *commands.Registry
mcp mcpRuntime
hookRuntime hookRuntime // ✅ 新增:Hook 运行时
steering *steeringQueue
mu sync.RWMutex
// ===== HEAD 的字段 (保留) =====
activeTurnStates sync.Map // ✅ 保留:支持并发 Session
subTurnCounter atomic.Int64 // ✅ 保留:SubTurn ID 生成
// ===== Incoming 的字段 (调整) =====
turnSeq atomic.Uint64 // ✅ 保留:全局 Turn 序列号
activeRequests sync.WaitGroup // ✅ 保留:请求跟踪
reloadFunc func() error
}
```
**操作**:
1. 找到 AgentLoop 结构体定义(38-77 行的冲突)
2. 采用上面的合并版本
3. 删除 Incoming 的 `activeTurn *turnState``activeTurnMu`(不需要了)
---
### 步骤 2: 合并 processOptions 结构体 (10 分钟)
**目标**: 采用 Incoming 的版本,移除 HEAD 的 `SkipAddUserMessage`
```go
type processOptions struct {
SessionKey string
Channel string
ChatID string
SenderID string
SenderDisplayName string
UserMessage string
SystemPromptOverride string
Media []string
InitialSteeringMessages []providers.Message // ✅ Incoming 的方式
DefaultResponse string
EnableSummary bool
SendResponse bool
NoHistory bool
SkipInitialSteeringPoll bool
}
type continuationTarget struct {
SessionKey string
Channel string
ChatID string
}
```
**操作**:
1. 找到 processOptions 结构体(92-112 行的冲突)
2. 采用上面的版本
3. 添加 `continuationTarget` 结构体
---
### 步骤 3: 更新 turnState 结构体 (20 分钟)
**目标**: 在 Incoming 的 turnState 基础上添加 SubTurn 支持
需要检查 `turn.go``turn_state.go` 文件,确保 turnState 有这些字段:
```go
type turnState struct {
mu sync.RWMutex
// ===== Incoming 的字段 (保留) =====
agent *AgentInstance
opts processOptions
scope turnEventScope
turnID string
agentID string
sessionKey string
channel string
chatID string
userMessage string
media []string
phase TurnPhase
iteration int
startedAt time.Time
finalContent string
followUps []bus.InboundMessage
gracefulInterrupt bool
gracefulInterruptHint string
gracefulTerminalUsed bool
hardAbort bool
providerCancel context.CancelFunc
turnCancel context.CancelFunc
restorePointHistory []providers.Message
restorePointSummary string
persistedMessages []providers.Message
// ===== HEAD 的字段 (新增:SubTurn 支持) =====
depth int // ✅ SubTurn 深度
parentTurnID string // ✅ 父 Turn ID
childTurnIDs []string // ✅ 子 Turn IDs
pendingResults chan *tools.ToolResult // ✅ SubTurn 结果 channel
concurrencySem chan struct{} // ✅ 并发信号量
isFinished atomic.Bool // ✅ 是否已完成
}
```
**操作**:
1. 查找 `turnState` 结构体定义
2. 如果有冲突,采用 Incoming 的基础版本
3. 添加 SubTurn 相关字段(depth, parentTurnID 等)
---
### 步骤 4: 重写 runAgentLoop 函数 (1 小时)
**目标**: 简化为调用 runTurn,但保留 SubTurn 检测
```go
func (al *AgentLoop) runAgentLoop(
ctx context.Context,
agent *AgentInstance,
opts processOptions,
) (string, error) {
// 1. 检查是否在 SubTurn 中
existingTS := turnStateFromContext(ctx)
var ts *turnState
var isRootTurn bool
if existingTS != nil {
// 在 SubTurn 中 - 创建子 turnState
ts = newSubTurnState(agent, opts, existingTS, al.newTurnEventScope(agent.ID, opts.SessionKey))
isRootTurn = false
} else {
// 根 Turn - 创建新的 turnState
ts = newTurnState(agent, opts, al.newTurnEventScope(agent.ID, opts.SessionKey))
isRootTurn = true
// 注册到 activeTurnStates(支持并发)
al.activeTurnStates.Store(opts.SessionKey, ts)
defer al.activeTurnStates.Delete(opts.SessionKey)
}
// 2. 记录 last channel
if opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) {
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
if err := al.RecordLastChannel(channelKey); err != nil {
logger.WarnCF("agent", "Failed to record last channel",
map[string]any{"error": err.Error()})
}
}
// 3. 执行 Turn(带事件和 Hook
result, err := al.runTurn(ctx, ts)
if err != nil {
return "", err
}
if result.status == TurnEndStatusAborted {
return "", nil
}
// 4. 处理 SubTurn 结果(仅根 Turn
if isRootTurn && ts.pendingResults != nil {
finalResults := al.drainPendingSubTurnResults(ts)
for _, r := range finalResults {
if r != nil && r.ForLLM != "" {
result.finalContent += fmt.Sprintf("\n\n[SubTurn Result] %s", r.ForLLM)
}
}
}
// 5. 处理 follow-up 消息
for _, followUp := range result.followUps {
if pubErr := al.bus.PublishInbound(ctx, followUp); pubErr != nil {
logger.WarnCF("agent", "Failed to publish follow-up after turn",
map[string]any{"turn_id": ts.turnID, "error": pubErr.Error()})
}
}
// 6. 发送响应
if opts.SendResponse && result.finalContent != "" {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: result.finalContent,
})
}
return result.finalContent, nil
}
```
**操作**:
1. 找到 runAgentLoop 函数(1439-1581 行的冲突)
2. 替换为上面的简化版本
3. 保留 SubTurn 检测逻辑(`turnStateFromContext`
4. 保留 `activeTurnStates` 注册逻辑
---
### 步骤 5: 采用 Incoming 的 runTurn 函数 (30 分钟)
**目标**: 使用 Incoming 的 runTurn,但添加 SubTurn 结果轮询
```go
func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, error) {
turnCtx, turnCancel := context.WithCancel(ctx)
defer turnCancel()
ts.setTurnCancel(turnCancel)
// ===== 不使用单例 activeTurn,因为我们有 activeTurnStates =====
// al.registerActiveTurn(ts) ← 删除这行
// defer al.clearActiveTurn(ts) ← 删除这行
turnStatus := TurnEndStatusCompleted
defer func() {
al.emitEvent(
EventKindTurnEnd,
ts.eventMeta("runTurn", "turn.end"),
TurnEndPayload{
Status: turnStatus,
Iterations: ts.currentIteration(),
Duration: time.Since(ts.startedAt),
FinalContentLen: ts.finalContentLen(),
},
)
}()
al.emitEvent(
EventKindTurnStart,
ts.eventMeta("runTurn", "turn.start"),
TurnStartPayload{
Channel: ts.channel,
ChatID: ts.chatID,
UserMessage: ts.userMessage,
MediaCount: len(ts.media),
},
)
// ... 保留 Incoming 的其余逻辑 ...
// ===== 在 Turn Loop 中添加 SubTurn 结果轮询 =====
turnLoop:
for ts.currentIteration() < ts.agent.MaxIterations || len(pendingMessages) > 0 {
// ... LLM 调用 ...
// ... Tool 执行 ...
// ✅ 新增:轮询 SubTurn 结果
if ts.pendingResults != nil {
subTurnResults := al.pollSubTurnResults(ts)
for _, result := range subTurnResults {
if result.ForLLM != "" {
// 将 SubTurn 结果作为 steering message 注入
pendingMessages = append(pendingMessages, providers.Message{
Role: "user",
Content: fmt.Sprintf("[SubTurn Result] %s", result.ForLLM),
})
}
}
}
// ... 继续迭代 ...
}
// ... 返回结果 ...
}
```
**操作**:
1. 找到 runTurn 函数(1672-1689 行开始的冲突)
2. 采用 Incoming 的完整实现
3. 删除 `registerActiveTurn``clearActiveTurn` 调用
4. 在 Turn Loop 中添加 SubTurn 结果轮询逻辑
---
### 步骤 6: 实现辅助函数 (30 分钟)
需要实现以下辅助函数:
#### 6.1 newSubTurnState
```go
func newSubTurnState(
agent *AgentInstance,
opts processOptions,
parent *turnState,
scope turnEventScope,
) *turnState {
ts := newTurnState(agent, opts, scope)
// 设置 SubTurn 关系
ts.depth = parent.depth + 1
ts.parentTurnID = parent.turnID
ts.pendingResults = parent.pendingResults // 共享结果 channel
ts.concurrencySem = parent.concurrencySem // 共享信号量
// 记录父子关系
parent.mu.Lock()
parent.childTurnIDs = append(parent.childTurnIDs, ts.turnID)
parent.mu.Unlock()
return ts
}
```
#### 6.2 pollSubTurnResults
```go
func (al *AgentLoop) pollSubTurnResults(ts *turnState) []*tools.ToolResult {
if ts.pendingResults == nil {
return nil
}
var results []*tools.ToolResult
for {
select {
case result := <-ts.pendingResults:
results = append(results, result)
default:
return results
}
}
}
```
#### 6.3 drainPendingSubTurnResults
```go
func (al *AgentLoop) drainPendingSubTurnResults(ts *turnState) []*tools.ToolResult {
if ts.pendingResults == nil {
return nil
}
// 等待一小段时间,确保所有 SubTurn 结果都到达
time.Sleep(100 * time.Millisecond)
return al.pollSubTurnResults(ts)
}
```
#### 6.4 更新 GetActiveTurn
```go
func (al *AgentLoop) GetActiveTurn(sessionKey string) *ActiveTurnInfo {
val, ok := al.activeTurnStates.Load(sessionKey)
if !ok {
return nil
}
ts, ok := val.(*turnState)
if !ok {
return nil
}
info := ts.snapshot()
return &info
}
```
---
### 步骤 7: 更新 SpawnSubTurn 实现 (30 分钟)
确保 spawn tool 能正确创建 SubTurn
```go
func (spawner *subTurnSpawner) SpawnSubTurn(
ctx context.Context,
config SubTurnConfig,
) (*tools.ToolResult, error) {
// 1. 获取父 turnState
parentTS := turnStateFromContext(ctx)
if parentTS == nil {
return nil, fmt.Errorf("no parent turn state in context")
}
// 2. 检查深度限制
maxDepth := spawner.loop.getSubTurnConfig().maxDepth
if parentTS.depth >= maxDepth {
return tools.ErrorResult(fmt.Sprintf(
"SubTurn depth limit reached (%d)", maxDepth)), nil
}
// 3. 获取并发信号量
select {
case <-parentTS.concurrencySem:
defer func() { parentTS.concurrencySem <- struct{}{} }()
case <-ctx.Done():
return tools.ErrorResult("SubTurn cancelled"), nil
}
// 4. 生成 SubTurn ID
subTurnID := spawner.loop.subTurnCounter.Add(1)
turnID := fmt.Sprintf("%s-sub-%d", parentTS.turnID, subTurnID)
// 5. 创建 SubTurn context
subCtx := withTurnState(ctx, parentTS) // 继承父 context
// 6. 启动 SubTurn goroutine
go func() {
opts := processOptions{
SessionKey: parentTS.sessionKey,
Channel: parentTS.channel,
ChatID: parentTS.chatID,
UserMessage: config.SystemPrompt,
SystemPromptOverride: config.SystemPrompt,
NoHistory: true, // SubTurn 不加载历史
SendResponse: false, // SubTurn 不发送响应
}
result, err := spawner.loop.runAgentLoop(subCtx, spawner.agent, opts)
// 7. 发送结果到父 Turn
toolResult := &tools.ToolResult{
ForLLM: result,
Error: err,
}
select {
case parentTS.pendingResults <- toolResult:
case <-subCtx.Done():
}
}()
// 8. 立即返回(异步执行)
return tools.AsyncResult(fmt.Sprintf("SubTurn %d started", subTurnID)), nil
}
```
---
### 步骤 8: 解决其他小冲突 (1 小时)
处理剩余的 7 个冲突点:
1. **变量命名冲突** (2179-2183 行等)
- 统一使用 `ts.channel`, `ts.chatID` 而不是 `opts.Channel`
2. **Tool feedback** (2469-2494 行)
- 采用 HEAD 的实现(发送 tool feedback 到 chat
3. **其他小差异**
- 逐个检查,优先采用 Incoming 的实现
- 确保 EventBus 事件正确触发
---
## 验证步骤
### 1. 编译验证
```bash
go build ./pkg/agent/
```
### 2. 单元测试
```bash
go test ./pkg/agent/ -v
```
### 3. 功能测试
创建测试用例验证:
```go
func TestMixedArchitecture_ConcurrentSessions(t *testing.T) {
// 测试多个 session 并发执行
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sessionKey := fmt.Sprintf("session-%d", id)
// 执行 agent loop
}(i)
}
wg.Wait()
}
func TestMixedArchitecture_SubTurnExecution(t *testing.T) {
// 测试 SubTurn 执行
// 1. 启动主 Turn
// 2. 调用 spawn tool
// 3. 验证 SubTurn 结果返回
}
func TestMixedArchitecture_EventBusIntegration(t *testing.T) {
// 测试事件系统
// 1. 订阅事件
// 2. 执行 Turn
// 3. 验证事件触发
}
```
---
## 预期结果
完成后,系统应该:
✅ 支持多个 Session 并发执行
✅ 支持 SubTurn 并发和嵌套
✅ 所有操作都触发 EventBus 事件
✅ Hook 系统正常工作
✅ 代码结构清晰,易于维护
---
## 时间估算
- 步骤 1-2: 结构体合并 (40 分钟)
- 步骤 3: turnState 更新 (20 分钟)
- 步骤 4: runAgentLoop 重写 (1 小时)
- 步骤 5: runTurn 调整 (30 分钟)
- 步骤 6: 辅助函数 (30 分钟)
- 步骤 7: SpawnSubTurn (30 分钟)
- 步骤 8: 其他冲突 (1 小时)
- 测试验证 (1 小时)
**总计: 约 5-6 小时**
---
## 风险和注意事项
1. **Context 传递**: 确保 SubTurn 的 context 正确继承父 context
2. **Channel 关闭**: 确保 `pendingResults` channel 在合适的时机关闭
3. **并发安全**: 所有对 turnState 的访问都要加锁
4. **事件顺序**: 确保事件按正确顺序触发
5. **测试覆盖**: 重点测试并发场景和 SubTurn 场景
-271
View File
@@ -1,271 +0,0 @@
# loop.go 冲突详细分析
## 概述
loop.go 有 11 处冲突,涉及核心架构差异:
- **HEAD (feat/subturn-poc)**: 基于 context 的 SubTurn 层级管理,使用 `activeTurnStates` map 支持并发
- **Incoming (refactor/agent)**: 事件驱动架构,使用 `EventBus``HookManager`,单个 `activeTurn` **不支持并发 turn**
## 关键发现:Incoming 的并发限制
**重要**: Incoming 分支的 `activeTurn` 设计**不支持并发 turn 执行**!
```go
// Incoming 的实现
func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, error) {
al.registerActiveTurn(ts) // 设置 al.activeTurn = ts
defer al.clearActiveTurn(ts) // 清除 al.activeTurn = nil
// ...
}
func (al *AgentLoop) registerActiveTurn(ts *turnState) {
al.activeTurnMu.Lock()
defer al.activeTurnMu.Unlock()
al.activeTurn = ts // 单例!后面的会覆盖前面的
}
```
**问题**:
1. 如果两个 session 同时调用 `runAgentLoop`,第二个会覆盖第一个的 `activeTurn`
2. `GetActiveTurn()` 只能返回最后一个注册的 turn
3. 中断操作 (`InterruptGraceful`, `InterruptHard`) 只能影响当前的 `activeTurn`
**HEAD 的优势**:
```go
// HEAD 的实现
activeTurnStates sync.Map // 支持多个并发 turn
// key: sessionKey, value: *turnState
// 每个 session 有独立的 turnState
al.activeTurnStates.Store(opts.SessionKey, rootTS)
```
## 架构决策的影响
如果采用 Incoming 的架构(方案 B),我们会**失去并发 turn 的能力**!
### 选项分析
**选项 1: 完全采用 Incoming(会失去并发)**
- ✅ 获得事件驱动架构
- ✅ 获得 Hook 系统
-**失去并发 turn 支持**
-**失去 SubTurn 并发支持**
- ❌ 多个 session 无法同时处理
**选项 2: 混合方案(推荐)**
- ✅ 保留 HEAD 的 `activeTurnStates sync.Map`
- ✅ 采用 Incoming 的 `EventBus``HookManager`
- ✅ 保持并发能力
- ⚠️ 需要调整 `GetActiveTurn()` 等 API
**选项 3: 改造 Incoming 支持并发**
-`activeTurn *turnState` 改为 `activeTurns sync.Map`
- 修改所有相关方法支持 sessionKey 参数
- 工作量大,但架构更清晰
## 推荐方案:选项 2(混合方案)
### AgentLoop 结构体设计
```go
type AgentLoop struct {
// Incoming 的字段
bus *bus.MessageBus
cfg *config.Config
registry *AgentRegistry
state *state.Manager
eventBus *EventBus // ✅ 保留
hooks *HookManager // ✅ 保留
hookRuntime hookRuntime // ✅ 保留
running atomic.Bool
summarizing sync.Map
fallback *providers.FallbackChain
channelManager *channels.Manager
mediaStore media.MediaStore
transcriber voice.Transcriber
cmdRegistry *commands.Registry
mcp mcpRuntime
steering *steeringQueue
mu sync.RWMutex
// HEAD 的并发支持(保留)
activeTurnStates sync.Map // ✅ 保留:支持并发 turn
subTurnCounter atomic.Int64 // ✅ 保留:SubTurn ID 生成
// Incoming 的字段(调整)
turnSeq atomic.Uint64 // ✅ 保留:全局 turn 序列号
activeRequests sync.WaitGroup // ✅ 保留:请求跟踪
reloadFunc func() error
}
```
### 关键方法调整
1. **GetActiveTurn()**: 需要接受 sessionKey 参数
2. **InterruptGraceful/Hard()**: 需要接受 sessionKey 参数
3. **runAgentLoop()**: 使用 `activeTurnStates` 而不是单个 `activeTurn`
## 冲突详情
### 冲突 1: AgentLoop 结构体 (38-77 行)
**HEAD 新增字段**:
```go
activeTurnStates sync.Map // key: sessionKey (string), value: *turnState
subTurnCounter atomic.Int64 // Counter for generating unique SubTurn IDs
```
**Incoming 新增字段**:
```go
eventBus *EventBus
hooks *HookManager
hookRuntime hookRuntime
activeTurnMu sync.RWMutex
activeTurn *turnState
turnSeq atomic.Uint64
activeRequests sync.WaitGroup
```
**关键差异**:
- HEAD: 使用 `sync.Map` 管理多个并发 turn (`activeTurnStates`)
- Incoming: 使用单个 `activeTurn` + 锁 (`activeTurnMu`)
- HEAD: SubTurn 计数器 (`subTurnCounter`)
- Incoming: Turn 序列号 (`turnSeq`)
- Incoming: 新增事件系统 (`eventBus`, `hooks`, `hookRuntime`)
**解决方案**: 采用 Incoming 的结构,但需要考虑如何在新架构中实现 SubTurn 的并发管理。
---
### 冲突 2: processOptions 结构体 (92-112 行)
**HEAD**:
```go
SkipAddUserMessage bool // If true, skip adding UserMessage to session history
```
**Incoming**:
```go
InitialSteeringMessages []providers.Message
// 新增结构体
type continuationTarget struct {
SessionKey string
Channel string
ChatID string
}
```
**关键差异**:
- HEAD: 使用 `SkipAddUserMessage` 标志
- Incoming: 使用 `InitialSteeringMessages` 数组 + 新的 `continuationTarget` 结构体
**解决方案**: 采用 Incoming 的实现,`InitialSteeringMessages` 提供更灵活的 steering 消息处理。
---
### 冲突 3: runAgentLoop 函数 (1439-1581 行)
这是最大的冲突,涉及核心执行逻辑。
**HEAD 的实现**:
1. 检查是否在 SubTurn 中 (`turnStateFromContext`)
2. 如果是 SubTurn,复用现有 turnState
3. 如果是根 turn,创建新的 rootTS
4. 使用 `activeTurnStates.Store` 注册 turn
5. 调用 `runLLMIteration` 执行 LLM 循环
**Incoming 的实现**:
1. 记录 last channel
2. 调用 `newTurnState` 创建 turn state
3. 调用 `al.runTurn(ctx, ts)` 执行 turn
4. 处理 follow-up 消息
5. 发布响应
**关键差异**:
- HEAD: 复杂的 SubTurn 层级管理,支持嵌套
- Incoming: 简化的 turn 管理,通过 `newTurnState``runTurn`
- HEAD: 使用 `runLLMIteration` 函数
- Incoming: 使用 `runTurn` 函数
- Incoming: 新增 follow-up 消息处理机制
**解决方案**: 采用 Incoming 的简化架构,但需要在 `runTurn` 中添加 SubTurn 支持。
---
### 冲突 4: runLLMIteration vs runTurn (1672-1689 行)
**HEAD**: 有独立的 `runLLMIteration` 函数
**Incoming**: 使用 `runTurn` 函数
需要查看具体实现来决定如何合并。
---
### 冲突 5-11: 其他冲突点
剩余冲突主要涉及:
- 工具执行逻辑
- Steering 消息处理
- 中断处理
- 变量命名差异(`agent` vs `ts.agent`
## 架构决策
根据方案 B(采用重构架构),需要:
1. **采用 Incoming 的 AgentLoop 结构**
- 使用 `eventBus`, `hooks`, `hookRuntime`
- 使用单个 `activeTurn` + `activeTurnMu`
- 保留 `turnSeq`
2. **SubTurn 支持策略**
- 选项 A: 在 `turnState` 中添加父子关系字段
- 选项 B: 使用 context 传递 SubTurn 信息
- 选项 C: 在 EventBus 中管理 SubTurn 层级
3. **函数迁移顺序**
- 先采用 Incoming 的结构体定义
- 更新 `newTurnState` 函数
- 采用 `runTurn` 函数
-`runTurn` 中集成 SubTurn 逻辑
## 推荐实施步骤
### 步骤 1: 结构体定义 (30 分钟)
- 采用 Incoming 的 `AgentLoop` 结构体
- 采用 Incoming 的 `processOptions` 结构体
- 添加 `continuationTarget` 结构体
### 步骤 2: 辅助函数 (30 分钟)
- 更新 `NewAgentLoop` 初始化函数
- 确保 EventBus、Hook 正确初始化
### 步骤 3: runAgentLoop 函数 (1-2 小时)
- 采用 Incoming 的简化实现
- 保留 channel 记录逻辑
- 调用 `newTurnState``runTurn`
- 处理 follow-up 消息
### 步骤 4: runTurn 函数 (2-3 小时)
- 采用 Incoming 的 `runTurn` 实现
- 在其中添加 SubTurn 检测和处理逻辑
- 集成 SubTurn 结果回传机制
### 步骤 5: 其他冲突点 (1-2 小时)
- 逐个解决剩余 7 个冲突
- 确保变量命名一致
- 更新工具执行和 steering 逻辑
## 风险和注意事项
1. **SubTurn 语义变化**: 新架构中 SubTurn 的实现方式可能不同
2. **并发安全**: 从 `sync.Map` 迁移到单个 `activeTurn` + 锁
3. **事件系统集成**: 需要确保 SubTurn 事件正确触发
4. **测试覆盖**: 原有 SubTurn 测试需要更新
## 下一步
建议先实现步骤 1-2(结构体定义和初始化),然后再处理复杂的执行逻辑。
+27 -9
View File
@@ -509,21 +509,39 @@ func (al *AgentLoop) Run(ctx context.Context) error {
return nil
}
// drainBusToSteering continuously consumes inbound messages and redirects
// messages from the active scope into the steering queue. Messages from other
// scopes are requeued so they can be processed normally after the active turn.
// drainBusToSteering consumes inbound messages and redirects messages from the
// active scope into the steering queue. Messages from other scopes are requeued
// so they can be processed normally after the active turn. It drains all
// immediately available messages, blocking for the first one until ctx is done.
func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, activeAgentID string) {
blocking := true
for {
var msg bus.InboundMessage
select {
case <-ctx.Done():
return
case m, ok := <-al.bus.InboundChan():
if !ok {
if blocking {
// Block waiting for the first available message or ctx cancellation.
select {
case <-ctx.Done():
return
case m, ok := <-al.bus.InboundChan():
if !ok {
return
}
msg = m
}
} else {
// Non-blocking: drain any remaining queued messages, return when empty.
select {
case m, ok := <-al.bus.InboundChan():
if !ok {
return
}
msg = m
default:
return
}
msg = m
}
blocking = false
msgScope, _, scopeOK := al.resolveSteeringTarget(msg)
if !scopeOK || msgScope != activeScope {
+8
View File
@@ -460,6 +460,14 @@ func (al *AgentLoop) HardAbort(sessionKey string) error {
// Use isHardAbort=true for hard abort to immediately cancel all children.
ts.Finish(true)
// Roll back session history to the state before the turn started.
if ts.session != nil {
history := ts.session.GetHistory(sessionKey)
if ts.initialHistoryLength < len(history) {
ts.session.SetHistory(sessionKey, history[:ts.initialHistoryLength])
}
}
return nil
}
+1 -8
View File
@@ -428,19 +428,12 @@ func spawnSubTurn(
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("subturn panicked: %v", r)
result = nil
logger.ErrorCF("subturn", "SubTurn panicked", map[string]any{
"child_id": childID,
"parent_id": parentTS.turnID,
"panic": r,
})
// Ensure result is not nil to prevent panic during event emission
if result == nil {
result = &tools.ToolResult{
Err: err,
ForLLM: fmt.Sprintf("SubTurn panicked: %v", r),
}
}
}
// Result Delivery Strategy (Async vs Sync)