mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(agent): prevent double result delivery and panic bypass in SubTurn
- Fix synchronous SubTurn calls placing results in pendingResults channel, causing double delivery. Now only async calls (Async=true) use the channel. - Move deliverSubTurnResult into defer to ensure result delivery even when runTurn panics. Add TestSpawnSubTurn_PanicRecovery to verify. - Fix ContextWindow incorrectly set to MaxTokens; now inherits from parentAgent.ContextWindow. - Add TestSpawnSubTurn_ResultDeliverySync to verify sync behavior.
This commit is contained in:
+19
-6
@@ -29,6 +29,10 @@ type SubTurnConfig struct {
|
||||
Tools []tools.Tool
|
||||
SystemPrompt string
|
||||
MaxTokens int
|
||||
// Async indicates whether this is an async SubTurn call.
|
||||
// If true, the result will be delivered via pendingResults channel.
|
||||
// If false (synchronous), the result is only returned directly to avoid double delivery.
|
||||
Async bool
|
||||
// Can be extended with temperature, topP, etc.
|
||||
}
|
||||
|
||||
@@ -234,6 +238,9 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
|
||||
childID := al.generateSubTurnID()
|
||||
childTS := newTurnState(childCtx, childID, parentTS)
|
||||
|
||||
// IMPORTANT: Put childTS into childCtx so that code inside runTurn can retrieve it
|
||||
childCtx = withTurnState(childCtx, childTS)
|
||||
|
||||
// 4. Establish parent-child relationship (thread-safe)
|
||||
parentTS.mu.Lock()
|
||||
parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID)
|
||||
@@ -246,12 +253,22 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
|
||||
Config: cfg,
|
||||
})
|
||||
|
||||
// 6. Defer emitting End event, and recover from panics to ensure it's always fired
|
||||
// 6. Defer cleanup: deliver result (for async), emit End event, and recover from panics
|
||||
// IMPORTANT: deliverSubTurnResult must be in defer to ensure it runs even if runTurn panics.
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("subturn panicked: %v", r)
|
||||
}
|
||||
|
||||
// 8. Deliver result back to parent Turn (only for async calls)
|
||||
// For synchronous calls (Async=false), the result is returned directly to avoid double delivery.
|
||||
// For async calls (Async=true), the result is delivered via pendingResults channel
|
||||
// so the parent turn can process it in a later iteration.
|
||||
// This must be in defer to ensure delivery even if runTurn panics.
|
||||
if cfg.Async {
|
||||
deliverSubTurnResult(parentTS, childID, result)
|
||||
}
|
||||
|
||||
MockEventBus.Emit(SubTurnEndEvent{
|
||||
ChildID: childID,
|
||||
Result: result,
|
||||
@@ -263,9 +280,6 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S
|
||||
// Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent.
|
||||
result, err = runTurn(childCtx, al, childTS, cfg)
|
||||
|
||||
// 8. Deliver result back to parent Turn
|
||||
deliverSubTurnResult(parentTS, childID, result)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
@@ -346,7 +360,7 @@ func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfi
|
||||
MaxTokens: cfg.MaxTokens,
|
||||
Temperature: parentAgent.Temperature,
|
||||
ThinkingLevel: parentAgent.ThinkingLevel,
|
||||
ContextWindow: cfg.MaxTokens,
|
||||
ContextWindow: parentAgent.ContextWindow, // Inherit from parent agent
|
||||
SummarizeMessageThreshold: parentAgent.SummarizeMessageThreshold,
|
||||
SummarizeTokenPercent: parentAgent.SummarizeTokenPercent,
|
||||
Provider: parentAgent.Provider,
|
||||
@@ -357,7 +371,6 @@ func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfi
|
||||
}
|
||||
if childAgent.MaxTokens == 0 {
|
||||
childAgent.MaxTokens = parentAgent.MaxTokens
|
||||
childAgent.ContextWindow = parentAgent.ContextWindow
|
||||
}
|
||||
|
||||
finalContent, err := al.runAgentLoop(ctx, childAgent, processOptions{
|
||||
|
||||
+121
-10
@@ -8,6 +8,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
"github.com/sipeed/picoclaw/pkg/tools"
|
||||
)
|
||||
@@ -158,12 +160,9 @@ func TestSpawnSubTurn(t *testing.T) {
|
||||
t.Error("child Turn not added to parent.childTurnIDs")
|
||||
}
|
||||
|
||||
// Verify result delivery (pendingResults or history)
|
||||
if len(parent.pendingResults) > 0 || len(parent.session.GetHistory("")) > 0 {
|
||||
// Result delivered via at least one path
|
||||
} else {
|
||||
t.Error("child result not delivered")
|
||||
}
|
||||
// For synchronous calls (Async=false, the default), result is returned directly
|
||||
// and should NOT be in pendingResults. The result was already verified above.
|
||||
// Only async calls (Async=true) would place results in pendingResults.
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -196,7 +195,7 @@ func TestSpawnSubTurn_EphemeralSessionIsolation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ====================== Extra Independent Test: Result Delivery Path ======================
|
||||
// ====================== Extra Independent Test: Result Delivery Path (Async) ======================
|
||||
func TestSpawnSubTurn_ResultDelivery(t *testing.T) {
|
||||
al, _, _, _, cleanup := newTestAgentLoop(t)
|
||||
defer cleanup()
|
||||
@@ -209,18 +208,54 @@ func TestSpawnSubTurn_ResultDelivery(t *testing.T) {
|
||||
session: &ephemeralSessionStore{},
|
||||
}
|
||||
|
||||
cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}}
|
||||
// Set Async=true to test async result delivery via pendingResults channel
|
||||
cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: true}
|
||||
|
||||
_, _ = spawnSubTurn(context.Background(), al, parent, cfg)
|
||||
|
||||
// Check if pendingResults received the result
|
||||
// Check if pendingResults received the result (only for async calls)
|
||||
select {
|
||||
case res := <-parent.pendingResults:
|
||||
if res == nil {
|
||||
t.Error("received nil result in pendingResults")
|
||||
}
|
||||
default:
|
||||
t.Error("result did not enter pendingResults")
|
||||
t.Error("result did not enter pendingResults for async call")
|
||||
}
|
||||
}
|
||||
|
||||
// ====================== Extra Independent Test: Result Delivery Path (Sync) ======================
|
||||
func TestSpawnSubTurn_ResultDeliverySync(t *testing.T) {
|
||||
al, _, _, _, cleanup := newTestAgentLoop(t)
|
||||
defer cleanup()
|
||||
|
||||
parent := &turnState{
|
||||
ctx: context.Background(),
|
||||
turnID: "parent-sync-1",
|
||||
depth: 0,
|
||||
pendingResults: make(chan *tools.ToolResult, 1),
|
||||
session: &ephemeralSessionStore{},
|
||||
}
|
||||
|
||||
// Sync call (Async=false, the default) - result should be returned directly
|
||||
cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: false}
|
||||
|
||||
result, err := spawnSubTurn(context.Background(), al, parent, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Result should be returned directly
|
||||
if result == nil {
|
||||
t.Error("expected non-nil result from sync call")
|
||||
}
|
||||
|
||||
// pendingResults should NOT contain the result (no double delivery)
|
||||
select {
|
||||
case <-parent.pendingResults:
|
||||
t.Error("sync call should not place result in pendingResults (double delivery)")
|
||||
default:
|
||||
// Expected - channel should be empty
|
||||
}
|
||||
}
|
||||
|
||||
@@ -752,3 +787,79 @@ func TestFinalPollCapturesLateResults(t *testing.T) {
|
||||
t.Errorf("expected 0 results on second poll, got %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
// TestSpawnSubTurn_PanicRecovery verifies that even if runTurn panics,
|
||||
// the result is still delivered for async calls and SubTurnEndEvent is emitted.
|
||||
func TestSpawnSubTurn_PanicRecovery(t *testing.T) {
|
||||
// Create a panic provider
|
||||
panicProvider := &panicMockProvider{}
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: t.TempDir(),
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
al := NewAgentLoop(cfg, bus.NewMessageBus(), panicProvider)
|
||||
|
||||
parent := &turnState{
|
||||
ctx: context.Background(),
|
||||
turnID: "parent-panic",
|
||||
depth: 0,
|
||||
pendingResults: make(chan *tools.ToolResult, 1),
|
||||
session: &ephemeralSessionStore{},
|
||||
}
|
||||
|
||||
collector := &eventCollector{}
|
||||
originalEmit := MockEventBus.Emit
|
||||
MockEventBus.Emit = collector.collect
|
||||
defer func() { MockEventBus.Emit = originalEmit }()
|
||||
|
||||
// Test async call - result should still be delivered via channel
|
||||
asyncCfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}, Async: true}
|
||||
result, err := spawnSubTurn(context.Background(), al, parent, asyncCfg)
|
||||
|
||||
// Should return error from panic recovery
|
||||
if err == nil {
|
||||
t.Error("expected error from panic recovery")
|
||||
}
|
||||
|
||||
// Result should be nil because panic occurred before runTurn could return
|
||||
if result != nil {
|
||||
t.Error("expected nil result after panic")
|
||||
}
|
||||
|
||||
// SubTurnEndEvent should still be emitted
|
||||
if !collector.hasEventOfType(SubTurnEndEvent{}) {
|
||||
t.Error("SubTurnEndEvent not emitted after panic")
|
||||
}
|
||||
|
||||
// For async call, result should still be delivered to channel (even if nil)
|
||||
select {
|
||||
case res := <-parent.pendingResults:
|
||||
// Result was delivered (nil due to panic)
|
||||
_ = res
|
||||
default:
|
||||
t.Error("async result should be delivered to channel even after panic")
|
||||
}
|
||||
}
|
||||
|
||||
// panicMockProvider is a mock provider that always panics
|
||||
type panicMockProvider struct{}
|
||||
|
||||
func (m *panicMockProvider) Chat(
|
||||
ctx context.Context,
|
||||
messages []providers.Message,
|
||||
tools []providers.ToolDefinition,
|
||||
model string,
|
||||
opts map[string]any,
|
||||
) (*providers.LLMResponse, error) {
|
||||
panic("intentional panic for testing")
|
||||
}
|
||||
|
||||
func (m *panicMockProvider) GetDefaultModel() string {
|
||||
return "panic-model"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user