package agent import ( "context" "errors" "fmt" "sync" "sync/atomic" "time" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/tools" ) // ====================== Config & Constants ====================== const ( // Default values for SubTurn configuration (used when config is not set or is zero) defaultMaxSubTurnDepth = 3 defaultMaxConcurrentSubTurns = 5 defaultConcurrencyTimeout = 30 * time.Second defaultSubTurnTimeout = 5 * time.Minute // maxEphemeralHistorySize limits the number of messages stored in ephemeral sessions. // This prevents memory accumulation in long-running sub-turns. maxEphemeralHistorySize = 50 ) var ( ErrDepthLimitExceeded = errors.New("sub-turn depth limit exceeded") ErrInvalidSubTurnConfig = errors.New("invalid sub-turn config") ErrConcurrencyTimeout = errors.New("timeout waiting for concurrency slot") ) // getSubTurnConfig returns the effective SubTurn configuration with defaults applied. func (al *AgentLoop) getSubTurnConfig() subTurnRuntimeConfig { cfg := al.cfg.Agents.Defaults.SubTurn maxDepth := cfg.MaxDepth if maxDepth <= 0 { maxDepth = defaultMaxSubTurnDepth } maxConcurrent := cfg.MaxConcurrent if maxConcurrent <= 0 { maxConcurrent = defaultMaxConcurrentSubTurns } concurrencyTimeout := time.Duration(cfg.ConcurrencyTimeoutSec) * time.Second if concurrencyTimeout <= 0 { concurrencyTimeout = defaultConcurrencyTimeout } defaultTimeout := time.Duration(cfg.DefaultTimeoutMinutes) * time.Minute if defaultTimeout <= 0 { defaultTimeout = defaultSubTurnTimeout } return subTurnRuntimeConfig{ maxDepth: maxDepth, maxConcurrent: maxConcurrent, concurrencyTimeout: concurrencyTimeout, defaultTimeout: defaultTimeout, defaultTokenBudget: cfg.DefaultTokenBudget, } } // subTurnRuntimeConfig holds the effective runtime configuration for SubTurn execution. type subTurnRuntimeConfig struct { maxDepth int maxConcurrent int concurrencyTimeout time.Duration defaultTimeout time.Duration defaultTokenBudget int } // ====================== SubTurn Config ====================== // SubTurnConfig configures the execution of a child sub-turn. // // Usage Examples: // // Synchronous sub-turn (Async=false): // // cfg := SubTurnConfig{ // Model: "gpt-4o-mini", // SystemPrompt: "Analyze this code", // Async: false, // Result returned immediately // } // result, err := SpawnSubTurn(ctx, cfg) // // Use result directly here // processResult(result) // // Asynchronous sub-turn (Async=true): // // cfg := SubTurnConfig{ // Model: "gpt-4o-mini", // SystemPrompt: "Background analysis", // Async: true, // Result delivered to channel // } // result, err := SpawnSubTurn(ctx, cfg) // // Result also available in parent's pendingResults channel // // Parent turn will poll and process it in a later iteration type SubTurnConfig struct { Model string Tools []tools.Tool SystemPrompt string MaxTokens int // Async controls the result delivery mechanism: // // When Async = false (synchronous sub-turn): // - The caller blocks until the sub-turn completes // - The result is ONLY returned via the function return value // - The result is NOT delivered to the parent's pendingResults channel // - This prevents double delivery: caller gets result immediately, no need for channel // - Use case: When the caller needs the result immediately to continue execution // - Example: A tool that needs to process the sub-turn result before returning // // When Async = true (asynchronous sub-turn): // - The sub-turn runs in the background (still blocks the caller, but semantically async) // - The result is delivered to the parent's pendingResults channel // - The result is ALSO returned via the function return value (for consistency) // - The parent turn can poll pendingResults in later iterations to process results // - Use case: Fire-and-forget operations, or when results are processed in batches // - Example: Spawning multiple sub-turns in parallel and collecting results later // // IMPORTANT: The Async flag does NOT make the call non-blocking. It only controls // whether the result is delivered via the channel. For true non-blocking execution, // the caller must spawn the sub-turn in a separate goroutine. Async bool // Critical indicates this SubTurn's result is important and should continue // running even after the parent turn finishes gracefully. // // When parent finishes gracefully (Finish(false)): // - Critical=true: SubTurn continues running, delivers result as orphan // - Critical=false: SubTurn exits gracefully without error // // When parent finishes with hard abort (Finish(true)): // - All SubTurns are canceled regardless of Critical flag Critical bool // Timeout is the maximum duration for this SubTurn. // If the SubTurn runs longer than this, it will be canceled. // Default is 5 minutes (defaultSubTurnTimeout) if not specified. Timeout time.Duration // MaxContextRunes limits the context size (in runes) passed to the SubTurn. // This prevents context window overflow by truncating message history before LLM calls. // // Values: // 0 = Auto-calculate based on model's ContextWindow * 0.75 (default, recommended) // -1 = No limit (disable soft truncation, rely only on hard context errors) // >0 = Use specified rune limit // // The soft limit acts as a first line of defense before hitting the provider's // hard context window limit. When exceeded, older messages are intelligently // truncated while preserving system messages and recent context. MaxContextRunes int // ActualSystemPrompt is injected as the true 'system' role message for the childAgent. // The legacy SystemPrompt field is actually used as the first 'user' message (task description). ActualSystemPrompt string // InitialMessages preloads the ephemeral session history before the agent loop starts. // Used by evaluator-optimizer patterns to pass the full worker context across multiple iterations. InitialMessages []providers.Message // InitialTokenBudget is a shared atomic counter for tracking remaining tokens. // If set, the SubTurn will inherit this budget and deduct tokens after each LLM call. // If nil, the SubTurn will inherit the parent's tokenBudget (if any). // Used by team tool to enforce token limits across all team members. InitialTokenBudget *atomic.Int64 // Can be extended with temperature, topP, etc. } // ====================== Context Keys ====================== type agentLoopKeyType struct{} var agentLoopKey = agentLoopKeyType{} // WithAgentLoop injects AgentLoop into context for tool access func WithAgentLoop(ctx context.Context, al *AgentLoop) context.Context { return context.WithValue(ctx, agentLoopKey, al) } // AgentLoopFromContext retrieves AgentLoop from context func AgentLoopFromContext(ctx context.Context) *AgentLoop { al, _ := ctx.Value(agentLoopKey).(*AgentLoop) return al } // ====================== Helper Functions ====================== func (al *AgentLoop) generateSubTurnID() string { return fmt.Sprintf("subturn-%d", al.subTurnCounter.Add(1)) } // ====================== Core Function: spawnSubTurn ====================== // AgentLoopSpawner implements tools.SubTurnSpawner interface. // This allows tools to spawn sub-turns without circular dependency. type AgentLoopSpawner struct { al *AgentLoop } // SpawnSubTurn implements tools.SubTurnSpawner interface. func (s *AgentLoopSpawner) SpawnSubTurn( ctx context.Context, cfg tools.SubTurnConfig, ) (*tools.ToolResult, error) { parentTS := turnStateFromContext(ctx) if parentTS == nil { return nil, errors.New( "parent turnState not found in context - cannot spawn sub-turn outside of a turn", ) } // Convert tools.SubTurnConfig to agent.SubTurnConfig agentCfg := SubTurnConfig{ Model: cfg.Model, Tools: cfg.Tools, SystemPrompt: cfg.SystemPrompt, ActualSystemPrompt: cfg.ActualSystemPrompt, InitialMessages: cfg.InitialMessages, InitialTokenBudget: cfg.InitialTokenBudget, MaxTokens: cfg.MaxTokens, Async: cfg.Async, Critical: cfg.Critical, Timeout: cfg.Timeout, MaxContextRunes: cfg.MaxContextRunes, } return spawnSubTurn(ctx, s.al, parentTS, agentCfg) } // NewSubTurnSpawner creates a SubTurnSpawner for the given AgentLoop. func NewSubTurnSpawner(al *AgentLoop) *AgentLoopSpawner { return &AgentLoopSpawner{al: al} } // SpawnSubTurn is the exported entry point for tools to spawn sub-turns. // It retrieves AgentLoop and parent turnState from context and delegates to spawnSubTurn. func SpawnSubTurn(ctx context.Context, cfg SubTurnConfig) (*tools.ToolResult, error) { al := AgentLoopFromContext(ctx) if al == nil { return nil, errors.New( "AgentLoop not found in context - ensure context is properly initialized", ) } parentTS := turnStateFromContext(ctx) if parentTS == nil { return nil, errors.New( "parent turnState not found in context - cannot spawn sub-turn outside of a turn", ) } return spawnSubTurn(ctx, al, parentTS, cfg) } func spawnSubTurn( ctx context.Context, al *AgentLoop, parentTS *turnState, cfg SubTurnConfig, ) (result *tools.ToolResult, err error) { // Get effective SubTurn configuration rtCfg := al.getSubTurnConfig() // 0. Acquire concurrency semaphore FIRST to ensure it's released even if early validation fails. // Blocks if parent already has maxConcurrentSubTurns running, with a timeout to prevent indefinite blocking. // Also respects context cancellation so we don't block forever if parent is aborted. // NOTE: The semaphore is released immediately after runTurn completes (not in a defer) to // ensure it is freed before the cleanup phase (async result delivery), which may block on // a full pendingResults channel. Holding the semaphore through cleanup would allow the // parent's goroutine to be blocked waiting for a semaphore slot while child turns are // blocked delivering results — a deadlock. var semAcquired bool if parentTS.concurrencySem != nil { // Create a timeout context for semaphore acquisition timeoutCtx, cancel := context.WithTimeout(ctx, rtCfg.concurrencyTimeout) defer cancel() select { case parentTS.concurrencySem <- struct{}{}: semAcquired = true defer func() { if semAcquired { <-parentTS.concurrencySem } }() case <-timeoutCtx.Done(): // Check parent context first - if it was canceled, propagate that error if ctx.Err() != nil { return nil, ctx.Err() } // Otherwise it's our timeout return nil, fmt.Errorf("%w: all %d slots occupied for %v", ErrConcurrencyTimeout, rtCfg.maxConcurrent, rtCfg.concurrencyTimeout) } } // 1. Depth limit check if parentTS.depth >= rtCfg.maxDepth { logger.WarnCF("subturn", "Depth limit exceeded", map[string]any{ "parent_id": parentTS.turnID, "depth": parentTS.depth, "max_depth": rtCfg.maxDepth, }) return nil, ErrDepthLimitExceeded } // 2. Config validation if cfg.Model == "" { return nil, ErrInvalidSubTurnConfig } // 3. Determine timeout for child SubTurn timeout := cfg.Timeout if timeout <= 0 { timeout = rtCfg.defaultTimeout } // 4. Create INDEPENDENT child context (not derived from parent ctx). // This allows the child to continue running after parent finishes gracefully. // The child has its own timeout for self-protection. childCtx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() childID := al.generateSubTurnID() // Get the agent instance from parent, falling back to the default agent. // Wrap it in a shallow copy that uses an ephemeral (in-memory only) session store // so that child turns never pollute or persist to the parent's session history. baseAgent := parentTS.agent if baseAgent == nil { baseAgent = al.registry.GetDefaultAgent() } if baseAgent == nil { return nil, errors.New("parent turnState has no agent instance") } ephemeralStore := newEphemeralSession(nil) agent := *baseAgent // shallow copy agent.Sessions = ephemeralStore // Clone the tool registry so child turn's tool registrations // don't pollute the parent's registry. if baseAgent.Tools != nil { agent.Tools = baseAgent.Tools.Clone() } // Create processOptions for the child turn opts := processOptions{ SessionKey: childID, Channel: parentTS.channel, ChatID: parentTS.chatID, SenderID: parentTS.opts.SenderID, SenderDisplayName: parentTS.opts.SenderDisplayName, UserMessage: cfg.SystemPrompt, // Task description becomes the first user message SystemPromptOverride: cfg.ActualSystemPrompt, Media: nil, InitialSteeringMessages: cfg.InitialMessages, DefaultResponse: "", EnableSummary: false, SendResponse: false, NoHistory: true, // SubTurns don't use session history SkipInitialSteeringPoll: true, } // Create event scope for the child turn scope := al.newTurnEventScope(agent.ID, childID) // Create child turnState using the new API childTS := newTurnState(&agent, opts, scope) // Set SubTurn-specific fields childTS.cancelFunc = cancel childTS.critical = cfg.Critical childTS.depth = parentTS.depth + 1 childTS.parentTurnID = parentTS.turnID childTS.parentTurnState = parentTS childTS.pendingResults = make(chan *tools.ToolResult, 16) childTS.concurrencySem = make(chan struct{}, rtCfg.maxConcurrent) childTS.al = al // back-ref for hard abort cascade childTS.session = ephemeralStore // same store as agent.Sessions // Token budget initialization/inheritance // If InitialTokenBudget is explicitly provided (e.g., by team tool), use it. // Otherwise, inherit from parent's tokenBudget (for nested SubTurns). if cfg.InitialTokenBudget != nil { childTS.tokenBudget = cfg.InitialTokenBudget } else if parentTS.tokenBudget != nil { childTS.tokenBudget = parentTS.tokenBudget } else if rtCfg.defaultTokenBudget > 0 { // Apply default token budget from config if no budget is set budget := &atomic.Int64{} budget.Store(int64(rtCfg.defaultTokenBudget)) childTS.tokenBudget = budget } // IMPORTANT: Put childTS into childCtx so that code inside runTurn can retrieve it childCtx = withTurnState(childCtx, childTS) childCtx = WithAgentLoop(childCtx, al) // Propagate AgentLoop to child turn childTS.ctx = childCtx // Register child turn state so GetAllActiveTurns/Subagents can find it al.activeTurnStates.Store(childID, childTS) defer al.activeTurnStates.Delete(childID) // 5. Establish parent-child relationship (thread-safe) parentTS.mu.Lock() parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID) parentTS.mu.Unlock() // 6. Emit Spawn event al.emitEvent(EventKindSubTurnSpawn, childTS.eventMeta("spawnSubTurn", "subturn.spawn"), SubTurnSpawnPayload{ AgentID: childTS.agentID, Label: childID, ParentTurnID: parentTS.turnID, }, ) // 7. Defer cleanup: deliver result (for async), emit End event, and recover from panics defer func() { if r := recover(); r != nil { err = fmt.Errorf("subturn panicked: %v", r) result = nil logger.ErrorCF("subturn", "SubTurn panicked", map[string]any{ "child_id": childID, "parent_id": parentTS.turnID, "panic": r, }) } // Result Delivery Strategy (Async vs Sync) if cfg.Async { deliverSubTurnResult(al, parentTS, childID, result) } status := "completed" if err != nil { status = "error" } al.emitEvent(EventKindSubTurnEnd, childTS.eventMeta("spawnSubTurn", "subturn.end"), SubTurnEndPayload{ AgentID: childTS.agentID, Status: status, }, ) }() // 8. Execute sub-turn via the real agent loop. turnRes, turnErr := al.runTurn(childCtx, childTS) // Release the concurrency semaphore immediately after runTurn completes, // before the cleanup defer runs. This prevents a deadlock where: // - All semaphore slots are held by sub-turns in their cleanup phase // - Cleanup blocks on a full pendingResults channel // - The parent goroutine is blocked waiting for a semaphore slot // - The parent cannot consume pendingResults because it is blocked on the semaphore if semAcquired { <-parentTS.concurrencySem semAcquired = false // prevent the defer from double-releasing } // Convert turnResult to tools.ToolResult if turnErr != nil { err = turnErr result = &tools.ToolResult{ Err: turnErr, ForLLM: fmt.Sprintf("SubTurn failed: %v", turnErr), } } else { result = &tools.ToolResult{ ForLLM: turnRes.finalContent, ForUser: turnRes.finalContent, } } return result, err } // ====================== Result Delivery ====================== // deliverSubTurnResult delivers a sub-turn result to the parent turn's pendingResults channel. // // IMPORTANT: This function is ONLY called for asynchronous sub-turns (Async=true). // For synchronous sub-turns (Async=false), results are returned directly via the function // return value to avoid double delivery. // // Delivery behavior: // - If parent turn is still running: attempts to deliver to pendingResults channel // - If channel is full: emits SubTurnOrphanResultEvent (result is lost from channel but tracked) // - If parent turn has finished: emits SubTurnOrphanResultEvent (late arrival) // // Thread safety: // - Reads parent state under lock, then releases lock before channel send // - Small race window exists but is acceptable (worst case: result becomes orphan) // // Event emissions: // - SubTurnResultDeliveredEvent: successful delivery to channel // - SubTurnOrphanResultEvent: delivery failed (parent finished or channel full) func deliverSubTurnResult(al *AgentLoop, parentTS *turnState, childID string, result *tools.ToolResult) { // Let GC clean up the pendingResults channel; parent Finish will no longer close it. // We use defer/recover to catch any unlikely channel panics if it were ever closed. defer func() { if r := recover(); r != nil { logger.WarnCF("subturn", "recovered panic sending to pendingResults", map[string]any{ "parent_id": parentTS.turnID, "child_id": childID, "recover": r, }) if result != nil && al != nil { al.emitEvent(EventKindSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ParentTurnID: parentTS.turnID, ChildTurnID: childID, Reason: "panic"}, ) } } }() parentTS.mu.Lock() isFinished := parentTS.isFinished.Load() resultChan := parentTS.pendingResults parentTS.mu.Unlock() // If parent turn has already finished, treat this as an orphan result if isFinished || resultChan == nil { if result != nil && al != nil { al.emitEvent(EventKindSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ParentTurnID: parentTS.turnID, ChildTurnID: childID, Reason: "parent_finished"}, ) } return } // Parent Turn is still running → attempt to deliver result // We use a select statement with parentTS.Finished() to ensure that if the // parent turn finishes while we are waiting to send the result (e.g. channel // is full), we don't leak this goroutine by blocking forever. select { case resultChan <- result: // Successfully delivered if al != nil { al.emitEvent(EventKindSubTurnResultDelivered, parentTS.eventMeta("deliverSubTurnResult", "subturn.result_delivered"), SubTurnResultDeliveredPayload{ContentLen: len(result.ForLLM)}, ) } case <-parentTS.Finished(): // Parent finished while we were waiting to deliver. // The result cannot be delivered to the LLM, so it becomes an orphan. logger.WarnCF("subturn", "parent finished before result could be delivered", map[string]any{ "parent_id": parentTS.turnID, "child_id": childID, }) if result != nil && al != nil { al.emitEvent( EventKindSubTurnOrphan, parentTS.eventMeta("deliverSubTurnResult", "subturn.orphan"), SubTurnOrphanPayload{ ParentTurnID: parentTS.turnID, ChildTurnID: childID, Reason: "parent_finished_waiting", }, ) } } } // ====================== Other Types ====================== // ephemeralSessionStore is an in-memory session.SessionStore used by SubTurns. // It does not persist to disk and auto-truncates history to maxEphemeralHistorySize. type ephemeralSessionStore struct { mu sync.Mutex history []providers.Message summary string } func newEphemeralSession(initial []providers.Message) ephemeralSessionStoreIface { s := &ephemeralSessionStore{} if len(initial) > 0 { s.history = append(s.history, initial...) } return s } // ephemeralSessionStoreIface is satisfied by *ephemeralSessionStore. // Declared so newEphemeralSession can return a typed interface. type ephemeralSessionStoreIface interface { AddMessage(sessionKey, role, content string) AddFullMessage(sessionKey string, msg providers.Message) GetHistory(key string) []providers.Message GetSummary(key string) string SetSummary(key, summary string) SetHistory(key string, history []providers.Message) TruncateHistory(key string, keepLast int) Save(key string) error Close() error } func (e *ephemeralSessionStore) AddMessage(_, role, content string) { e.mu.Lock() defer e.mu.Unlock() e.history = append(e.history, providers.Message{Role: role, Content: content}) e.truncateLocked() } func (e *ephemeralSessionStore) AddFullMessage(_ string, msg providers.Message) { e.mu.Lock() defer e.mu.Unlock() e.history = append(e.history, msg) e.truncateLocked() } func (e *ephemeralSessionStore) GetHistory(_ string) []providers.Message { e.mu.Lock() defer e.mu.Unlock() out := make([]providers.Message, len(e.history)) copy(out, e.history) return out } func (e *ephemeralSessionStore) GetSummary(_ string) string { e.mu.Lock() defer e.mu.Unlock() return e.summary } func (e *ephemeralSessionStore) SetSummary(_, summary string) { e.mu.Lock() defer e.mu.Unlock() e.summary = summary } func (e *ephemeralSessionStore) SetHistory(_ string, history []providers.Message) { e.mu.Lock() defer e.mu.Unlock() e.history = make([]providers.Message, len(history)) copy(e.history, history) e.truncateLocked() } func (e *ephemeralSessionStore) TruncateHistory(_ string, keepLast int) { e.mu.Lock() defer e.mu.Unlock() if keepLast <= 0 { e.history = nil return } if keepLast >= len(e.history) { return } e.history = e.history[len(e.history)-keepLast:] } func (e *ephemeralSessionStore) Save(_ string) error { return nil } func (e *ephemeralSessionStore) Close() error { return nil } func (e *ephemeralSessionStore) truncateLocked() { if len(e.history) > maxEphemeralHistorySize { e.history = e.history[len(e.history)-maxEphemeralHistorySize:] } }