feat(subturn): add configurable runtime parameters under agents.defaults

Replace hardcoded constants with config-driven parameters in agents.defaults:
- MaxDepth, MaxConcurrent, DefaultTimeoutMinutes, DefaultTokenBudget, ConcurrencyTimeoutSec
- Support JSON config and env vars (PICOCLAW_AGENTS_DEFAULTS_SUBTURN_*)
- Add getSubTurnConfig() for runtime config resolution with defaults
- Apply defaultTokenBudget (when > 0) if neither an explicit nor a parent-inherited budget is available

Rationale: SubTurn is agent execution infrastructure, not a tool, so it belongs
in agents.defaults rather than tools config.

Example:
{
  "agents": {
    "defaults": {
      "subturn": {
        "max_depth": 5,
        "max_concurrent": 10,
        "default_timeout_minutes": 10
      }
    }
  }
}
This commit is contained in:
Administrator
2026-03-19 13:08:46 +08:00
parent 99b189d3fb
commit ce311be70b
5 changed files with 115 additions and 53 deletions
+1 -1
View File
@@ -1022,7 +1022,7 @@ func (al *AgentLoop) runAgentLoop(
session: agent.Sessions,
initialHistoryLength: len(agent.Sessions.GetHistory("")), // Snapshot for rollback on hard abort
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns), // maxConcurrentSubTurns
concurrencySem: make(chan struct{}, al.getSubTurnConfig().maxConcurrent), // maxConcurrentSubTurns
}
ctx = withTurnState(ctx, rootTS)
ctx = WithAgentLoop(ctx, al) // Inject AgentLoop for tool access
+61 -14
View File
@@ -16,17 +16,14 @@ import (
// ====================== Config & Constants ======================
const (
maxSubTurnDepth = 3
maxConcurrentSubTurns = 5
// concurrencyTimeout is the maximum time to wait for a concurrency slot.
// This prevents indefinite blocking when all slots are occupied by slow sub-turns.
concurrencyTimeout = 30 * time.Second
// Default values for SubTurn configuration (used when config is not set or is zero)
defaultMaxSubTurnDepth = 3
defaultMaxConcurrentSubTurns = 5
defaultConcurrencyTimeout = 30 * time.Second
defaultSubTurnTimeout = 5 * time.Minute
// maxEphemeralHistorySize limits the number of messages stored in ephemeral sessions.
// This prevents memory accumulation in long-running sub-turns.
maxEphemeralHistorySize = 50
// defaultSubTurnTimeout is the default maximum duration for a SubTurn.
// SubTurns that run longer than this will be cancelled.
defaultSubTurnTimeout = 5 * time.Minute
)
var (
@@ -35,6 +32,48 @@ var (
ErrConcurrencyTimeout = errors.New("timeout waiting for concurrency slot")
)
// getSubTurnConfig resolves the SubTurn settings from agents.defaults.subturn
// into the values used at runtime. Depth, concurrency and both durations fall
// back to the package defaults whenever the configured value is zero or
// negative; the default token budget is passed through unchanged.
func (al *AgentLoop) getSubTurnConfig() subTurnRuntimeConfig {
	src := al.cfg.Agents.Defaults.SubTurn

	// Begin with the built-in defaults, then override with positive configured values.
	resolved := subTurnRuntimeConfig{
		maxDepth:           defaultMaxSubTurnDepth,
		maxConcurrent:      defaultMaxConcurrentSubTurns,
		concurrencyTimeout: defaultConcurrencyTimeout,
		defaultTimeout:     defaultSubTurnTimeout,
		defaultTokenBudget: src.DefaultTokenBudget,
	}

	if src.MaxDepth > 0 {
		resolved.maxDepth = src.MaxDepth
	}
	if src.MaxConcurrent > 0 {
		resolved.maxConcurrent = src.MaxConcurrent
	}
	// The positivity check is made on the converted duration, not on the raw integer.
	if wait := time.Duration(src.ConcurrencyTimeoutSec) * time.Second; wait > 0 {
		resolved.concurrencyTimeout = wait
	}
	if limit := time.Duration(src.DefaultTimeoutMinutes) * time.Minute; limit > 0 {
		resolved.defaultTimeout = limit
	}

	return resolved
}
// subTurnRuntimeConfig holds the effective runtime configuration for SubTurn execution.
// Values are produced by getSubTurnConfig, which substitutes package defaults for
// non-positive depth, concurrency and timeout settings.
type subTurnRuntimeConfig struct {
// maxDepth is the depth limit: spawnSubTurn rejects a spawn when the parent's
// depth is greater than or equal to this value.
maxDepth int
// maxConcurrent is the capacity of the concurrency semaphore created for a turn.
maxConcurrent int
// concurrencyTimeout bounds how long spawnSubTurn waits for a semaphore slot.
concurrencyTimeout time.Duration
// defaultTimeout is the SubTurn timeout used when the per-call Timeout is <= 0.
defaultTimeout time.Duration
// defaultTokenBudget is copied from config as-is; spawnSubTurn only applies it
// when it is > 0 and no explicit or parent budget is set.
defaultTokenBudget int
}
// ====================== SubTurn Config ======================
// SubTurnConfig configures the execution of a child sub-turn.
@@ -239,13 +278,16 @@ func SpawnSubTurn(ctx context.Context, cfg SubTurnConfig) (*tools.ToolResult, er
}
func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg SubTurnConfig) (result *tools.ToolResult, err error) {
// Get effective SubTurn configuration
rtCfg := al.getSubTurnConfig()
// 0. Acquire concurrency semaphore FIRST to ensure it's released even if early validation fails.
// Blocks if parent already has maxConcurrentSubTurns running, with a timeout to prevent indefinite blocking.
// Also respects context cancellation so we don't block forever if parent is aborted.
var semAcquired bool
if parentTS.concurrencySem != nil {
// Create a timeout context for semaphore acquisition
timeoutCtx, cancel := context.WithTimeout(ctx, concurrencyTimeout)
timeoutCtx, cancel := context.WithTimeout(ctx, rtCfg.concurrencyTimeout)
defer cancel()
select {
@@ -263,16 +305,16 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
}
// Otherwise it's our timeout
return nil, fmt.Errorf("%w: all %d slots occupied for %v",
ErrConcurrencyTimeout, maxConcurrentSubTurns, concurrencyTimeout)
ErrConcurrencyTimeout, rtCfg.maxConcurrent, rtCfg.concurrencyTimeout)
}
}
// 1. Depth limit check
if parentTS.depth >= maxSubTurnDepth {
if parentTS.depth >= rtCfg.maxDepth {
logger.WarnCF("subturn", "Depth limit exceeded", map[string]any{
"parent_id": parentTS.turnID,
"depth": parentTS.depth,
"max_depth": maxSubTurnDepth,
"max_depth": rtCfg.maxDepth,
})
return nil, ErrDepthLimitExceeded
}
@@ -285,7 +327,7 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
// 3. Determine timeout for child SubTurn
timeout := cfg.Timeout
if timeout <= 0 {
timeout = defaultSubTurnTimeout
timeout = rtCfg.defaultTimeout
}
// 4. Create INDEPENDENT child context (not derived from parent ctx).
@@ -295,7 +337,7 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
defer cancel()
childID := al.generateSubTurnID()
childTS := newTurnState(childCtx, childID, parentTS)
childTS := newTurnState(childCtx, childID, parentTS, rtCfg.maxConcurrent)
// Set the cancel function so Finish(true) can trigger hard cancellation
childTS.cancelFunc = cancel
childTS.critical = cfg.Critical
@@ -307,6 +349,11 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
childTS.tokenBudget = cfg.InitialTokenBudget
} else if parentTS.tokenBudget != nil {
childTS.tokenBudget = parentTS.tokenBudget
} else if rtCfg.defaultTokenBudget > 0 {
// Apply default token budget from config if no budget is set
budget := &atomic.Int64{}
budget.Store(int64(rtCfg.defaultTokenBudget))
childTS.tokenBudget = budget
}
// IMPORTANT: Put childTS into childCtx so that code inside runTurn can retrieve it
+24 -19
View File
@@ -15,6 +15,11 @@ import (
"github.com/sipeed/picoclaw/pkg/tools"
)
// testMaxConcurrentSubTurns is the semaphore capacity used by the test
// fixtures; it mirrors the package default defined in subturn.go.
const testMaxConcurrentSubTurns = defaultMaxConcurrentSubTurns
// ====================== Test Helper: Event Collector ======================
type eventCollector struct {
events []any
@@ -918,7 +923,7 @@ func TestGetActiveTurn(t *testing.T) {
childTurnIDs: []string{},
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
sessionKey := "test-session"
@@ -975,7 +980,7 @@ func TestGetActiveTurn_WithChildren(t *testing.T) {
childTurnIDs: []string{"child-1", "child-2"},
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
sessionKey := "test-session-with-children"
@@ -1007,7 +1012,7 @@ func TestTurnStateInfo_ThreadSafety(t *testing.T) {
childTurnIDs: []string{},
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
// Concurrently read Info() and modify childTurnIDs
@@ -1120,7 +1125,7 @@ func TestInterruptHard_Alias(t *testing.T) {
session: newEphemeralSession(nil),
initialHistoryLength: 0,
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
sessionKey := "test-session-interrupt"
@@ -1148,7 +1153,7 @@ func TestFinish_ConcurrentCalls(t *testing.T) {
turnID: "parent-concurrent-finish",
depth: 0,
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
@@ -1214,7 +1219,7 @@ func TestDeliverSubTurnResult_RaceWithFinish(t *testing.T) {
turnID: "parent-race-test",
depth: 0,
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
@@ -1296,13 +1301,13 @@ func TestConcurrencySemaphore_Timeout(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
defer parentTS.Finish(false)
// Fill all concurrency slots
for i := 0; i < maxConcurrentSubTurns; i++ {
for i := 0; i < testMaxConcurrentSubTurns; i++ {
parentTS.concurrencySem <- struct{}{}
}
@@ -1339,7 +1344,7 @@ func TestConcurrencySemaphore_Timeout(t *testing.T) {
t.Logf("Timeout occurred after %v with error: %v", elapsed, err)
// Clean up - drain the semaphore
for i := 0; i < maxConcurrentSubTurns; i++ {
for i := 0; i < testMaxConcurrentSubTurns; i++ {
<-parentTS.concurrencySem
}
}
@@ -1396,7 +1401,7 @@ func TestContextWrapping_SingleLayer(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
defer parentTS.Finish(false)
@@ -1442,7 +1447,7 @@ func TestSyncSubTurn_NoChannelDelivery(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
defer parentTS.Finish(false)
@@ -1499,7 +1504,7 @@ func TestAsyncSubTurn_ChannelDelivery(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
defer parentTS.Finish(false)
@@ -1543,7 +1548,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
grandparentTS.ctx, grandparentTS.cancelFunc = context.WithCancel(ctx)
@@ -1557,7 +1562,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) {
depth: 1,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.cancelFunc = parentCancel
@@ -1571,7 +1576,7 @@ func TestGrandchildAbort_CascadingCancellation(t *testing.T) {
depth: 2,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
childTS.cancelFunc = childCancel
@@ -1642,7 +1647,7 @@ func TestSpawnDuringAbort_RaceCondition(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
@@ -1755,7 +1760,7 @@ func TestAsyncSubTurn_ParentFinishesEarly(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
@@ -1828,7 +1833,7 @@ func TestAsyncSubTurn_ParentWaitsForChild(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
@@ -1995,7 +2000,7 @@ func TestSubTurn_IndependentContext(t *testing.T) {
depth: 0,
session: newEphemeralSession(nil),
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, testMaxConcurrentSubTurns),
}
parentTS.ctx, parentTS.cancelFunc = context.WithCancel(ctx)
+2 -2
View File
@@ -199,7 +199,7 @@ func (al *AgentLoop) FormatTree(turnInfo *TurnInfo, prefix string, isLast bool)
// ====================== Helper Functions ======================
func newTurnState(ctx context.Context, id string, parent *turnState) *turnState {
func newTurnState(ctx context.Context, id string, parent *turnState, maxConcurrent int) *turnState {
// Note: We don't create a new context with cancel here because the caller
// (spawnSubTurn) already creates one. The turnState stores the context and
// cancelFunc provided by the caller to avoid redundant context wrapping.
@@ -216,7 +216,7 @@ func newTurnState(ctx context.Context, id string, parent *turnState) *turnState
// intermediate results to be discarded in deliverSubTurnResult.
// For production, consider an unbounded queue or a blocking strategy with backpressure.
pendingResults: make(chan *tools.ToolResult, 16),
concurrencySem: make(chan struct{}, maxConcurrentSubTurns),
concurrencySem: make(chan struct{}, maxConcurrent),
}
}
+27 -17
View File
@@ -219,24 +219,34 @@ type RoutingConfig struct {
Threshold float64 `json:"threshold"` // complexity score in [0,1]; score >= threshold → primary model
}
// SubTurnConfig configures the SubTurn execution system.
// It is embedded in AgentDefaults under the "subturn" JSON key, and each field
// can also be set through its PICOCLAW_AGENTS_DEFAULTS_SUBTURN_* environment variable.
// For all fields except DefaultTokenBudget, a zero or negative value makes the
// agent loop fall back to its built-in default.
type SubTurnConfig struct {
// MaxDepth is the SubTurn depth limit.
MaxDepth int `json:"max_depth" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_MAX_DEPTH"`
// MaxConcurrent sizes the per-turn concurrency semaphore.
MaxConcurrent int `json:"max_concurrent" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_MAX_CONCURRENT"`
// DefaultTimeoutMinutes is the SubTurn timeout, in minutes, used when a spawn does not set one.
DefaultTimeoutMinutes int `json:"default_timeout_minutes" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_DEFAULT_TIMEOUT_MINUTES"`
// DefaultTokenBudget is applied only when > 0 and no explicit or parent budget is set.
DefaultTokenBudget int `json:"default_token_budget" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_DEFAULT_TOKEN_BUDGET"`
// ConcurrencyTimeoutSec is the maximum wait, in seconds, for a concurrency slot.
ConcurrencyTimeoutSec int `json:"concurrency_timeout_sec" env:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_CONCURRENCY_TIMEOUT_SEC"`
}
type AgentDefaults struct {
Workspace string `json:"workspace" env:"PICOCLAW_AGENTS_DEFAULTS_WORKSPACE"`
RestrictToWorkspace bool `json:"restrict_to_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_RESTRICT_TO_WORKSPACE"`
AllowReadOutsideWorkspace bool `json:"allow_read_outside_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_ALLOW_READ_OUTSIDE_WORKSPACE"`
Provider string `json:"provider" env:"PICOCLAW_AGENTS_DEFAULTS_PROVIDER"`
ModelName string `json:"model_name" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL_NAME"`
Model string `json:"model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL"` // Deprecated: use model_name instead
ModelFallbacks []string `json:"model_fallbacks,omitempty"`
ImageModel string `json:"image_model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_IMAGE_MODEL"`
ImageModelFallbacks []string `json:"image_model_fallbacks,omitempty"`
MaxTokens int `json:"max_tokens" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOKENS"`
Temperature *float64 `json:"temperature,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_TEMPERATURE"`
MaxToolIterations int `json:"max_tool_iterations" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"`
SummarizeMessageThreshold int `json:"summarize_message_threshold" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_MESSAGE_THRESHOLD"`
SummarizeTokenPercent int `json:"summarize_token_percent" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_TOKEN_PERCENT"`
MaxMediaSize int `json:"max_media_size,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_MEDIA_SIZE"`
Routing *RoutingConfig `json:"routing,omitempty"`
SteeringMode string `json:"steering_mode,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE"` // "one-at-a-time" (default) or "all"
Workspace string `json:"workspace" env:"PICOCLAW_AGENTS_DEFAULTS_WORKSPACE"`
RestrictToWorkspace bool `json:"restrict_to_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_RESTRICT_TO_WORKSPACE"`
AllowReadOutsideWorkspace bool `json:"allow_read_outside_workspace" env:"PICOCLAW_AGENTS_DEFAULTS_ALLOW_READ_OUTSIDE_WORKSPACE"`
Provider string `json:"provider" env:"PICOCLAW_AGENTS_DEFAULTS_PROVIDER"`
ModelName string `json:"model_name" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL_NAME"`
Model string `json:"model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MODEL"` // Deprecated: use model_name instead
ModelFallbacks []string `json:"model_fallbacks,omitempty"`
ImageModel string `json:"image_model,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_IMAGE_MODEL"`
ImageModelFallbacks []string `json:"image_model_fallbacks,omitempty"`
MaxTokens int `json:"max_tokens" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOKENS"`
Temperature *float64 `json:"temperature,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_TEMPERATURE"`
MaxToolIterations int `json:"max_tool_iterations" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"`
SummarizeMessageThreshold int `json:"summarize_message_threshold" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_MESSAGE_THRESHOLD"`
SummarizeTokenPercent int `json:"summarize_token_percent" env:"PICOCLAW_AGENTS_DEFAULTS_SUMMARIZE_TOKEN_PERCENT"`
MaxMediaSize int `json:"max_media_size,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_MEDIA_SIZE"`
Routing *RoutingConfig `json:"routing,omitempty"`
SteeringMode string `json:"steering_mode,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE"` // "one-at-a-time" (default) or "all"
SubTurn SubTurnConfig `json:"subturn" envPrefix:"PICOCLAW_AGENTS_DEFAULTS_SUBTURN_"`
}
const DefaultMaxMediaSize = 20 * 1024 * 1024 // 20 MB