diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index b9fa1023a..994c6a59a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -1043,6 +1043,20 @@ func (al *AgentLoop) runAgentLoop( return "", err } + // IMPORTANT: Before finishing the turn, do a final poll for any pending SubTurn results. + // This ensures we don't lose results that arrived after the last iteration poll. + if isRootTurn { + finalResults := al.dequeuePendingSubTurnResults(opts.SessionKey) + if len(finalResults) > 0 { + // Inject late-arriving results into the final response + for _, result := range finalResults { + if result != nil && result.ForLLM != "" { + finalContent += fmt.Sprintf("\n\n[SubTurn Result] %s", result.ForLLM) + } + } + } + } + // Signal completion to rootTS so it knows it is finished, terminating any active sub-turns. // Only call Finish() if this is a root turn (not a SubTurn recursively calling runAgentLoop). if isRootTurn { diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index 1d0239c4b..10543bfad 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -239,18 +239,14 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID) parentTS.mu.Unlock() - // 5. Register the parent's pendingResults channel so the parent loop can poll it - al.registerSubTurnResultChannel(parentTS.turnID, parentTS.pendingResults) - defer al.unregisterSubTurnResultChannel(parentTS.turnID) - - // 6. Emit Spawn event (currently using Mock, will be replaced by real EventBus) + // 5. Emit Spawn event (currently using Mock, will be replaced by real EventBus) MockEventBus.Emit(SubTurnSpawnEvent{ ParentID: parentTS.turnID, ChildID: childID, Config: cfg, }) - // 7. Defer emitting End event, and recover from panics to ensure it's always fired + // 6. Defer emitting End event, and recover from panics to ensure it's always fired defer func() { if r := recover(); r != nil { err = fmt.Errorf("subturn panicked: %v", r) @@ -263,11 +259,11 @@ func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg S }) }() - // 8. Execute sub-turn via the real agent loop. + // 7. Execute sub-turn via the real agent loop. // Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent. result, err = runTurn(childCtx, al, childTS, cfg) - // 9. Deliver result back to parent Turn + // 8. Deliver result back to parent Turn deliverSubTurnResult(parentTS, childID, result) return result, err diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go index ac085c28a..d8214c116 100644 --- a/pkg/agent/subturn_test.go +++ b/pkg/agent/subturn_test.go @@ -721,3 +721,34 @@ func TestFinishClosesChannel(t *testing.T) { // This should not panic - it should recover and emit OrphanResultEvent deliverSubTurnResult(ts, "child-1", result) } + +// TestFinalPollCapturesLateResults verifies that the final poll before Finish() +// captures results that arrive after the last iteration poll. +func TestFinalPollCapturesLateResults(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + sessionKey := "test-session-final-poll" + ch := make(chan *tools.ToolResult, 4) + + // Register the channel + al.registerSubTurnResultChannel(sessionKey, ch) + defer al.unregisterSubTurnResultChannel(sessionKey) + + // Simulate results arriving after last iteration poll + ch <- &tools.ToolResult{ForLLM: "result 1"} + ch <- &tools.ToolResult{ForLLM: "result 2"} + + // Dequeue should capture both results + results := al.dequeuePendingSubTurnResults(sessionKey) + + if len(results) != 2 { + t.Errorf("expected 2 results, got %d", len(results)) + } + + // Verify channel is now empty + results = al.dequeuePendingSubTurnResults(sessionKey) + if len(results) != 0 { + t.Errorf("expected 0 results on second poll, got %d", len(results)) + } +}