From ceeae15d8ad670b3f03ca430ef2811d98760f2b9 Mon Sep 17 00:00:00 2001 From: Administrator <1280842908@qq.com> Date: Mon, 16 Mar 2026 17:27:04 +0800 Subject: [PATCH] feat(agent): wire SubTurn into AgentLoop and Spawn Tool - Add subTurnResults sync.Map to AgentLoop for per-session channel tracking - Add register/unregister/dequeue methods in steering.go - Poll SubTurn results in runLLMIteration at loop start and after each tool, injecting results as [SubTurn Result] messages into parent conversation - Initialize root turnState in runAgentLoop, propagate via context (withTurnState/turnStateFromContext), call rootTS.Finish() on completion - Wire Spawn Tool to spawnSubTurn via SetSpawner in registerSharedTools, recovering parentTS from context for proper turn hierarchy - Refactor subagent.go to use SetSpawner pattern - Add TestSubTurnResultChannelRegistration and TestDequeuePendingSubTurnResults --- pkg/agent/loop.go | 108 ++++++++++++++++++++++- pkg/agent/steering.go | 41 +++++++++ pkg/agent/subturn.go | 27 ++++-- pkg/agent/subturn_test.go | 70 +++++++++++++++ pkg/tools/subagent.go | 175 ++++++++++++++++++++++++-------------- 5 files changed, 348 insertions(+), 73 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 21516e7de..510e247e3 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -49,6 +49,7 @@ type AgentLoop struct { cmdRegistry *commands.Registry mcp mcpRuntime steering *steeringQueue + subTurnResults sync.Map mu sync.RWMutex // Track active requests for safe provider cleanup activeRequests sync.WaitGroup @@ -85,9 +86,6 @@ func NewAgentLoop( ) *AgentLoop { registry := NewAgentRegistry(cfg, provider) - // Register shared tools to all agents - registerSharedTools(cfg, msgBus, registry, provider) - // Set up shared fallback chain cooldown := providers.NewCooldownTracker() fallbackChain := providers.NewFallbackChain(cooldown) @@ -110,11 +108,15 @@ func NewAgentLoop( steering: newSteeringQueue(parseSteeringMode(cfg.Agents.Defaults.SteeringMode)), } + // Register shared tools to all agents (now that al is created) + registerSharedTools(al, cfg, msgBus, registry, provider) + return al } // registerSharedTools registers tools that are shared across all agents (web, message, spawn). func registerSharedTools( + al *AgentLoop, cfg *config.Config, msgBus *bus.MessageBus, registry *AgentRegistry, @@ -230,12 +232,76 @@ func registerSharedTools( if cfg.Tools.IsToolEnabled("subagent") { subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace) subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature) + + // Set the spawner that links into AgentLoop's turnState + subagentManager.SetSpawner(func( + ctx context.Context, + task, label, targetAgentID string, + tls *tools.ToolRegistry, + maxTokens int, + temperature float64, + hasMaxTokens, hasTemperature bool, + ) (*tools.ToolResult, error) { + // 1. Recover parent Turn State from Context + parentTS := turnStateFromContext(ctx) + if parentTS == nil { + // Fallback: If no turnState exists in context, create an isolated ad-hoc root turn state + // so that the tool can still function outside of an agent loop (e.g. tests, raw invocations). + parentTS = &turnState{ + ctx: ctx, + turnID: "adhoc-root", + depth: 0, + session: newEphemeralSession(nil), + pendingResults: make(chan *tools.ToolResult, 16), + } + } + + // 2. Build Tools slice from registry + var tlSlice []tools.Tool + for _, name := range tls.List() { + if t, ok := tls.Get(name); ok { + tlSlice = append(tlSlice, t) + } + } + + // 3. System Prompt + systemPrompt := "You are a subagent. Complete the given task independently and report the result.\n" + + "You have access to tools - use them as needed to complete your task.\n" + + "After completing the task, provide a clear summary of what was done.\n\n" + + "Task: " + task + + // 4. Resolve Model + modelToUse := agent.Model + if targetAgentID != "" { + if targetAgent, ok := al.GetRegistry().GetAgent(targetAgentID); ok { + modelToUse = targetAgent.Model + } + } + + // 5. Build SubTurnConfig + cfg := SubTurnConfig{ + Model: modelToUse, + Tools: tlSlice, + SystemPrompt: systemPrompt, + } + if hasMaxTokens { + cfg.MaxTokens = maxTokens + } + + // 6. Spawn SubTurn + return spawnSubTurn(ctx, al, parentTS, cfg) + }) + spawnTool := tools.NewSpawnTool(subagentManager) currentAgentID := agentID spawnTool.SetAllowlistChecker(func(targetAgentID string) bool { return registry.CanSpawnSubagent(currentAgentID, targetAgentID) }) agent.Tools.Register(spawnTool) + + // Also register the synchronous subagent tool + subagentTool := tools.NewSubagentTool(subagentManager) + agent.Tools.Register(subagentTool) } else { logger.WarnCF("agent", "spawn tool requires subagent to be enabled", nil) } @@ -450,7 +516,7 @@ func (al *AgentLoop) ReloadProviderAndConfig( } // Ensure shared tools are re-registered on the new registry - registerSharedTools(cfg, al.bus, registry, provider) + registerSharedTools(al, cfg, al.bus, registry, provider) // Atomically swap the config and registry under write lock // This ensures readers see a consistent pair @@ -896,6 +962,20 @@ func (al *AgentLoop) runAgentLoop( agent *AgentInstance, opts processOptions, ) (string, error) { + // Initialize a root TurnState for this iteration, allowing sub-turns to be spawned. + rootTS := &turnState{ + ctx: ctx, + turnID: opts.SessionKey, // Associate this turn graph with the current session key + depth: 0, + session: agent.Sessions, + pendingResults: make(chan *tools.ToolResult, 16), + } + ctx = withTurnState(ctx, rootTS) + + // Ensure the parent's pending results channel is cleaned up when this root turn finishes + defer al.unregisterSubTurnResultChannel(rootTS.turnID) + al.registerSubTurnResultChannel(rootTS.turnID, rootTS.pendingResults) + // 0. Record last channel for heartbeat notifications (skip internal channels and cli) if opts.Channel != "" && opts.ChatID != "" { if !constants.IsInternalChannel(opts.Channel) { @@ -940,6 +1020,9 @@ func (al *AgentLoop) runAgentLoop( return "", err } + // Signal completion to rootTS so it knows it is finished, terminating any active sub-turns + rootTS.Finish() + // If last tool had ForUser content and we already sent it, we might not need to send final response // This is controlled by the tool's Silent flag and ForUser content @@ -1055,6 +1138,14 @@ func (al *AgentLoop) runLLMIteration( } } + // Poll for any pending SubTurn results and inject them as assistant context. + if subResults := al.dequeuePendingSubTurnResults(opts.SessionKey); len(subResults) > 0 { + for _, r := range subResults { + msg := providers.Message{Role: "user", Content: fmt.Sprintf("[SubTurn Result] %s", r.ForLLM)} + pendingMessages = append(pendingMessages, msg) + } + } + // Determine effective model tier for this conversation turn. // selectCandidates evaluates routing once and the decision is sticky for // all tool-follow-up iterations within the same turn so that a multi-step @@ -1459,6 +1550,15 @@ func (al *AgentLoop) runLLMIteration( steeringAfterTools = steerMsgs break } + + // Also poll for any SubTurn results that arrived during tool execution. + if subResults := al.dequeuePendingSubTurnResults(opts.SessionKey); len(subResults) > 0 { + for _, r := range subResults { + msg := providers.Message{Role: "user", Content: fmt.Sprintf("[SubTurn Result] %s", r.ForLLM)} + messages = append(messages, msg) + agent.Sessions.AddFullMessage(opts.SessionKey, msg) + } + } } // If steering messages were captured during tool execution, they diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index 8c7c79c16..c09b97581 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -8,6 +8,7 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/tools" ) // SteeringMode controls how queued steering messages are dequeued. @@ -186,3 +187,43 @@ func (al *AgentLoop) Continue(ctx context.Context, sessionKey, channel, chatID s SkipInitialSteeringPoll: true, }) } + +// ====================== SubTurn Result Polling ====================== + +// dequeuePendingSubTurnResults polls the SubTurn result channel for the given +// session and returns all available results without blocking. +// Returns nil if no channel is registered for this session. +func (al *AgentLoop) dequeuePendingSubTurnResults(sessionKey string) []*tools.ToolResult { + chInterface, ok := al.subTurnResults.Load(sessionKey) + if !ok { + return nil + } + + ch, ok := chInterface.(chan *tools.ToolResult) + if !ok { + return nil + } + + var results []*tools.ToolResult + for { + select { + case result := <-ch: + if result != nil { + results = append(results, result) + } + default: + return results + } + } +} + +// registerSubTurnResultChannel registers a SubTurn result channel for the given session. +// This allows the parent loop to poll for results from child SubTurns. +func (al *AgentLoop) registerSubTurnResultChannel(sessionKey string, ch chan *tools.ToolResult) { + al.subTurnResults.Store(sessionKey, ch) +} + +// unregisterSubTurnResultChannel removes the SubTurn result channel for the given session. +func (al *AgentLoop) unregisterSubTurnResultChannel(sessionKey string) { + al.subTurnResults.Delete(sessionKey) +} diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index ab7d60957..89b254c69 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -54,7 +54,20 @@ type SubTurnOrphanResultEvent struct { Result *tools.ToolResult } -// ====================== turnState (Simplified, reusable with existing structs) ====================== +// ====================== turnState ====================== +type turnStateKeyType struct{} + +var turnStateKey = turnStateKeyType{} + +func withTurnState(ctx context.Context, ts *turnState) context.Context { + return context.WithValue(ctx, turnStateKey, ts) +} + +func turnStateFromContext(ctx context.Context) *turnState { + ts, _ := ctx.Value(turnStateKey).(*turnState) + return ts +} + type turnState struct { ctx context.Context cancelFunc context.CancelFunc // Used to cancel all children when this turn finishes @@ -189,14 +202,18 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID) parentTS.mu.Unlock() - // 5. Emit Spawn event (currently using Mock, will be replaced by real EventBus) + // 5. Register the parent's pendingResults channel so the parent loop can poll it + al.registerSubTurnResultChannel(parentTS.turnID, parentTS.pendingResults) + defer al.unregisterSubTurnResultChannel(parentTS.turnID) + + // 6. Emit Spawn event (currently using Mock, will be replaced by real EventBus) MockEventBus.Emit(SubTurnSpawnEvent{ ParentID: parentTS.turnID, ChildID: childID, Config: cfg, }) - // 6. Defer emitting End event, and recover from panics to ensure it's always fired + // 7. Defer emitting End event, and recover from panics to ensure it's always fired defer func() { if r := recover(); r != nil { err = fmt.Errorf("subturn panicked: %v", r) @@ -209,11 +226,11 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S }) }() - // 7. Execute sub-turn via the real agent loop. + // 8. Execute sub-turn via the real agent loop. // Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent. result, err = runTurn(childCtx, al, childTS, cfg) - // 8. Deliver result back to parent Turn + // 9. Deliver result back to parent Turn deliverSubTurnResult(parentTS, childID, result) return result, err diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go index 943c46015..b7012e63d 100644 --- a/pkg/agent/subturn_test.go +++ b/pkg/agent/subturn_test.go @@ -253,3 +253,73 @@ func TestSpawnSubTurn_OrphanResultRouting(t *testing.T) { t.Error("Parent history was polluted by orphan result") } } + +// ====================== Extra Independent Test: Result Channel Registration ====================== +func TestSubTurnResultChannelRegistration(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-reg-1", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 4), + session: &ephemeralSessionStore{}, + } + + cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}} + + // Before spawn: channel should not be registered + if results := al.dequeuePendingSubTurnResults(parent.turnID); results != nil { + t.Error("expected no channel before spawnSubTurn") + } + + _, _ = spawnSubTurn(context.Background(), al, parent, cfg) + + // After spawn completes: channel should be unregistered (defer cleanup in spawnSubTurn) + if _, ok := al.subTurnResults.Load(parent.turnID); ok { + t.Error("channel should be unregistered after spawnSubTurn completes") + } +} + +// ====================== Extra Independent Test: Dequeue Pending SubTurn Results ====================== +func TestDequeuePendingSubTurnResults(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + sessionKey := "test-session-dequeue" + ch := make(chan *tools.ToolResult, 4) + + // Register channel manually + al.registerSubTurnResultChannel(sessionKey, ch) + defer al.unregisterSubTurnResultChannel(sessionKey) + + // Empty channel returns nil + if results := al.dequeuePendingSubTurnResults(sessionKey); len(results) != 0 { + t.Errorf("expected empty results, got %d", len(results)) + } + + // Put 3 results in + ch <- &tools.ToolResult{ForLLM: "result-1"} + ch <- &tools.ToolResult{ForLLM: "result-2"} + ch <- &tools.ToolResult{ForLLM: "result-3"} + + results := al.dequeuePendingSubTurnResults(sessionKey) + if len(results) != 3 { + t.Errorf("expected 3 results, got %d", len(results)) + } + if results[0].ForLLM != "result-1" || results[2].ForLLM != "result-3" { + t.Error("results order or content mismatch") + } + + // Channel should be drained now + if results := al.dequeuePendingSubTurnResults(sessionKey); len(results) != 0 { + t.Errorf("expected empty after drain, got %d", len(results)) + } + + // Unregistered session returns nil + al.unregisterSubTurnResultChannel(sessionKey) + if results := al.dequeuePendingSubTurnResults(sessionKey); results != nil { + t.Error("expected nil for unregistered session") + } +} diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index e51cbaafa..7a4290746 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -21,6 +21,15 @@ type SubagentTask struct { Created int64 } +type SpawnSubTurnFunc func( + ctx context.Context, + task, label, agentID string, + tools *ToolRegistry, + maxTokens int, + temperature float64, + hasMaxTokens, hasTemperature bool, +) (*ToolResult, error) + type SubagentManager struct { tasks map[string]*SubagentTask mu sync.RWMutex @@ -34,6 +43,7 @@ type SubagentManager struct { hasMaxTokens bool hasTemperature bool nextID int + spawner SpawnSubTurnFunc } func NewSubagentManager( @@ -51,6 +61,12 @@ func NewSubagentManager( } } +func (sm *SubagentManager) SetSpawner(spawner SpawnSubTurnFunc) { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.spawner = spawner +} + // SetLLMOptions sets max tokens and temperature for subagent LLM calls. func (sm *SubagentManager) SetLLMOptions(maxTokens int, temperature float64) { sm.mu.Lock() @@ -112,22 +128,6 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, call task.Status = "running" task.Created = time.Now().UnixMilli() - // Build system prompt for subagent - systemPrompt := `You are a subagent. Complete the given task independently and report the result. -You have access to tools - use them as needed to complete your task. -After completing the task, provide a clear summary of what was done.` - - messages := []providers.Message{ - { - Role: "system", - Content: systemPrompt, - }, - { - Role: "user", - Content: task.Task, - }, - } - // Check if context is already canceled before starting select { case <-ctx.Done(): @@ -139,8 +139,8 @@ After completing the task, provide a clear summary of what was done.` default: } - // Run tool loop with access to tools sm.mu.RLock() + spawner := sm.spawner tools := sm.tools maxIter := sm.maxIterations maxTokens := sm.maxTokens @@ -149,27 +149,59 @@ After completing the task, provide a clear summary of what was done.` hasTemperature := sm.hasTemperature sm.mu.RUnlock() - var llmOptions map[string]any - if hasMaxTokens || hasTemperature { - llmOptions = map[string]any{} - if hasMaxTokens { - llmOptions["max_tokens"] = maxTokens + var result *ToolResult + var err error + + if spawner != nil { + result, err = spawner(ctx, task.Task, task.Label, task.AgentID, tools, maxTokens, temperature, hasMaxTokens, hasTemperature) + } else { + // Fallback to legacy RunToolLoop + systemPrompt := `You are a subagent. Complete the given task independently and report the result. +You have access to tools - use them as needed to complete your task. +After completing the task, provide a clear summary of what was done.` + + messages := []providers.Message{ + {Role: "system", Content: systemPrompt}, + {Role: "user", Content: task.Task}, } - if hasTemperature { - llmOptions["temperature"] = temperature + + var llmOptions map[string]any + if hasMaxTokens || hasTemperature { + llmOptions = map[string]any{} + if hasMaxTokens { + llmOptions["max_tokens"] = maxTokens + } + if hasTemperature { + llmOptions["temperature"] = temperature + } + } + + var loopResult *ToolLoopResult + loopResult, err = RunToolLoop(ctx, ToolLoopConfig{ + Provider: sm.provider, + Model: sm.defaultModel, + Tools: tools, + MaxIterations: maxIter, + LLMOptions: llmOptions, + }, messages, task.OriginChannel, task.OriginChatID) + + if err == nil { + result = &ToolResult{ + ForLLM: fmt.Sprintf( + "Subagent '%s' completed (iterations: %d): %s", + task.Label, + loopResult.Iterations, + loopResult.Content, + ), + ForUser: loopResult.Content, + Silent: false, + IsError: false, + Async: false, + } } } - loopResult, err := RunToolLoop(ctx, ToolLoopConfig{ - Provider: sm.provider, - Model: sm.defaultModel, - Tools: tools, - MaxIterations: maxIter, - LLMOptions: llmOptions, - }, messages, task.OriginChannel, task.OriginChatID) - sm.mu.Lock() - var result *ToolResult defer func() { sm.mu.Unlock() // Call callback if provided and result is set @@ -196,19 +228,7 @@ After completing the task, provide a clear summary of what was done.` } } else { task.Status = "completed" - task.Result = loopResult.Content - result = &ToolResult{ - ForLLM: fmt.Sprintf( - "Subagent '%s' completed (iterations: %d): %s", - task.Label, - loopResult.Iterations, - loopResult.Content, - ), - ForUser: loopResult.Content, - Silent: false, - IsError: false, - Async: false, - } + task.Result = result.ForLLM } } @@ -231,8 +251,6 @@ func (sm *SubagentManager) ListTasks() []*SubagentTask { } // SubagentTool executes a subagent task synchronously and returns the result. -// Unlike SpawnTool which runs tasks asynchronously, SubagentTool waits for completion -// and returns the result directly in the ToolResult. type SubagentTool struct { manager *SubagentManager } @@ -280,7 +298,51 @@ func (t *SubagentTool) Execute(ctx context.Context, args map[string]any) *ToolRe return ErrorResult("Subagent manager not configured").WithError(fmt.Errorf("manager is nil")) } - // Build messages for subagent + sm := t.manager + sm.mu.RLock() + spawner := sm.spawner + tools := sm.tools + maxIter := sm.maxIterations + maxTokens := sm.maxTokens + temperature := sm.temperature + hasMaxTokens := sm.hasMaxTokens + hasTemperature := sm.hasTemperature + sm.mu.RUnlock() + + if spawner != nil { + // Use spawner + res, err := spawner(ctx, task, label, "", tools, maxTokens, temperature, hasMaxTokens, hasTemperature) + if err != nil { + return ErrorResult(fmt.Sprintf("Subagent execution failed: %v", err)).WithError(err) + } + + // Ensure synchronous ForUser display truncates + userContent := res.ForLLM + if res.ForUser != "" { + userContent = res.ForUser + } + maxUserLen := 500 + if len(userContent) > maxUserLen { + userContent = userContent[:maxUserLen] + "..." + } + + labelStr := label + if labelStr == "" { + labelStr = "(unnamed)" + } + llmContent := fmt.Sprintf("Subagent task completed:\nLabel: %s\nResult: %s", + labelStr, res.ForLLM) + + return &ToolResult{ + ForLLM: llmContent, + ForUser: userContent, + Silent: false, + IsError: res.IsError, + Async: false, + } + } + + // Build messages for subagent fallback messages := []providers.Message{ { Role: "system", @@ -292,17 +354,6 @@ func (t *SubagentTool) Execute(ctx context.Context, args map[string]any) *ToolRe }, } - // Use RunToolLoop to execute with tools (same as async SpawnTool) - sm := t.manager - sm.mu.RLock() - tools := sm.tools - maxIter := sm.maxIterations - maxTokens := sm.maxTokens - temperature := sm.temperature - hasMaxTokens := sm.hasMaxTokens - hasTemperature := sm.hasTemperature - sm.mu.RUnlock() - var llmOptions map[string]any if hasMaxTokens || hasTemperature { llmOptions = map[string]any{} @@ -314,8 +365,6 @@ func (t *SubagentTool) Execute(ctx context.Context, args map[string]any) *ToolRe } } - // Fall back to "cli"/"direct" for non-conversation callers (e.g., CLI, tests) - // to preserve the same defaults as the original NewSubagentTool constructor. channel := ToolChannel(ctx) if channel == "" { channel = "cli" @@ -336,14 +385,12 @@ func (t *SubagentTool) Execute(ctx context.Context, args map[string]any) *ToolRe return ErrorResult(fmt.Sprintf("Subagent execution failed: %v", err)).WithError(err) } - // ForUser: Brief summary for user (truncated if too long) userContent := loopResult.Content maxUserLen := 500 if len(userContent) > maxUserLen { userContent = userContent[:maxUserLen] + "..." } - // ForLLM: Full execution details labelStr := label if labelStr == "" { labelStr = "(unnamed)"