diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6adaa423d..903e919f7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -1022,7 +1022,7 @@ func (al *AgentLoop) runAgentLoop( session: agent.Sessions, initialHistoryLength: len(agent.Sessions.GetHistory("")), // Snapshot for rollback on hard abort pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), // maxConcurrentSubTurns + concurrencySem: make(chan struct{}, al.getSubTurnConfig().maxConcurrent), // maxConcurrentSubTurns } ctx = withTurnState(ctx, rootTS) ctx = WithAgentLoop(ctx, al) // Inject AgentLoop for tool access diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index b8d986841..7980fbafe 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -16,17 +16,14 @@ import ( // ====================== Config & Constants ====================== const ( - maxSubTurnDepth = 3 - maxConcurrentSubTurns = 5 - // concurrencyTimeout is the maximum time to wait for a concurrency slot. - // This prevents indefinite blocking when all slots are occupied by slow sub-turns. - concurrencyTimeout = 30 * time.Second + // Default values for SubTurn configuration (used when config is not set or is zero) + defaultMaxSubTurnDepth = 3 + defaultMaxConcurrentSubTurns = 5 + defaultConcurrencyTimeout = 30 * time.Second + defaultSubTurnTimeout = 5 * time.Minute // maxEphemeralHistorySize limits the number of messages stored in ephemeral sessions. // This prevents memory accumulation in long-running sub-turns. maxEphemeralHistorySize = 50 - // defaultSubTurnTimeout is the default maximum duration for a SubTurn. - // SubTurns that run longer than this will be cancelled. - defaultSubTurnTimeout = 5 * time.Minute ) var ( @@ -35,6 +32,48 @@ var ( ErrConcurrencyTimeout = errors.New("timeout waiting for concurrency slot") ) +// getSubTurnConfig returns the effective SubTurn configuration with defaults applied. +func (al *AgentLoop) getSubTurnConfig() subTurnRuntimeConfig { + cfg := al.cfg.Agents.Defaults.SubTurn + + maxDepth := cfg.MaxDepth + if maxDepth <= 0 { + maxDepth = defaultMaxSubTurnDepth + } + + maxConcurrent := cfg.MaxConcurrent + if maxConcurrent <= 0 { + maxConcurrent = defaultMaxConcurrentSubTurns + } + + concurrencyTimeout := time.Duration(cfg.ConcurrencyTimeoutSec) * time.Second + if concurrencyTimeout <= 0 { + concurrencyTimeout = defaultConcurrencyTimeout + } + + defaultTimeout := time.Duration(cfg.DefaultTimeoutMinutes) * time.Minute + if defaultTimeout <= 0 { + defaultTimeout = defaultSubTurnTimeout + } + + return subTurnRuntimeConfig{ + maxDepth: maxDepth, + maxConcurrent: maxConcurrent, + concurrencyTimeout: concurrencyTimeout, + defaultTimeout: defaultTimeout, + defaultTokenBudget: cfg.DefaultTokenBudget, + } +} + +// subTurnRuntimeConfig holds the effective runtime configuration for SubTurn execution. +type subTurnRuntimeConfig struct { + maxDepth int + maxConcurrent int + concurrencyTimeout time.Duration + defaultTimeout time.Duration + defaultTokenBudget int +} + // ====================== SubTurn Config ====================== // SubTurnConfig configures the execution of a child sub-turn. @@ -239,13 +278,16 @@ func SpawnSubTurn(ctx context.Context, cfg SubTurnConfig) (*tools.ToolResult, er } func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg SubTurnConfig) (result *tools.ToolResult, err error) { + // Get effective SubTurn configuration + rtCfg := al.getSubTurnConfig() + // 0. Acquire concurrency semaphore FIRST to ensure it's released even if early validation fails. // Blocks if parent already has maxConcurrentSubTurns running, with a timeout to prevent indefinite blocking. // Also respects context cancellation so we don't block forever if parent is aborted. var semAcquired bool if parentTS.concurrencySem != nil { // Create a timeout context for semaphore acquisition - timeoutCtx, cancel := context.WithTimeout(ctx, concurrencyTimeout) + timeoutCtx, cancel := context.WithTimeout(ctx, rtCfg.concurrencyTimeout) defer cancel() select { @@ -263,16 +305,16 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S } // Otherwise it's our timeout return nil, fmt.Errorf("%w: all %d slots occupied for %v", - ErrConcurrencyTimeout, maxConcurrentSubTurns, concurrencyTimeout) + ErrConcurrencyTimeout, rtCfg.maxConcurrent, rtCfg.concurrencyTimeout) } } // 1. Depth limit check - if parentTS.depth >= maxSubTurnDepth { + if parentTS.depth >= rtCfg.maxDepth { logger.WarnCF("subturn", "Depth limit exceeded", map[string]any{ "parent_id": parentTS.turnID, "depth": parentTS.depth, - "max_depth": maxSubTurnDepth, + "max_depth": rtCfg.maxDepth, }) return nil, ErrDepthLimitExceeded } @@ -285,7 +327,7 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S // 3. Determine timeout for child SubTurn timeout := cfg.Timeout if timeout <= 0 { - timeout = defaultSubTurnTimeout + timeout = rtCfg.defaultTimeout } // 4. Create INDEPENDENT child context (not derived from parent ctx). @@ -295,7 +337,7 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S defer cancel() childID := al.generateSubTurnID() - childTS := newTurnState(childCtx, childID, parentTS) + childTS := newTurnState(childCtx, childID, parentTS, rtCfg.maxConcurrent) // Set the cancel function so Finish(true) can trigger hard cancellation childTS.cancelFunc = cancel childTS.critical = cfg.Critical @@ -307,6 +349,11 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S childTS.tokenBudget = cfg.InitialTokenBudget } else if parentTS.tokenBudget != nil { childTS.tokenBudget = parentTS.tokenBudget + } else if rtCfg.defaultTokenBudget > 0 { + // Apply default token budget from config if no budget is set + budget := &atomic.Int64{} + budget.Store(int64(rtCfg.defaultTokenBudget)) + childTS.tokenBudget = budget } // IMPORTANT: Put childTS into childCtx so that code inside runTurn can retrieve it diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go index 883958231..009800ee4 100644 --- a/pkg/agent/subturn_test.go +++ b/pkg/agent/subturn_test.go @@ -15,6 +15,11 @@ import ( "github.com/sipeed/picoclaw/pkg/tools" ) +// Test constants (use defaults from subturn.go) +const ( + testMaxConcurrentSubTurns = defaultMaxConcurrentSubTurns +) + // ====================== Test Helper: Event Collector ====================== type eventCollector struct { events []any @@ -918,7 +923,7 @@ func TestGetActiveTurn(t *testing.T) { childTurnIDs: []string{}, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } sessionKey := "test-session" @@ -975,7 +980,7 @@ func TestGetActiveTurn_WithChildren(t *testing.T) { childTurnIDs: []string{"child-1", "child-2"}, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } sessionKey := "test-session-with-children" @@ -1007,7 +1012,7 @@ func TestTurnStateInfo_ThreadSafety(t *testing.T) { childTurnIDs: []string{}, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } // Concurrently read Info() and modify childTurnIDs @@ -1120,7 +1125,7 @@ func TestInterruptHard_Alias(t *testing.T) { session: newEphemeralSession(nil), initialHistoryLength: 0, pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } sessionKey := "test-session-interrupt" @@ -1148,7 +1153,7 @@ func TestFinish_ConcurrentCalls(t *testing.T) { turnID: "parent-concurrent-finish", depth: 0, pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) @@ -1214,7 +1219,7 @@ func TestDeliverSubTurnResult_RaceWithFinish(t *testing.T) { turnID: "parent-race-test", depth: 0, pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) @@ -1296,13 +1301,13 @@ func TestConcurrencySemaphore_Timeout(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) defer parentTS.Finish(false) // Fill all concurrency slots - for i := 0; i < maxConcurrentSubTurns; i++ { + for i := 0; i < testMaxConcurrentSubTurns; i++ { parentTS.concurrencySem <- struct{}{} } @@ -1339,7 +1344,7 @@ func TestConcurrencySemaphore_Timeout(t *testing.T) { t.Logf("Timeout occurred after %v with error: %v", elapsed, err) // Clean up - drain the semaphore - for i := 0; i < maxConcurrentSubTurns; i++ { + for i := 0; i < testMaxConcurrentSubTurns; i++ { <-parentTS.concurrencySem } } @@ -1396,7 +1401,7 @@ func TestContextWrapping_SingleLayer(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) defer parentTS.Finish(false) @@ -1442,7 +1447,7 @@ func TestSyncSubTurn_NoChannelDelivery(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) defer parentTS.Finish(false) @@ -1499,7 +1504,7 @@ func TestAsyncSubTurn_ChannelDelivery(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) defer parentTS.Finish(false) @@ -1543,7 +1548,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } grandparentTS.ctx, grandparentTS.cancelFunc = context.WithCancel(ctx) @@ -1557,7 +1562,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) { depth: 1, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.cancelFunc = parentCancel @@ -1571,7 +1576,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) { depth: 2, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } childTS.cancelFunc = childCancel @@ -1642,7 +1647,7 @@ func TestSpawnDuringAbort_RaceCondition(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) @@ -1755,7 +1760,7 @@ func TestAsyncSubTurn_ParentFinishesEarly(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) @@ -1828,7 +1833,7 @@ func TestAsyncSubTurn_ParentWaitsForChild(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) @@ -1995,7 +2000,7 @@ func TestSubTurn_IndependentContext(t *testing.T) { depth: 0, session: newEphemeralSession(nil), pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns), } parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx) diff --git a/pkg/agent/turn_state.go b/pkg/agent/turn_state.go index 1f7716ec7..2afb8861d 100644 --- a/pkg/agent/turn_state.go +++ b/pkg/agent/turn_state.go @@ -199,7 +199,7 @@ func (al *AgentLoop) FormatTree(turnInfo *TurnInfo, prefix string, isLast bool) // ====================== Helper Functions ====================== -func newTurnState(ctx context.Context, id string, parent *turnState) *turnState { +func newTurnState(ctx context.Context, id string, parent *turnState, maxConcurrent int) *turnState { // Note: We don't create a new context with cancel here because the caller // (spawnSubTurn) already creates one. The turnState stores the context and // cancelFunc provided by the caller to avoid redundant context wrapping. @@ -216,7 +216,7 @@ func newTurnState(ctx context.Context, id string, parent *turnState) *turnState // intermediate results to be discarded in deliverSubTurnResult. // For production, consider an unbounded queue or a blocking strategy with backpressure. pendingResults: make(chan *tools.ToolResult, 16), - concurrencySem: make(chan struct{}, maxConcurrentSubTurns), + concurrencySem: make(chan struct{}, maxConcurrent), } } diff --git a/pkg/config/config.go b/pkg/config/config.go index fe0fd711d..f948c26c2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -219,24 +219,34 @@ type RoutingConfig struct { Threshold float64 `json:"threshold"` // complexity score in [0,1]; score >= threshold → primary model } +// SubTurnConfig configures the SubTurn execution system. +type SubTurnConfig struct { + MaxDepth int `json:"max_depth" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_MAX_DEPTH"` + MaxConcurrent int `json:"max_concurrent" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_MAX_CONCURRENT"` + DefaultTimeoutMinutes int `json:"default_timeout_minutes" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_DEFAULT_TIMEOUT_MINUTES"` + DefaultTokenBudget int `json:"default_token_budget" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_DEFAULT_TOKEN_BUDGET"` + ConcurrencyTimeoutSec int `json:"concurrency_timeout_sec" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_CONCURRENCY_TIMEOUT_SEC"` +} + type AgentDefaults struct { - Workspace string `json:"workspace" env:"PICOCLAW_AGENTS_DEFAULTS_WORKSPACE"` - RestrictToWorkspace bool `json:"restrict_to_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_RESTRICT_TO_WORKSPACE"` - AllowReadOutsideWorkspace bool `json:"allow_read_outside_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_ALLOW_READ_OUTSIDE_WORKSPACE"` - Provider string `json:"provider" env:"PICOCLAW_AGENTS_DEFAULTS_PROVIDER"` - ModelName string `json:"model_name" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL_NAME"` - Model string `json:"model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL"` // Deprecated: use model_name instead - ModelFallbacks []string `json:"model_fallbacks,omitempty"` - ImageModel string `json:"image_model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_IMAGE_MODEL"` - ImageModelFallbacks []string `json:"image_model_fallbacks,omitempty"` - MaxTokens int `json:"max_tokens" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOKENS"` - Temperature *float64 `json:"temperature,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_TEMPERATURE"` - MaxToolIterations int `json:"max_tool_iterations" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"` - SummarizeMessageThreshold int `json:"summarize_message_threshold" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_MESSAGE_THRESHOLD"` - SummarizeTokenPercent int `json:"summarize_token_percent" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_TOKEN_PERCENT"` - MaxMediaSize int `json:"max_media_size,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_MEDIA_SIZE"` - Routing *RoutingConfig `json:"routing,omitempty"` - SteeringMode string `json:"steering_mode,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE"` // "one-at-a-time" (default) or "all" + Workspace string `json:"workspace" env:"PICOCLAW_AGENTS_DEFAULTS_WORKSPACE"` + RestrictToWorkspace bool `json:"restrict_to_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_RESTRICT_TO_WORKSPACE"` + AllowReadOutsideWorkspace bool `json:"allow_read_outside_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_ALLOW_READ_OUTSIDE_WORKSPACE"` + Provider string `json:"provider" env:"PICOCLAW_AGENTS_DEFAULTS_PROVIDER"` + ModelName string `json:"model_name" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL_NAME"` + Model string `json:"model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL"` // Deprecated: use model_name instead + ModelFallbacks []string `json:"model_fallbacks,omitempty"` + ImageModel string `json:"image_model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_IMAGE_MODEL"` + ImageModelFallbacks []string `json:"image_model_fallbacks,omitempty"` + MaxTokens int `json:"max_tokens" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOKENS"` + Temperature *float64 `json:"temperature,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_TEMPERATURE"` + MaxToolIterations int `json:"max_tool_iterations" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"` + SummarizeMessageThreshold int `json:"summarize_message_threshold" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_MESSAGE_THRESHOLD"` + SummarizeTokenPercent int `json:"summarize_token_percent" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_TOKEN_PERCENT"` + MaxMediaSize int `json:"max_media_size,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_MEDIA_SIZE"` + Routing *RoutingConfig `json:"routing,omitempty"` + SteeringMode string `json:"steering_mode,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE"` // "one-at-a-time" (default) or "all" + SubTurn SubTurnConfig `json:"subturn" envPrefix:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_"` } const DefaultMaxMediaSize = 20 * 1024 * 1024 // 20 MB