// MockEventBus is the placeholder event bus used by the sub-turn proof of
// concept. Emit is a plain function field so callers (tests in particular)
// can swap in their own handler; the default handler prints the event's
// dynamic type and field values to standard output.
var MockEventBus = struct {
	Emit func(event any)
}{
	Emit: printMockEvent,
}

// printMockEvent is the default MockEventBus.Emit handler. It writes one line
// per event containing the event's type (%T) and its fields (%+v).
func printMockEvent(event any) {
	fmt.Printf("[Mock EventBus] %T %+v\n", event, event)
}
// globalTurnCounter is the package-wide sequence number behind generateTurnID.
// It is only modified through sync/atomic.
var globalTurnCounter int64

// generateTurnID returns the next sub-turn identifier, formatted as
// "subturn-<n>" where n is the atomically incremented value of
// globalTurnCounter (so the first ID handed out is "subturn-1").
func generateTurnID() string {
	seq := atomic.AddInt64(&globalTurnCounter, 1)
	return "subturn-" + fmt.Sprint(seq)
}
+ pendingResults: make(chan *tools.ToolResult, 16), + } +} + +// Finish marks the turn as finished and cancels its context, aborting any running sub-turns. +func (ts *turnState) Finish() { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.isFinished = true + if ts.cancelFunc != nil { + ts.cancelFunc() + } +} + +// ephemeralSessionStore is a pure in-memory SessionStore for SubTurns. +// It never writes to disk, keeping sub-turn history isolated from the parent session. +type ephemeralSessionStore struct { + mu sync.Mutex + history []providers.Message + summary string +} + +func (e *ephemeralSessionStore) AddMessage(sessionKey, role, content string) { + e.mu.Lock() + defer e.mu.Unlock() + e.history = append(e.history, providers.Message{Role: role, Content: content}) +} + +func (e *ephemeralSessionStore) AddFullMessage(sessionKey string, msg providers.Message) { + e.mu.Lock() + defer e.mu.Unlock() + e.history = append(e.history, msg) +} + +func (e *ephemeralSessionStore) GetHistory(key string) []providers.Message { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]providers.Message, len(e.history)) + copy(out, e.history) + return out +} + +func (e *ephemeralSessionStore) GetSummary(key string) string { + e.mu.Lock() + defer e.mu.Unlock() + return e.summary +} + +func (e *ephemeralSessionStore) SetSummary(key, summary string) { + e.mu.Lock() + defer e.mu.Unlock() + e.summary = summary +} + +func (e *ephemeralSessionStore) SetHistory(key string, history []providers.Message) { + e.mu.Lock() + defer e.mu.Unlock() + e.history = make([]providers.Message, len(history)) + copy(e.history, history) +} + +func (e *ephemeralSessionStore) TruncateHistory(key string, keepLast int) { + e.mu.Lock() + defer e.mu.Unlock() + if len(e.history) > keepLast { + e.history = e.history[len(e.history)-keepLast:] + } +} + +func (e *ephemeralSessionStore) Save(key string) error { return nil } +func (e *ephemeralSessionStore) Close() error { return nil } + +func newEphemeralSession(_ 
session.SessionStore) session.SessionStore { + return &ephemeralSessionStore{} +} + +// ====================== Core Function: spawnSubTurn ====================== +func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg SubTurnConfig) (result *tools.ToolResult, err error) { + // 1. Depth limit check + if parentTS.depth >= maxSubTurnDepth { + return nil, ErrDepthLimitExceeded + } + + // 2. Config validation + if cfg.Model == "" { + return nil, ErrInvalidSubTurnConfig + } + + // Create a sub-context for the child turn to support cancellation + childCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // 3. Create child Turn state + childID := generateTurnID() + childTS := newTurnState(childCtx, childID, parentTS) + + // 4. Establish parent-child relationship (thread-safe) + parentTS.mu.Lock() + parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID) + parentTS.mu.Unlock() + + // 5. Emit Spawn event (currently using Mock, will be replaced by real EventBus) + MockEventBus.Emit(SubTurnSpawnEvent{ + ParentID: parentTS.turnID, + ChildID: childID, + Config: cfg, + }) + + // 6. Defer emitting End event, and recover from panics to ensure it's always fired + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("subturn panicked: %v", r) + } + + MockEventBus.Emit(SubTurnEndEvent{ + ChildID: childID, + Result: result, + Err: err, + }) + }() + + // 7. Execute sub-turn via the real agent loop. + // Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent. + result, err = runTurn(childCtx, al, childTS, cfg) + + // 8. 
Deliver result back to parent Turn + deliverSubTurnResult(parentTS, childID, result) + + return result, err +} + +// ====================== Result Delivery ====================== +func deliverSubTurnResult(parentTS *turnState, childID string, result *tools.ToolResult) { + parentTS.mu.Lock() + defer parentTS.mu.Unlock() + + // Emit ResultDelivered event + MockEventBus.Emit(SubTurnResultDeliveredEvent{ + ParentID: parentTS.turnID, + ChildID: childID, + Result: result, + }) + + if !parentTS.isFinished { + // Parent Turn is still running → Place in pending queue (handled automatically by parent loop in next round) + select { + case parentTS.pendingResults <- result: + default: + fmt.Println("[SubTurn] warning: pendingResults channel full") + } + return + } + + // Parent Turn has ended + // emit an OrphanResultEvent so the system/UI can handle this late arrival. + if result != nil { + MockEventBus.Emit(SubTurnOrphanResultEvent{ + ParentID: parentTS.turnID, + ChildID: childID, + Result: result, + }) + } +} + +// runTurn builds a temporary AgentInstance from SubTurnConfig and delegates to +// the real agent loop. The child's ephemeral session is used for history so it +// never pollutes the parent session. +func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfig) (*tools.ToolResult, error) { + // Derive candidates from the requested model using the parent loop's provider. + defaultProvider := al.GetConfig().Agents.Defaults.Provider + candidates := providers.ResolveCandidates( + providers.ModelConfig{Primary: cfg.Model}, + defaultProvider, + ) + + // Build a minimal AgentInstance for this sub-turn. + // It reuses the parent loop's provider and config, but gets its own + // ephemeral session store and tool registry. 
+ toolRegistry := tools.NewToolRegistry() + for _, t := range cfg.Tools { + toolRegistry.Register(t) + } + + parentAgent := al.GetRegistry().GetDefaultAgent() + childAgent := &AgentInstance{ + ID: ts.turnID, + Model: cfg.Model, + MaxIterations: parentAgent.MaxIterations, + MaxTokens: cfg.MaxTokens, + Temperature: parentAgent.Temperature, + ThinkingLevel: parentAgent.ThinkingLevel, + ContextWindow: cfg.MaxTokens, + SummarizeMessageThreshold: parentAgent.SummarizeMessageThreshold, + SummarizeTokenPercent: parentAgent.SummarizeTokenPercent, + Provider: parentAgent.Provider, + Sessions: ts.session, + ContextBuilder: parentAgent.ContextBuilder, + Tools: toolRegistry, + Candidates: candidates, + } + if childAgent.MaxTokens == 0 { + childAgent.MaxTokens = parentAgent.MaxTokens + childAgent.ContextWindow = parentAgent.ContextWindow + } + + finalContent, err := al.runAgentLoop(ctx, childAgent, processOptions{ + SessionKey: ts.turnID, + UserMessage: cfg.SystemPrompt, + DefaultResponse: "", + EnableSummary: false, + SendResponse: false, + }) + if err != nil { + return nil, err + } + return &tools.ToolResult{ForLLM: finalContent}, nil +} + +// ====================== Other Types ====================== diff --git a/pkg/agent/subturn_test.go b/pkg/agent/subturn_test.go new file mode 100644 index 000000000..943c46015 --- /dev/null +++ b/pkg/agent/subturn_test.go @@ -0,0 +1,255 @@ +package agent + +import ( + "context" + "reflect" + "testing" + + "github.com/sipeed/picoclaw/pkg/tools" +) + +// ====================== Test Helper: Event Collector ====================== +type eventCollector struct { + events []any +} + +func (c *eventCollector) collect(e any) { + c.events = append(c.events, e) +} + +func (c *eventCollector) hasEventOfType(typ any) bool { + targetType := reflect.TypeOf(typ) + for _, e := range c.events { + if reflect.TypeOf(e) == targetType { + return true + } + } + return false +} + +func (c *eventCollector) countOfType(typ any) int { + targetType := 
reflect.TypeOf(typ) + count := 0 + for _, e := range c.events { + if reflect.TypeOf(e) == targetType { + count++ + } + } + return count +} + +// ====================== Main Test Function ====================== +func TestSpawnSubTurn(t *testing.T) { + tests := []struct { + name string + parentDepth int + config SubTurnConfig + wantErr error + wantSpawn bool + wantEnd bool + wantDepthFail bool + }{ + { + name: "Basic success path - Single layer sub-turn", + parentDepth: 0, + config: SubTurnConfig{ + Model: "gpt-4o-mini", + Tools: []tools.Tool{}, // At least one tool + }, + wantErr: nil, + wantSpawn: true, + wantEnd: true, + }, + { + name: "Nested 2 layers - Normal", + parentDepth: 1, + config: SubTurnConfig{ + Model: "gpt-4o-mini", + Tools: []tools.Tool{}, + }, + wantErr: nil, + wantSpawn: true, + wantEnd: true, + }, + { + name: "Depth limit triggered - 4th layer fails", + parentDepth: 3, + config: SubTurnConfig{ + Model: "gpt-4o-mini", + Tools: []tools.Tool{}, + }, + wantErr: ErrDepthLimitExceeded, + wantSpawn: false, + wantEnd: false, + wantDepthFail: true, + }, + { + name: "Invalid config - Empty Model", + parentDepth: 0, + config: SubTurnConfig{ + Model: "", + Tools: []tools.Tool{}, + }, + wantErr: ErrInvalidSubTurnConfig, + wantSpawn: false, + wantEnd: false, + }, + } + + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Prepare parent Turn + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-1", + depth: tt.parentDepth, + childTurnIDs: []string{}, + pendingResults: make(chan *tools.ToolResult, 10), + session: &ephemeralSessionStore{}, + } + + // Replace mock with test collector + collector := &eventCollector{} + originalEmit := MockEventBus.Emit + MockEventBus.Emit = collector.collect + defer func() { MockEventBus.Emit = originalEmit }() + + // Execute spawnSubTurn + result, err := spawnSubTurn(context.Background(), al, parent, tt.config) + + // Assert errors 
+ if tt.wantErr != nil { + if err == nil || err != tt.wantErr { + t.Errorf("expected error %v, got %v", tt.wantErr, err) + } + return + } + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + // Verify result + if result == nil { + t.Error("expected non-nil result") + } + + // Verify event emission + if tt.wantSpawn { + if !collector.hasEventOfType(SubTurnSpawnEvent{}) { + t.Error("SubTurnSpawnEvent not emitted") + } + } + if tt.wantEnd { + if !collector.hasEventOfType(SubTurnEndEvent{}) { + t.Error("SubTurnEndEvent not emitted") + } + } + + // Verify turn tree + if len(parent.childTurnIDs) == 0 && !tt.wantDepthFail { + t.Error("child Turn not added to parent.childTurnIDs") + } + + // Verify result delivery (pendingResults or history) + if len(parent.pendingResults) > 0 || len(parent.session.GetHistory("")) > 0 { + // Result delivered via at least one path + } else { + t.Error("child result not delivered") + } + }) + } +} + +// ====================== Extra Independent Test: Ephemeral Session Isolation ====================== +func TestSpawnSubTurn_EphemeralSessionIsolation(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + parentSession := &ephemeralSessionStore{} + parentSession.AddMessage("", "user", "parent msg") + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-1", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 1), + session: parentSession, + } + + cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}} + + // Record main session length before execution + originalLen := len(parent.session.GetHistory("")) + + _, _ = spawnSubTurn(context.Background(), al, parent, cfg) + + // After sub-turn ends, main session must remain unchanged + if len(parent.session.GetHistory("")) != originalLen { + t.Error("ephemeral session polluted the main session") + } +} + +// ====================== Extra Independent Test: Result Delivery Path ====================== +func 
TestSpawnSubTurn_ResultDelivery(t *testing.T) { + al, _, _, _, cleanup := newTestAgentLoop(t) + defer cleanup() + + parent := &turnState{ + ctx: context.Background(), + turnID: "parent-1", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 1), + session: &ephemeralSessionStore{}, + } + + cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}} + + _, _ = spawnSubTurn(context.Background(), al, parent, cfg) + + // Check if pendingResults received the result + select { + case res := <-parent.pendingResults: + if res == nil { + t.Error("received nil result in pendingResults") + } + default: + t.Error("result did not enter pendingResults") + } +} + +// ====================== Extra Independent Test: Orphan Result Routing ====================== +func TestSpawnSubTurn_OrphanResultRouting(t *testing.T) { + parentCtx, cancelParent := context.WithCancel(context.Background()) + parent := &turnState{ + ctx: parentCtx, + cancelFunc: cancelParent, + turnID: "parent-1", + depth: 0, + pendingResults: make(chan *tools.ToolResult, 1), + session: &ephemeralSessionStore{}, + } + + collector := &eventCollector{} + originalEmit := MockEventBus.Emit + MockEventBus.Emit = collector.collect + defer func() { MockEventBus.Emit = originalEmit }() + + // Simulate parent finishing before child delivers result + parent.Finish() + + // Call deliverSubTurnResult directly to simulate a delayed child + deliverSubTurnResult(parent, "delayed-child", &tools.ToolResult{ForLLM: "late result"}) + + // Verify Orphan event is emitted + if !collector.hasEventOfType(SubTurnOrphanResultEvent{}) { + t.Error("SubTurnOrphanResultEvent not emitted for finished parent") + } + + // Verify history is NOT polluted + if len(parent.session.GetHistory("")) != 0 { + t.Error("Parent history was polluted by orphan result") + } +}