feat(agent): port subturn PoC to refactor/agent branch

- Replace duplicate types (ToolResult/Session/Message) with real project types
- Implement ephemeralSessionStore satisfying session.SessionStore interface
- Connect runTurn to real AgentLoop via runAgentLoop + AgentInstance
- Fix subturn_test.go to match updated signatures and types

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
This commit is contained in:
Administrator
2026-03-16 14:31:32 +08:00
parent 021aa7d6d5
commit ae23193295
3 changed files with 576 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
package agent
import "fmt"
// MockEventBus - for POC
var MockEventBus = struct {
Emit func(event any)
}{
Emit: func(event any) {
fmt.Printf("[Mock EventBus] %T %+v\n", event, event)
},
}
+309
View File
@@ -0,0 +1,309 @@
package agent
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/tools"
)
// ====================== Config & Constants ======================
const maxSubTurnDepth = 3
var (
ErrDepthLimitExceeded = errors.New("sub-turn depth limit exceeded")
ErrInvalidSubTurnConfig = errors.New("invalid sub-turn config")
)
// ====================== SubTurn Config ======================
type SubTurnConfig struct {
Model string
Tools []tools.Tool
SystemPrompt string
MaxTokens int
// Can be extended with temperature, topP, etc.
}
// ====================== Sub-turn Events (Aligned with EventBus) ======================
type SubTurnSpawnEvent struct {
ParentID string
ChildID string
Config SubTurnConfig
}
type SubTurnEndEvent struct {
ChildID string
Result *tools.ToolResult
Err error
}
type SubTurnResultDeliveredEvent struct {
ParentID string
ChildID string
Result *tools.ToolResult
}
type SubTurnOrphanResultEvent struct {
ParentID string
ChildID string
Result *tools.ToolResult
}
// ====================== turnState (Simplified, reusable with existing structs) ======================
type turnState struct {
ctx context.Context
cancelFunc context.CancelFunc // Used to cancel all children when this turn finishes
turnID string
parentTurnID string
depth int
childTurnIDs []string
pendingResults chan *tools.ToolResult
session session.SessionStore
mu sync.Mutex
isFinished bool // Marks if the parent Turn has ended
}
// ====================== Helper Functions ======================
var globalTurnCounter int64
func generateTurnID() string {
return fmt.Sprintf("subturn-%d", atomic.AddInt64(&globalTurnCounter, 1))
}
func newTurnState(ctx context.Context, id string, parent *turnState) *turnState {
turnCtx, cancel := context.WithCancel(ctx)
return &turnState{
ctx: turnCtx,
cancelFunc: cancel,
turnID: id,
parentTurnID: parent.turnID,
depth: parent.depth + 1,
session: newEphemeralSession(parent.session),
// NOTE: In this PoC, I use a fixed-size channel (16).
// Under high concurrency or long-running sub-turns, this might fill up and cause
// intermediate results to be discarded in deliverSubTurnResult.
// For production, consider an unbounded queue or a blocking strategy with backpressure.
pendingResults: make(chan *tools.ToolResult, 16),
}
}
// Finish marks the turn as finished and cancels its context, aborting any running sub-turns.
func (ts *turnState) Finish() {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.isFinished = true
if ts.cancelFunc != nil {
ts.cancelFunc()
}
}
// ephemeralSessionStore is a pure in-memory SessionStore for SubTurns.
// It never writes to disk, keeping sub-turn history isolated from the parent session.
type ephemeralSessionStore struct {
mu sync.Mutex
history []providers.Message
summary string
}
func (e *ephemeralSessionStore) AddMessage(sessionKey, role, content string) {
e.mu.Lock()
defer e.mu.Unlock()
e.history = append(e.history, providers.Message{Role: role, Content: content})
}
func (e *ephemeralSessionStore) AddFullMessage(sessionKey string, msg providers.Message) {
e.mu.Lock()
defer e.mu.Unlock()
e.history = append(e.history, msg)
}
func (e *ephemeralSessionStore) GetHistory(key string) []providers.Message {
e.mu.Lock()
defer e.mu.Unlock()
out := make([]providers.Message, len(e.history))
copy(out, e.history)
return out
}
func (e *ephemeralSessionStore) GetSummary(key string) string {
e.mu.Lock()
defer e.mu.Unlock()
return e.summary
}
func (e *ephemeralSessionStore) SetSummary(key, summary string) {
e.mu.Lock()
defer e.mu.Unlock()
e.summary = summary
}
func (e *ephemeralSessionStore) SetHistory(key string, history []providers.Message) {
e.mu.Lock()
defer e.mu.Unlock()
e.history = make([]providers.Message, len(history))
copy(e.history, history)
}
func (e *ephemeralSessionStore) TruncateHistory(key string, keepLast int) {
e.mu.Lock()
defer e.mu.Unlock()
if len(e.history) > keepLast {
e.history = e.history[len(e.history)-keepLast:]
}
}
func (e *ephemeralSessionStore) Save(key string) error { return nil }
func (e *ephemeralSessionStore) Close() error { return nil }
func newEphemeralSession(_ session.SessionStore) session.SessionStore {
return &ephemeralSessionStore{}
}
// ====================== Core Function: spawnSubTurn ======================
func spawnSubTurn(ctx context.Context, al *AgentLoop, parentTS *turnState, cfg SubTurnConfig) (result *tools.ToolResult, err error) {
// 1. Depth limit check
if parentTS.depth >= maxSubTurnDepth {
return nil, ErrDepthLimitExceeded
}
// 2. Config validation
if cfg.Model == "" {
return nil, ErrInvalidSubTurnConfig
}
// Create a sub-context for the child turn to support cancellation
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 3. Create child Turn state
childID := generateTurnID()
childTS := newTurnState(childCtx, childID, parentTS)
// 4. Establish parent-child relationship (thread-safe)
parentTS.mu.Lock()
parentTS.childTurnIDs = append(parentTS.childTurnIDs, childID)
parentTS.mu.Unlock()
// 5. Emit Spawn event (currently using Mock, will be replaced by real EventBus)
MockEventBus.Emit(SubTurnSpawnEvent{
ParentID: parentTS.turnID,
ChildID: childID,
Config: cfg,
})
// 6. Defer emitting End event, and recover from panics to ensure it's always fired
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("subturn panicked: %v", r)
}
MockEventBus.Emit(SubTurnEndEvent{
ChildID: childID,
Result: result,
Err: err,
})
}()
// 7. Execute sub-turn via the real agent loop.
// Build a child AgentInstance from SubTurnConfig, inheriting defaults from the parent agent.
result, err = runTurn(childCtx, al, childTS, cfg)
// 8. Deliver result back to parent Turn
deliverSubTurnResult(parentTS, childID, result)
return result, err
}
// ====================== Result Delivery ======================
func deliverSubTurnResult(parentTS *turnState, childID string, result *tools.ToolResult) {
parentTS.mu.Lock()
defer parentTS.mu.Unlock()
// Emit ResultDelivered event
MockEventBus.Emit(SubTurnResultDeliveredEvent{
ParentID: parentTS.turnID,
ChildID: childID,
Result: result,
})
if !parentTS.isFinished {
// Parent Turn is still running → Place in pending queue (handled automatically by parent loop in next round)
select {
case parentTS.pendingResults <- result:
default:
fmt.Println("[SubTurn] warning: pendingResults channel full")
}
return
}
// Parent Turn has ended
// emit an OrphanResultEvent so the system/UI can handle this late arrival.
if result != nil {
MockEventBus.Emit(SubTurnOrphanResultEvent{
ParentID: parentTS.turnID,
ChildID: childID,
Result: result,
})
}
}
// runTurn builds a temporary AgentInstance from SubTurnConfig and delegates to
// the real agent loop. The child's ephemeral session is used for history so it
// never pollutes the parent session.
func runTurn(ctx context.Context, al *AgentLoop, ts *turnState, cfg SubTurnConfig) (*tools.ToolResult, error) {
// Derive candidates from the requested model using the parent loop's provider.
defaultProvider := al.GetConfig().Agents.Defaults.Provider
candidates := providers.ResolveCandidates(
providers.ModelConfig{Primary: cfg.Model},
defaultProvider,
)
// Build a minimal AgentInstance for this sub-turn.
// It reuses the parent loop's provider and config, but gets its own
// ephemeral session store and tool registry.
toolRegistry := tools.NewToolRegistry()
for _, t := range cfg.Tools {
toolRegistry.Register(t)
}
parentAgent := al.GetRegistry().GetDefaultAgent()
childAgent := &AgentInstance{
ID: ts.turnID,
Model: cfg.Model,
MaxIterations: parentAgent.MaxIterations,
MaxTokens: cfg.MaxTokens,
Temperature: parentAgent.Temperature,
ThinkingLevel: parentAgent.ThinkingLevel,
ContextWindow: cfg.MaxTokens,
SummarizeMessageThreshold: parentAgent.SummarizeMessageThreshold,
SummarizeTokenPercent: parentAgent.SummarizeTokenPercent,
Provider: parentAgent.Provider,
Sessions: ts.session,
ContextBuilder: parentAgent.ContextBuilder,
Tools: toolRegistry,
Candidates: candidates,
}
if childAgent.MaxTokens == 0 {
childAgent.MaxTokens = parentAgent.MaxTokens
childAgent.ContextWindow = parentAgent.ContextWindow
}
finalContent, err := al.runAgentLoop(ctx, childAgent, processOptions{
SessionKey: ts.turnID,
UserMessage: cfg.SystemPrompt,
DefaultResponse: "",
EnableSummary: false,
SendResponse: false,
})
if err != nil {
return nil, err
}
return &tools.ToolResult{ForLLM: finalContent}, nil
}
// ====================== Other Types ======================
+255
View File
@@ -0,0 +1,255 @@
package agent
import (
"context"
"reflect"
"testing"
"github.com/sipeed/picoclaw/pkg/tools"
)
// ====================== Test Helper: Event Collector ======================
type eventCollector struct {
events []any
}
func (c *eventCollector) collect(e any) {
c.events = append(c.events, e)
}
func (c *eventCollector) hasEventOfType(typ any) bool {
targetType := reflect.TypeOf(typ)
for _, e := range c.events {
if reflect.TypeOf(e) == targetType {
return true
}
}
return false
}
func (c *eventCollector) countOfType(typ any) int {
targetType := reflect.TypeOf(typ)
count := 0
for _, e := range c.events {
if reflect.TypeOf(e) == targetType {
count++
}
}
return count
}
// ====================== Main Test Function ======================
func TestSpawnSubTurn(t *testing.T) {
tests := []struct {
name string
parentDepth int
config SubTurnConfig
wantErr error
wantSpawn bool
wantEnd bool
wantDepthFail bool
}{
{
name: "Basic success path - Single layer sub-turn",
parentDepth: 0,
config: SubTurnConfig{
Model: "gpt-4o-mini",
Tools: []tools.Tool{}, // At least one tool
},
wantErr: nil,
wantSpawn: true,
wantEnd: true,
},
{
name: "Nested 2 layers - Normal",
parentDepth: 1,
config: SubTurnConfig{
Model: "gpt-4o-mini",
Tools: []tools.Tool{},
},
wantErr: nil,
wantSpawn: true,
wantEnd: true,
},
{
name: "Depth limit triggered - 4th layer fails",
parentDepth: 3,
config: SubTurnConfig{
Model: "gpt-4o-mini",
Tools: []tools.Tool{},
},
wantErr: ErrDepthLimitExceeded,
wantSpawn: false,
wantEnd: false,
wantDepthFail: true,
},
{
name: "Invalid config - Empty Model",
parentDepth: 0,
config: SubTurnConfig{
Model: "",
Tools: []tools.Tool{},
},
wantErr: ErrInvalidSubTurnConfig,
wantSpawn: false,
wantEnd: false,
},
}
al, _, _, _, cleanup := newTestAgentLoop(t)
defer cleanup()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Prepare parent Turn
parent := &turnState{
ctx: context.Background(),
turnID: "parent-1",
depth: tt.parentDepth,
childTurnIDs: []string{},
pendingResults: make(chan *tools.ToolResult, 10),
session: &ephemeralSessionStore{},
}
// Replace mock with test collector
collector := &eventCollector{}
originalEmit := MockEventBus.Emit
MockEventBus.Emit = collector.collect
defer func() { MockEventBus.Emit = originalEmit }()
// Execute spawnSubTurn
result, err := spawnSubTurn(context.Background(), al, parent, tt.config)
// Assert errors
if tt.wantErr != nil {
if err == nil || err != tt.wantErr {
t.Errorf("expected error %v, got %v", tt.wantErr, err)
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
// Verify result
if result == nil {
t.Error("expected non-nil result")
}
// Verify event emission
if tt.wantSpawn {
if !collector.hasEventOfType(SubTurnSpawnEvent{}) {
t.Error("SubTurnSpawnEvent not emitted")
}
}
if tt.wantEnd {
if !collector.hasEventOfType(SubTurnEndEvent{}) {
t.Error("SubTurnEndEvent not emitted")
}
}
// Verify turn tree
if len(parent.childTurnIDs) == 0 && !tt.wantDepthFail {
t.Error("child Turn not added to parent.childTurnIDs")
}
// Verify result delivery (pendingResults or history)
if len(parent.pendingResults) > 0 || len(parent.session.GetHistory("")) > 0 {
// Result delivered via at least one path
} else {
t.Error("child result not delivered")
}
})
}
}
// ====================== Extra Independent Test: Ephemeral Session Isolation ======================
func TestSpawnSubTurn_EphemeralSessionIsolation(t *testing.T) {
al, _, _, _, cleanup := newTestAgentLoop(t)
defer cleanup()
parentSession := &ephemeralSessionStore{}
parentSession.AddMessage("", "user", "parent msg")
parent := &turnState{
ctx: context.Background(),
turnID: "parent-1",
depth: 0,
pendingResults: make(chan *tools.ToolResult, 1),
session: parentSession,
}
cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}}
// Record main session length before execution
originalLen := len(parent.session.GetHistory(""))
_, _ = spawnSubTurn(context.Background(), al, parent, cfg)
// After sub-turn ends, main session must remain unchanged
if len(parent.session.GetHistory("")) != originalLen {
t.Error("ephemeral session polluted the main session")
}
}
// ====================== Extra Independent Test: Result Delivery Path ======================
func TestSpawnSubTurn_ResultDelivery(t *testing.T) {
al, _, _, _, cleanup := newTestAgentLoop(t)
defer cleanup()
parent := &turnState{
ctx: context.Background(),
turnID: "parent-1",
depth: 0,
pendingResults: make(chan *tools.ToolResult, 1),
session: &ephemeralSessionStore{},
}
cfg := SubTurnConfig{Model: "gpt-4o-mini", Tools: []tools.Tool{}}
_, _ = spawnSubTurn(context.Background(), al, parent, cfg)
// Check if pendingResults received the result
select {
case res := <-parent.pendingResults:
if res == nil {
t.Error("received nil result in pendingResults")
}
default:
t.Error("result did not enter pendingResults")
}
}
// ====================== Extra Independent Test: Orphan Result Routing ======================
func TestSpawnSubTurn_OrphanResultRouting(t *testing.T) {
parentCtx, cancelParent := context.WithCancel(context.Background())
parent := &turnState{
ctx: parentCtx,
cancelFunc: cancelParent,
turnID: "parent-1",
depth: 0,
pendingResults: make(chan *tools.ToolResult, 1),
session: &ephemeralSessionStore{},
}
collector := &eventCollector{}
originalEmit := MockEventBus.Emit
MockEventBus.Emit = collector.collect
defer func() { MockEventBus.Emit = originalEmit }()
// Simulate parent finishing before child delivers result
parent.Finish()
// Call deliverSubTurnResult directly to simulate a delayed child
deliverSubTurnResult(parent, "delayed-child", &tools.ToolResult{ForLLM: "late result"})
// Verify Orphan event is emitted
if !collector.hasEventOfType(SubTurnOrphanResultEvent{}) {
t.Error("SubTurnOrphanResultEvent not emitted for finished parent")
}
// Verify history is NOT polluted
if len(parent.session.GetHistory("")) != 0 {
t.Error("Parent history was polluted by orphan result")
}
}