Files
picoclaw/pkg/agent/pipeline_llm.go
T
sky5454 329e68e017 refactor(agent): Agent Looper refactor phase2, restructure pipeline and rename loop files to agent (#2585)
* refactor(agent): introduce interfaces for MessageBus and ChannelManager

Phase 2 of loop.go refactor — dependency inversion using adapter pattern.

- Add interfaces.MessageBus and interfaces.ChannelManager interfaces
- Create adapters/messagebus.go wrapping *bus.MessageBus
- Create adapters/channelmanager.go wrapping *channels.Manager
- Update AgentLoop to use interfaces instead of concrete types
- Update registerSharedTools to accept interfaces.MessageBus

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor(agent): restructure pipeline and rename loop files

Pipeline refactoring:
- Split pipeline.go (1400 lines) into focused files:
  - pipeline_setup.go (~115 lines): SetupTurn method
  - pipeline_llm.go (~519 lines): CallLLM method
  - pipeline_execute.go (~693 lines): ExecuteTools method
  - pipeline_finalize.go (~78 lines): Finalize method
- Pipeline struct and NewPipeline remain in pipeline.go (~39 lines)

Agent file renaming:
- Rename loop_*.go to agent_*.go for consistent naming:
  - loop.go -> agent.go, loop_message.go -> agent_message.go, etc.
- Merge turn.go + turn_exec.go into turn_state.go
- Rename loop_turn.go -> turn_coord.go

Documentation:
- Update docs/pipeline-restructuring-plan.md
- Add docs/agent-rename-plan.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(agent): code format  fixed

* refactor(agent): code test file added/renamed

* docs(agent): update agent refactor docs

* fix(agent): fix agent hardAbortX

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-21 10:55:50 +08:00

526 lines
16 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
)
// CallLLM performs an LLM call with fallback support, hook invocation, and retry logic.
// It handles PreLLM setup, the actual LLM invocation with retry, and AfterLLM processing.
// Returns Control indicating what the coordinator should do next.
func (p *Pipeline) CallLLM(
ctx context.Context,
turnCtx context.Context,
ts *turnState,
exec *turnExecution,
iteration int,
) (Control, error) {
al := p.al
maxMediaSize := p.Cfg.Agents.Defaults.GetMaxMediaSize()
// PreLLM: resolve media refs (except on iteration 1 where user media is already resolved)
if iteration > 1 {
exec.messages = resolveMediaRefs(exec.messages, p.MediaStore, maxMediaSize)
}
// PreLLM: graceful terminal handling
exec.gracefulTerminal, _ = ts.gracefulInterruptRequested()
exec.providerToolDefs = ts.agent.Tools.ToProviderDefs()
// Native web search support
_, hasWebSearch := ts.agent.Tools.Get("web_search")
exec.useNativeSearch = al.cfg.Tools.Web.PreferNative && hasWebSearch &&
func() bool {
if ns, ok := ts.agent.Provider.(interface{ SupportsNativeSearch() bool }); ok {
return ns.SupportsNativeSearch()
}
return false
}()
if exec.useNativeSearch {
filtered := make([]providers.ToolDefinition, 0, len(exec.providerToolDefs))
for _, td := range exec.providerToolDefs {
if td.Function.Name != "web_search" {
filtered = append(filtered, td)
}
}
exec.providerToolDefs = filtered
}
exec.callMessages = exec.messages
if exec.gracefulTerminal {
exec.callMessages = append(append([]providers.Message(nil), exec.messages...), ts.interruptHintMessage())
exec.providerToolDefs = nil
ts.markGracefulTerminalUsed()
}
exec.llmOpts = map[string]any{
"max_tokens": ts.agent.MaxTokens,
"temperature": ts.agent.Temperature,
"prompt_cache_key": ts.agent.ID,
}
if exec.useNativeSearch {
exec.llmOpts["native_search"] = true
}
if ts.agent.ThinkingLevel != ThinkingOff {
if tc, ok := ts.agent.Provider.(providers.ThinkingCapable); ok && tc.SupportsThinking() {
exec.llmOpts["thinking_level"] = string(ts.agent.ThinkingLevel)
} else {
logger.WarnCF("agent", "thinking_level is set but current provider does not support it, ignoring",
map[string]any{"agent_id": ts.agent.ID, "thinking_level": string(ts.agent.ThinkingLevel)})
}
}
exec.llmModel = exec.activeModel
// BeforeLLM hook
if p.Hooks != nil {
llmReq, decision := p.Hooks.BeforeLLM(turnCtx, &LLMHookRequest{
Meta: ts.eventMeta("runTurn", "turn.llm.request"),
Context: cloneTurnContext(ts.turnCtx),
Model: exec.llmModel,
Messages: exec.callMessages,
Tools: exec.providerToolDefs,
Options: exec.llmOpts,
GracefulTerminal: exec.gracefulTerminal,
})
switch decision.normalizedAction() {
case HookActionContinue, HookActionModify:
if llmReq != nil {
exec.llmModel = llmReq.Model
exec.callMessages = llmReq.Messages
exec.providerToolDefs = llmReq.Tools
exec.llmOpts = llmReq.Options
}
case HookActionAbortTurn:
exec.abortedByHook = true
return ControlBreak, nil
case HookActionHardAbort:
_ = ts.requestHardAbort()
exec.abortedByHardAbort = true
return ControlBreak, nil
}
}
al.emitEvent(
EventKindLLMRequest,
ts.eventMeta("runTurn", "turn.llm.request"),
LLMRequestPayload{
Model: exec.llmModel,
MessagesCount: len(exec.callMessages),
ToolsCount: len(exec.providerToolDefs),
MaxTokens: ts.agent.MaxTokens,
Temperature: ts.agent.Temperature,
},
)
logger.DebugCF("agent", "LLM request",
map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"model": exec.llmModel,
"messages_count": len(exec.callMessages),
"tools_count": len(exec.providerToolDefs),
"max_tokens": ts.agent.MaxTokens,
"temperature": ts.agent.Temperature,
"system_prompt_len": len(exec.callMessages[0].Content),
})
logger.DebugCF("agent", "Full LLM request",
map[string]any{
"iteration": iteration,
"messages_json": formatMessagesForLog(exec.callMessages),
"tools_json": formatToolsForLog(exec.providerToolDefs),
})
// LLM call closure with fallback support
callLLM := func(messagesForCall []providers.Message, toolDefsForCall []providers.ToolDefinition) (*providers.LLMResponse, error) {
providerCtx, providerCancel := context.WithCancel(turnCtx)
ts.setProviderCancel(providerCancel)
defer func() {
providerCancel()
ts.clearProviderCancel(providerCancel)
}()
al.activeRequests.Add(1)
defer al.activeRequests.Done()
if len(exec.activeCandidates) > 1 && p.Fallback != nil {
fbResult, fbErr := p.Fallback.Execute(
providerCtx,
exec.activeCandidates,
func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) {
candidateProvider := exec.activeProvider
if cp, ok := ts.agent.CandidateProviders[providers.ModelKey(provider, model)]; ok {
candidateProvider = cp
}
return candidateProvider.Chat(ctx, messagesForCall, toolDefsForCall, model, exec.llmOpts)
},
)
if fbErr != nil {
return nil, fbErr
}
if fbResult.Provider != "" && len(fbResult.Attempts) > 0 {
logger.InfoCF(
"agent",
fmt.Sprintf("Fallback: succeeded with %s/%s after %d attempts",
fbResult.Provider, fbResult.Model, len(fbResult.Attempts)+1),
map[string]any{"agent_id": ts.agent.ID, "iteration": iteration},
)
}
return fbResult.Response, nil
}
return exec.activeProvider.Chat(providerCtx, messagesForCall, toolDefsForCall, exec.llmModel, exec.llmOpts)
}
// Retry loop
var err error
maxRetries := 2
for retry := 0; retry <= maxRetries; retry++ {
exec.response, err = callLLM(exec.callMessages, exec.providerToolDefs)
if err == nil {
break
}
if ts.hardAbortRequested() && errors.Is(err, context.Canceled) {
_ = ts.requestHardAbort()
exec.abortedByHardAbort = true
return ControlBreak, nil
}
// Retry without media if vision is unsupported
if hasMediaRefs(exec.callMessages) && isVisionUnsupportedError(err) && retry < maxRetries {
al.emitEvent(
EventKindLLMRetry,
ts.eventMeta("runTurn", "turn.llm.retry"),
LLMRetryPayload{
Attempt: retry + 1,
MaxRetries: maxRetries,
Reason: "vision_unsupported",
Error: err.Error(),
Backoff: 0,
},
)
logger.WarnCF("agent", "Vision unsupported, retrying without media", map[string]any{
"error": err.Error(),
"retry": retry,
})
exec.callMessages = stripMessageMedia(exec.callMessages)
if !ts.opts.NoHistory {
exec.history = stripMessageMedia(exec.history)
ts.agent.Sessions.SetHistory(ts.sessionKey, exec.history)
for i := range ts.persistedMessages {
ts.persistedMessages[i].Media = nil
}
ts.refreshRestorePointFromSession(ts.agent)
}
continue
}
errMsg := strings.ToLower(err.Error())
isTimeoutError := errors.Is(err, context.DeadlineExceeded) ||
strings.Contains(errMsg, "deadline exceeded") ||
strings.Contains(errMsg, "client.timeout") ||
strings.Contains(errMsg, "timed out") ||
strings.Contains(errMsg, "timeout exceeded")
isContextError := !isTimeoutError && (strings.Contains(errMsg, "context_length_exceeded") ||
strings.Contains(errMsg, "context window") ||
strings.Contains(errMsg, "context_window") ||
strings.Contains(errMsg, "maximum context length") ||
strings.Contains(errMsg, "token limit") ||
strings.Contains(errMsg, "too many tokens") ||
strings.Contains(errMsg, "max_tokens") ||
strings.Contains(errMsg, "invalidparameter") ||
strings.Contains(errMsg, "prompt is too long") ||
strings.Contains(errMsg, "request too large"))
if isTimeoutError && retry < maxRetries {
backoff := time.Duration(retry+1) * 5 * time.Second
al.emitEvent(
EventKindLLMRetry,
ts.eventMeta("runTurn", "turn.llm.retry"),
LLMRetryPayload{
Attempt: retry + 1,
MaxRetries: maxRetries,
Reason: "timeout",
Error: err.Error(),
Backoff: backoff,
},
)
logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{
"error": err.Error(),
"retry": retry,
"backoff": backoff.String(),
})
if sleepErr := sleepWithContext(turnCtx, backoff); sleepErr != nil {
if ts.hardAbortRequested() {
_ = ts.requestHardAbort()
return ControlBreak, nil
}
err = sleepErr
break
}
continue
}
if isContextError && retry < maxRetries && !ts.opts.NoHistory {
al.emitEvent(
EventKindLLMRetry,
ts.eventMeta("runTurn", "turn.llm.retry"),
LLMRetryPayload{
Attempt: retry + 1,
MaxRetries: maxRetries,
Reason: "context_limit",
Error: err.Error(),
},
)
logger.WarnCF(
"agent",
"Context window error detected, attempting compression",
map[string]any{
"error": err.Error(),
"retry": retry,
},
)
if retry == 0 && !constants.IsInternalChannel(ts.channel) {
al.bus.PublishOutbound(ctx, outboundMessageForTurn(
ts,
"Context window exceeded. Compressing history and retrying...",
))
}
if compactErr := p.ContextManager.Compact(ctx, &CompactRequest{
SessionKey: ts.sessionKey,
Reason: ContextCompressReasonRetry,
Budget: ts.agent.ContextWindow,
}); compactErr != nil {
logger.WarnCF("agent", "Context overflow compact failed", map[string]any{
"session_key": ts.sessionKey,
"error": compactErr.Error(),
})
}
ts.refreshRestorePointFromSession(ts.agent)
if asmResp, asmErr := p.ContextManager.Assemble(ctx, &AssembleRequest{
SessionKey: ts.sessionKey,
Budget: ts.agent.ContextWindow,
MaxTokens: ts.agent.MaxTokens,
}); asmErr == nil && asmResp != nil {
exec.history = asmResp.History
exec.summary = asmResp.Summary
}
exec.messages = ts.agent.ContextBuilder.BuildMessages(
exec.history, exec.summary, "",
nil, ts.channel, ts.chatID, ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName,
activeSkillNames(ts.agent, ts.opts)...,
)
exec.callMessages = exec.messages
if exec.gracefulTerminal {
msgs := append([]providers.Message(nil), exec.messages...)
exec.callMessages = append(msgs, ts.interruptHintMessage())
}
continue
}
break
}
if err != nil {
al.emitEvent(
EventKindError,
ts.eventMeta("runTurn", "turn.error"),
ErrorPayload{
Stage: "llm",
Message: err.Error(),
},
)
logger.ErrorCF("agent", "LLM call failed",
map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"model": exec.llmModel,
"error": err.Error(),
})
return ControlBreak, fmt.Errorf("LLM call failed after retries: %w", err)
}
// AfterLLM hook
if p.Hooks != nil {
llmResp, decision := p.Hooks.AfterLLM(turnCtx, &LLMHookResponse{
Meta: ts.eventMeta("runTurn", "turn.llm.response"),
Context: cloneTurnContext(ts.turnCtx),
Model: exec.llmModel,
Response: exec.response,
})
switch decision.normalizedAction() {
case HookActionContinue, HookActionModify:
if llmResp != nil && llmResp.Response != nil {
exec.response = llmResp.Response
}
case HookActionAbortTurn:
exec.abortedByHook = true
return ControlBreak, nil
case HookActionHardAbort:
_ = ts.requestHardAbort()
exec.abortedByHardAbort = true
return ControlBreak, nil
}
}
// Save finishReason to turnState for SubTurn truncation detection
if innerTS := turnStateFromContext(ctx); innerTS != nil {
innerTS.SetLastFinishReason(exec.response.FinishReason)
if exec.response.Usage != nil {
innerTS.SetLastUsage(exec.response.Usage)
}
}
reasoningContent := exec.response.Reasoning
if reasoningContent == "" {
reasoningContent = exec.response.ReasoningContent
}
if ts.channel == "pico" {
go al.publishPicoReasoning(turnCtx, reasoningContent, ts.chatID)
} else {
go al.handleReasoning(
turnCtx,
reasoningContent,
ts.channel,
al.targetReasoningChannelID(ts.channel),
)
}
al.emitEvent(
EventKindLLMResponse,
ts.eventMeta("runTurn", "turn.llm.response"),
LLMResponsePayload{
ContentLen: len(exec.response.Content),
ToolCalls: len(exec.response.ToolCalls),
HasReasoning: exec.response.Reasoning != "" || exec.response.ReasoningContent != "",
},
)
llmResponseFields := map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"content_chars": len(exec.response.Content),
"tool_calls": len(exec.response.ToolCalls),
"reasoning": exec.response.Reasoning,
"target_channel": al.targetReasoningChannelID(ts.channel),
"channel": ts.channel,
}
if exec.response.Usage != nil {
llmResponseFields["prompt_tokens"] = exec.response.Usage.PromptTokens
llmResponseFields["completion_tokens"] = exec.response.Usage.CompletionTokens
llmResponseFields["total_tokens"] = exec.response.Usage.TotalTokens
}
logger.DebugCF("agent", "LLM response", llmResponseFields)
if al.bus != nil && ts.channel == "pico" && len(exec.response.ToolCalls) > 0 && ts.opts.AllowInterimPicoPublish {
if strings.TrimSpace(exec.response.Content) != "" {
outCtx, outCancel := context.WithTimeout(turnCtx, 3*time.Second)
publishErr := al.bus.PublishOutbound(outCtx, bus.OutboundMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Content: exec.response.Content,
})
outCancel()
if publishErr != nil {
logger.WarnCF("agent", "Failed to publish pico interim tool-call content", map[string]any{
"error": publishErr.Error(),
"channel": ts.channel,
"chat_id": ts.chatID,
"iteration": iteration,
})
}
}
}
// No-tool-call path: steering check and direct response
if len(exec.response.ToolCalls) == 0 || exec.gracefulTerminal {
responseContent := exec.response.Content
if responseContent == "" && exec.response.ReasoningContent != "" && ts.channel != "pico" {
responseContent = exec.response.ReasoningContent
}
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
logger.InfoCF("agent", "Steering arrived after direct LLM response; continuing turn",
map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"steering_count": len(steerMsgs),
})
exec.pendingMessages = append(exec.pendingMessages, steerMsgs...)
return ControlContinue, nil
}
exec.finalContent = responseContent
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"content_chars": len(exec.finalContent),
})
return ControlBreak, nil
}
// Tool-call path: normalize and prepare for tool execution
exec.normalizedToolCalls = make([]providers.ToolCall, 0, len(exec.response.ToolCalls))
for _, tc := range exec.response.ToolCalls {
exec.normalizedToolCalls = append(exec.normalizedToolCalls, providers.NormalizeToolCall(tc))
}
toolNames := make([]string, 0, len(exec.normalizedToolCalls))
for _, tc := range exec.normalizedToolCalls {
toolNames = append(toolNames, tc.Name)
}
logger.InfoCF("agent", "LLM requested tool calls",
map[string]any{
"agent_id": ts.agent.ID,
"tools": toolNames,
"count": len(exec.normalizedToolCalls),
"iteration": iteration,
})
exec.allResponsesHandled = len(exec.normalizedToolCalls) > 0
assistantMsg := providers.Message{
Role: "assistant",
Content: exec.response.Content,
ReasoningContent: exec.response.ReasoningContent,
}
for _, tc := range exec.normalizedToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments)
extraContent := tc.ExtraContent
thoughtSignature := ""
if tc.Function != nil {
thoughtSignature = tc.Function.ThoughtSignature
}
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
ID: tc.ID,
Type: "function",
Name: tc.Name,
Function: &providers.FunctionCall{
Name: tc.Name,
Arguments: string(argumentsJSON),
ThoughtSignature: thoughtSignature,
},
ExtraContent: extraContent,
ThoughtSignature: thoughtSignature,
})
}
exec.messages = append(exec.messages, assistantMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, assistantMsg)
ts.recordPersistedMessage(assistantMsg)
ts.ingestMessage(turnCtx, al, assistantMsg)
}
return ControlToolLoop, nil
}