mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
429 lines
13 KiB
Go
429 lines
13 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/providers"
|
|
"github.com/sipeed/picoclaw/pkg/session"
|
|
"github.com/sipeed/picoclaw/pkg/tools"
|
|
)
|
|
|
|
// ====================== Context Keys ======================
|
|
type turnStateKeyType struct{}
|
|
|
|
var turnStateKey = turnStateKeyType{}
|
|
|
|
func withTurnState(ctx context.Context, ts *turnState) context.Context {
|
|
return context.WithValue(ctx, turnStateKey, ts)
|
|
}
|
|
|
|
// TurnStateFromContext retrieves turnState from context (exported for tools)
|
|
func TurnStateFromContext(ctx context.Context) *turnState {
|
|
return turnStateFromContext(ctx)
|
|
}
|
|
|
|
func turnStateFromContext(ctx context.Context) *turnState {
|
|
ts, _ := ctx.Value(turnStateKey).(*turnState)
|
|
return ts
|
|
}
|
|
|
|
// ====================== turnState ======================
|
|
|
|
type turnState struct {
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc // Used to cancel all children when this turn finishes
|
|
turnID string
|
|
parentTurnID string
|
|
depth int
|
|
childTurnIDs []string // MUST be accessed under mu lock or maybe add a getter method
|
|
pendingResults chan *tools.ToolResult
|
|
session session.SessionStore
|
|
initialHistoryLength int // Snapshot of session history length at turn start, for rollback on hard abort
|
|
mu sync.Mutex
|
|
isFinished bool // MUST be accessed under mu lock
|
|
closeOnce sync.Once // Ensures pendingResults channel is closed exactly once
|
|
concurrencySem chan struct{} // Limits concurrent child sub-turns
|
|
finishedChan chan struct{} // Lazily initialized, closed when turn finishes
|
|
|
|
// parentEnded signals that the parent turn has finished gracefully.
|
|
// Child SubTurns should check this via IsParentEnded() to decide whether
|
|
// to continue running (Critical=true) or exit gracefully (Critical=false).
|
|
parentEnded atomic.Bool
|
|
|
|
// critical indicates whether this SubTurn should continue running after
|
|
// the parent turn finishes gracefully. Set from SubTurnConfig.Critical.
|
|
critical bool
|
|
|
|
// parentTurnState holds a reference to the parent turnState.
|
|
// This allows child SubTurns to check if the parent has ended.
|
|
// Nil for root turns.
|
|
parentTurnState *turnState
|
|
|
|
// lastFinishReason stores the finish_reason from the last LLM call.
|
|
// Used by SubTurn to detect truncation and retry.
|
|
// MUST be accessed under mu lock.
|
|
lastFinishReason string
|
|
|
|
// Token budget tracking
|
|
// tokenBudget is a shared atomic counter for tracking remaining tokens across team members.
|
|
// Inherited from parent or initialized from SubTurnConfig.InitialTokenBudget.
|
|
// Nil if no budget is set.
|
|
tokenBudget *atomic.Int64
|
|
|
|
// lastUsage stores the token usage from the last LLM call.
|
|
// Used by SubTurn to deduct from tokenBudget after each LLM iteration.
|
|
// MUST be accessed under mu lock.
|
|
lastUsage *providers.UsageInfo
|
|
}
|
|
|
|
// ====================== Public API ======================
|
|
|
|
// TurnInfo provides read-only information about an active turn.
|
|
type TurnInfo struct {
|
|
TurnID string
|
|
ParentTurnID string
|
|
Depth int
|
|
ChildTurnIDs []string
|
|
IsFinished bool
|
|
}
|
|
|
|
// GetActiveTurn retrieves information about the currently active turn for a session.
|
|
// Returns nil if no active turn exists for the given session key.
|
|
func (al *AgentLoop) GetActiveTurn(sessionKey string) *TurnInfo {
|
|
tsInterface, ok := al.activeTurnStates.Load(sessionKey)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
ts, ok := tsInterface.(*turnState)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return ts.Info()
|
|
}
|
|
|
|
// Info returns a read-only snapshot of the turn state information.
|
|
// This method is thread-safe and can be called concurrently.
|
|
func (ts *turnState) Info() *TurnInfo {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
|
|
// Create a copy of childTurnIDs to avoid race conditions
|
|
childIDs := make([]string, len(ts.childTurnIDs))
|
|
copy(childIDs, ts.childTurnIDs)
|
|
|
|
return &TurnInfo{
|
|
TurnID: ts.turnID,
|
|
ParentTurnID: ts.parentTurnID,
|
|
Depth: ts.depth,
|
|
ChildTurnIDs: childIDs,
|
|
IsFinished: ts.isFinished,
|
|
}
|
|
}
|
|
|
|
// GetAllActiveTurns retrieves information about all currently active turns across all sessions.
|
|
func (al *AgentLoop) GetAllActiveTurns() []*TurnInfo {
|
|
var turns []*TurnInfo
|
|
al.activeTurnStates.Range(func(key, value interface{}) bool {
|
|
if ts, ok := value.(*turnState); ok {
|
|
turns = append(turns, ts.Info())
|
|
}
|
|
return true
|
|
})
|
|
return turns
|
|
}
|
|
|
|
// FormatTree recursively builds a string representation of the active turn tree.
|
|
func (al *AgentLoop) FormatTree(turnInfo *TurnInfo, prefix string, isLast bool) string {
|
|
if turnInfo == nil {
|
|
return ""
|
|
}
|
|
|
|
var sb strings.Builder
|
|
|
|
// Print current node
|
|
marker := "├── "
|
|
if isLast {
|
|
marker = "└── "
|
|
}
|
|
if turnInfo.Depth == 0 {
|
|
marker = "" // Root node no marker
|
|
}
|
|
|
|
status := "Running"
|
|
if turnInfo.IsFinished {
|
|
status = "Finished"
|
|
}
|
|
|
|
orphanMarker := ""
|
|
if turnInfo.Depth > 0 && prefix == "" {
|
|
orphanMarker = " (Orphaned)"
|
|
}
|
|
|
|
fmt.Fprintf(
|
|
&sb,
|
|
"%s%s[%s] Depth:%d (%s)%s\n",
|
|
prefix,
|
|
marker,
|
|
turnInfo.TurnID,
|
|
turnInfo.Depth,
|
|
status,
|
|
orphanMarker,
|
|
)
|
|
|
|
// Prepare prefix for children
|
|
childPrefix := prefix
|
|
if turnInfo.Depth > 0 {
|
|
if isLast {
|
|
childPrefix += " "
|
|
} else {
|
|
childPrefix += "│ "
|
|
}
|
|
}
|
|
|
|
for i, childID := range turnInfo.ChildTurnIDs {
|
|
// Look up child turn state
|
|
childInfo := al.GetActiveTurn(childID)
|
|
if childInfo != nil {
|
|
isLastChild := (i == len(turnInfo.ChildTurnIDs)-1)
|
|
sb.WriteString(al.FormatTree(childInfo, childPrefix, isLastChild))
|
|
} else {
|
|
// Child might have already been removed from active states if it finished early
|
|
isLastChild := (i == len(turnInfo.ChildTurnIDs)-1)
|
|
cMarker := "├── "
|
|
if isLastChild {
|
|
cMarker = "└── "
|
|
}
|
|
fmt.Fprintf(&sb, "%s%s[%s] (Completed/Cleaned Up)\n", childPrefix, cMarker, childID)
|
|
}
|
|
}
|
|
|
|
return sb.String()
|
|
}
|
|
|
|
// ====================== Helper Functions ======================
|
|
|
|
func newTurnState(ctx context.Context, id string, parent *turnState, maxConcurrent int) *turnState {
|
|
// Note: We don't create a new context with cancel here because the caller
|
|
// (spawnSubTurn) already creates one. The turnState stores the context and
|
|
// cancelFunc provided by the caller to avoid redundant context wrapping.
|
|
return &turnState{
|
|
ctx: ctx,
|
|
cancelFunc: nil, // Will be set by the caller
|
|
turnID: id,
|
|
parentTurnID: parent.turnID,
|
|
depth: parent.depth + 1,
|
|
session: newEphemeralSession(parent.session),
|
|
parentTurnState: parent, // Store reference to parent for IsParentEnded() checks
|
|
// NOTE: In this PoC, I use a fixed-size channel (16).
|
|
// Under high concurrency or long-running sub-turns, this might fill up and cause
|
|
// intermediate results to be discarded in deliverSubTurnResult.
|
|
// For production, consider an unbounded queue or a blocking strategy with backpressure.
|
|
pendingResults: make(chan *tools.ToolResult, 16),
|
|
concurrencySem: make(chan struct{}, maxConcurrent),
|
|
}
|
|
}
|
|
|
|
// IsParentEnded returns true if the parent turn has finished gracefully.
|
|
// This is safe to call from child SubTurn goroutines.
|
|
// Returns false if this is a root turn (no parent).
|
|
func (ts *turnState) IsParentEnded() bool {
|
|
if ts.parentTurnState == nil {
|
|
return false
|
|
}
|
|
return ts.parentTurnState.parentEnded.Load()
|
|
}
|
|
|
|
// SetLastFinishReason updates the last finish reason (thread-safe).
|
|
func (ts *turnState) SetLastFinishReason(reason string) {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
ts.lastFinishReason = reason
|
|
}
|
|
|
|
// GetLastFinishReason retrieves the last finish reason (thread-safe).
|
|
func (ts *turnState) GetLastFinishReason() string {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
return ts.lastFinishReason
|
|
}
|
|
|
|
// SetLastUsage stores the token usage from the last LLM call.
|
|
// This is used by SubTurn to track token consumption for budget enforcement.
|
|
func (ts *turnState) SetLastUsage(usage *providers.UsageInfo) {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
ts.lastUsage = usage
|
|
}
|
|
|
|
// GetLastUsage retrieves the token usage from the last LLM call.
|
|
// Returns nil if no LLM call has been made yet.
|
|
func (ts *turnState) GetLastUsage() *providers.UsageInfo {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
return ts.lastUsage
|
|
}
|
|
|
|
// IsParentEnded is a convenience method to check if parent ended.
|
|
// It returns the value of the parent's parentEnded atomic flag.
|
|
|
|
// Finished returns a channel that is closed when the turn finishes.
|
|
// This allows child turns to safely block on delivering results without leaking
|
|
// if the parent finishes before they can deliver.
|
|
func (ts *turnState) Finished() <-chan struct{} {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
if ts.finishedChan == nil {
|
|
ts.finishedChan = make(chan struct{})
|
|
if ts.isFinished {
|
|
close(ts.finishedChan)
|
|
}
|
|
}
|
|
return ts.finishedChan
|
|
}
|
|
|
|
// Finish marks the turn as finished.
|
|
//
|
|
// If isHardAbort is true (Hard Abort):
|
|
// - Cancels all child contexts immediately via cancelFunc
|
|
// - Used for user-initiated termination (e.g., "stop now")
|
|
//
|
|
// If isHardAbort is false (Graceful Finish):
|
|
// - Only signals parentEnded for graceful child exit
|
|
// - Children check IsParentEnded() and decide whether to continue or exit
|
|
// - Critical SubTurns continue running and deliver orphan results
|
|
// - Non-Critical SubTurns exit gracefully without error
|
|
//
|
|
// In both cases, the pendingResults channel is NOT closed.
|
|
// It is left open to be garbage collected when no longer used, avoiding
|
|
// "send on closed channel" panics from concurrently finishing async subturns.
|
|
func (ts *turnState) Finish(isHardAbort bool) {
|
|
var fc chan struct{}
|
|
|
|
ts.mu.Lock()
|
|
if !ts.isFinished {
|
|
ts.isFinished = true
|
|
if ts.finishedChan == nil {
|
|
ts.finishedChan = make(chan struct{})
|
|
}
|
|
fc = ts.finishedChan
|
|
}
|
|
ts.mu.Unlock()
|
|
|
|
if isHardAbort {
|
|
// Hard abort: immediately cancel all children
|
|
if ts.cancelFunc != nil {
|
|
ts.cancelFunc()
|
|
}
|
|
} else {
|
|
// Graceful finish: signal parent ended, let children decide
|
|
ts.parentEnded.Store(true)
|
|
}
|
|
|
|
// Safely close the finishedChan exactly once
|
|
if fc != nil {
|
|
ts.closeOnce.Do(func() {
|
|
close(fc)
|
|
})
|
|
}
|
|
|
|
// We no longer close(ts.pendingResults) here to avoid panicking any
|
|
// concurrent deliverSubTurnResult calls. We rely on GC to clean up the channel.
|
|
}
|
|
|
|
// ====================== Ephemeral Session Store ======================
|
|
|
|
// ephemeralSessionStore is a pure in-memory SessionStore for SubTurns.
|
|
// It never writes to disk, keeping sub-turn history isolated from the parent session.
|
|
// It automatically truncates history when it exceeds maxEphemeralHistorySize to prevent memory accumulation.
|
|
type ephemeralSessionStore struct {
|
|
mu sync.Mutex
|
|
history []providers.Message
|
|
summary string
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) AddMessage(sessionKey, role, content string) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.history = append(e.history, providers.Message{Role: role, Content: content})
|
|
e.autoTruncate()
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) AddFullMessage(sessionKey string, msg providers.Message) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.history = append(e.history, msg)
|
|
e.autoTruncate()
|
|
}
|
|
|
|
// autoTruncate automatically limits history size to prevent memory accumulation.
|
|
// Must be called with mu held.
|
|
func (e *ephemeralSessionStore) autoTruncate() {
|
|
if len(e.history) > maxEphemeralHistorySize {
|
|
// Keep only the most recent messages
|
|
e.history = e.history[len(e.history)-maxEphemeralHistorySize:]
|
|
}
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) GetHistory(key string) []providers.Message {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
out := make([]providers.Message, len(e.history))
|
|
copy(out, e.history)
|
|
return out
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) GetSummary(key string) string {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
return e.summary
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) SetSummary(key, summary string) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.summary = summary
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) SetHistory(key string, history []providers.Message) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.history = make([]providers.Message, len(history))
|
|
copy(e.history, history)
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) TruncateHistory(key string, keepLast int) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
if len(e.history) > keepLast {
|
|
e.history = e.history[len(e.history)-keepLast:]
|
|
}
|
|
}
|
|
|
|
func (e *ephemeralSessionStore) Save(key string) error { return nil }
|
|
func (e *ephemeralSessionStore) Close() error { return nil }
|
|
|
|
// newEphemeralSession creates a new isolated ephemeral session for a sub-turn.
|
|
//
|
|
// IMPORTANT: The parent session parameter is intentionally unused (marked with _).
|
|
// This is by design according to issue #1316: sub-turns use completely isolated
|
|
// ephemeral sessions that do NOT inherit history from the parent session.
|
|
//
|
|
// Rationale for isolation:
|
|
// - Sub-turns are independent execution contexts with their own prompts
|
|
// - Inheriting parent history could cause context pollution
|
|
// - Each sub-turn should start with a clean slate
|
|
// - Memory is managed independently (auto-truncation at maxEphemeralHistorySize)
|
|
// - Results are communicated back via the result channel, not via shared history
|
|
//
|
|
// If future requirements need parent history inheritance, this design decision
|
|
// should be reconsidered with careful attention to memory management and context size.
|
|
func newEphemeralSession(_ session.SessionStore) session.SessionStore {
|
|
return &ephemeralSessionStore{}
|
|
}
|