From 672d11c7d4939976e0741575069cd7cabf5e73f9 Mon Sep 17 00:00:00 2001 From: Administrator <1280842908@qq.com> Date: Mon, 16 Mar 2026 23:48:51 +0800 Subject: [PATCH] fix(agent): prevent double result delivery and panic bypass in SubTurn - Fix synchronous SubTurn calls placing results in pendingResults channel, causing double delivery. Now only async calls (Async=true) use the channel. - Move deliverSubTurnResult into defer to ensure result delivery even when runTurn panics. Add TestSpawnSubTurn_PanicRecovery to verify. - Fix ContextWindow incorrectly set to MaxTokens; now inherits from parentAgent.ContextWindow. - Add TestSpawnSubTurn_ResultDeliverySync to verify sync behavior. --- pkg/agent/subturn.go | 25 ++++++-- pkg/agent/subturn_test.go | 131 +++++++++++++++++++++++++++++++++++--- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index 10543bfad..3589a3c7d 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -29,6 +29,10 @@ type SubTurnConfig struct { Tools []tools.Tool SystemPrompt string MaxTokens int + // Async indicates whether this is an async SubTurn call. + // If true, the result will be delivered via pendingResults channel. + // If false (synchronous), the result is only returned directly to avoid double delivery. + Async bool // Can be extended with temperature, topP, etc. } @@ -234,6 +238,9 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S childID := al.generateSubTurnID() childTS := newTurnState(childCtx, childID, parentTS) + // IMPORTANT: Put childTS into childCtx so that code inside runTurn can retrieve it + childCtx = withTurnState(childCtx, childTS) + // 4. Establish parent-child relationship (thread-safe) parentTS.mu.Lock() parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID) @@ -246,12 +253,22 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S Config: cfg, }) - // 6. Defer emitting End event, and recover from panics to ensure it's always fired + // 6. Defer cleanup: deliver result (for async), emit End event, and recover from panics + // IMPORTANT: deliverSubTurnResult must be in defer to ensure it runs even if runTurn panics. defer func() { if r := recover(); r != nil { err = fmt.Errorf("subturn panicked: %v", r) } + // 8. Deliver result back to parent Turn (only for async calls) + // For synchronous calls (Async=false), the result is returned directly to avoid double delivery. + // For async calls (Async=true), the result is delivered via pendingResults channel + // so the parent turn can process it in a later iteration. + // This must be in defer to ensure delivery even if runTurn panics. + if cfg.Async { + deliverSubTurnResult(parentTS, childID, result) + } + MockEventBus.Emit(SubTurnEndEvent{ ChildID: childID, Result: result, @@ -263,9 +280,6 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S // Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent. result, err = runTurn(childCtx, al, childTS, cfg) - // 8. Deliver result back to parent Turn - deliverSubTurnResult(parentTS, childID, result) - return result, err } @@ -346,7 +360,7 @@ func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfi MaxTokens: cfg.MaxTokens, Temperature: parentAgent.Temperature, ThinkingLevel: parentAgent.ThinkingLevel, - ContextWindow: cfg.MaxTokens, + ContextWindow: parentAgent.ContextWindow, // Inherit from parent agent SummarizeMessageThreshold: parentAgent.SummarizeMessageThreshold, SummarizeTokenPercent: parentAgent.SummarizeTokenPercent, Provider: parentAgent.Provider, @@ -357,7 +371,6 @@ func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfi } if childAgent.MaxTokens == 0 { childAgent.MaxTokens = parentAgent.MaxTokens - childAgent.ContextWindow = parentAgent.ContextWindow } finalContent, err := al.runAgentLoop(ctx, childAgent, processOptions{ diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go index d8214c116..32029960d 100644 --- a/pkg/agent/subturn_test.go +++ b/pkg/agent/subturn_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -158,12 +160,9 @@ func TestSpawnSubTurn(t *testing.T) { t.Error("child Turn not added to parent.childTurnIDs") } - // Verify result delivery (pendingResults or history) - if len(parent.pendingResults) > 0 || len(parent.session.GetHistory("")) > 0 { - // Result delivered via at least one path - } else { - t.Error("child result not delivered") - } + // For synchronous calls (Async=false, the default), result is returned directly + // and should NOT be in pendingResults. The result was already verified above. + // Only async calls (Async=true) would place results in pendingResults. }) } } @@ -196,7 +195,7 @@ func TestSpawnSubTurn_EphemeralSessionIsolation(t *testing.T) { } } -// ====================== Extra Independent Test: Result Delivery Path ====================== +// ====================== Extra Independent Test: Result Delivery Path (Async) ====================== func TestSpawnSubTurn_ResultDelivery(t *testing.T) { al, _, _, _, cleanup := newTestAgentLoop(t) defer cleanup() @@ -209,18 +208,54 @@ func TestSpawnSubTurn_ResultDelivery(t *testing.T) { session: &ephemeralSessionStore{}, } - cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}} + // Set Async=true to test async result delivery via pendingResults channel + cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: true} _, _ = spawnSubTurn(context.Background(), al, parent, cfg) - // Check if pendingResults received the result + // Check if pendingResults received the result (only for async calls) select { case res := <-parent.pendingResults: if res == nil { t.Error("received nil result in pendingResults") } default: - t.Error("result did not enter pendingResults") + t.Error("result did not enter pendingResults for async call") + } +} + +// ====================== Extra Independent Test: Result Delivery Path (Sync) ====================== +func TestSpawnSubTurn_ResultDeliverySync(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-sync-1", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 1), + session: &ephemeralSessionStore{}, + } + + // Sync call (Async=false, the default) - result should be returned directly + cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: false} + + result, err := spawnSubTurn(context.Background(), al, parent, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Result should be returned directly + if result == nil { + t.Error("expected non-nil result from sync call") + } + + // pendingResults should NOT contain the result (no double delivery) + select { + case <-parent.pendingResults: + t.Error("sync call should not place result in pendingResults (double delivery)") + default: + // Expected - channel should be empty } } @@ -752,3 +787,79 @@ func TestFinalPollCapturesLateResults(t *testing.T) { t.Errorf("expected 0 results on second poll, got %d", len(results)) } } + +// TestSpawnSubTurn_PanicRecovery verifies that even if runTurn panics, +// the result is still delivered for async calls and SubTurnEndEvent is emitted. +func TestSpawnSubTurn_PanicRecovery(t *testing.T) { + // Create a panic provider + panicProvider := &panicMockProvider{} + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: t.TempDir(), + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + al := NewAgentLoop(cfg, bus.NewMessageBus(), panicProvider) + + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-panic", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 1), + session: &ephemeralSessionStore{}, + } + + collector := &eventCollector{} + originalEmit := MockEventBus.Emit + MockEventBus.Emit = collector.collect + defer func() { MockEventBus.Emit = originalEmit }() + + // Test async call - result should still be delivered via channel + asyncCfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: true} + result, err := spawnSubTurn(context.Background(), al, parent, asyncCfg) + + // Should return error from panic recovery + if err == nil { + t.Error("expected error from panic recovery") + } + + // Result should be nil because panic occurred before runTurn could return + if result != nil { + t.Error("expected nil result after panic") + } + + // SubTurnEndEvent should still be emitted + if !collector.hasEventOfType(SubTurnEndEvent{}) { + t.Error("SubTurnEndEvent not emitted after panic") + } + + // For async call, result should still be delivered to channel (even if nil) + select { + case res := <-parent.pendingResults: + // Result was delivered (nil due to panic) + _ = res + default: + t.Error("async result should be delivered to channel even after panic") + } +} + +// panicMockProvider is a mock provider that always panics +type panicMockProvider struct{} + +func (m *panicMockProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + panic("intentional panic for testing") +} + +func (m *panicMockProvider) GetDefaultModel() string { + return "panic-model" +}