mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
aed7296c0d
- Defer MCP server initialization to Run() using agent's context - Add mcpConfig and mcpInitOnce fields to AgentLoop - Use sync.Once to ensure MCP loads exactly once with proper context - Prevents orphaned subprocesses and resource leaks on cancellation This fixes GitHub Copilot feedback that MCP connections with context.Background() won't terminate when the agent stops, causing potential resource leaks and orphaned stdio/SSE connections.
1112 lines
36 KiB
Go
1112 lines
36 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/channels"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/mcp"
|
|
"github.com/sipeed/picoclaw/pkg/providers"
|
|
"github.com/sipeed/picoclaw/pkg/session"
|
|
"github.com/sipeed/picoclaw/pkg/state"
|
|
"github.com/sipeed/picoclaw/pkg/tools"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
type AgentLoop struct {
|
|
bus *bus.MessageBus
|
|
provider providers.LLMProvider
|
|
workspace string
|
|
model string
|
|
contextWindow int // Maximum context window size in tokens
|
|
maxIterations int
|
|
sessions *session.SessionManager
|
|
state *state.Manager
|
|
contextBuilder *ContextBuilder
|
|
tools *tools.ToolRegistry
|
|
mcpManager *mcp.Manager // MCP server manager for resource cleanup
|
|
mcpConfig *config.Config // Config for lazy MCP initialization
|
|
mcpInitOnce sync.Once // Ensures MCP is initialized only once
|
|
running atomic.Bool
|
|
summarizing sync.Map // Tracks which sessions are currently being summarized
|
|
channelManager *channels.Manager
|
|
}
|
|
|
|
// processOptions describes how a single message should be run through the
// agent loop.
type processOptions struct {
	// SessionKey identifies the session used for history and context.
	SessionKey string
	// Channel is the target channel for tool execution.
	Channel string
	// ChatID is the target chat ID for tool execution.
	ChatID string
	// UserMessage is the user message content (may include a prefix).
	UserMessage string
	// DefaultResponse is returned when the LLM produces empty content.
	DefaultResponse string
	// EnableSummary controls whether summarization may be triggered.
	EnableSummary bool
	// SendResponse controls whether the response is published on the bus.
	SendResponse bool
	// NoHistory, when true, skips loading session history (used for heartbeat).
	NoHistory bool
}
|
|
|
|
// createToolRegistry creates a tool registry with common tools.
|
|
// This is shared between main agent and subagents.
|
|
func createToolRegistry(workspace string, restrict bool, cfg *config.Config, msgBus *bus.MessageBus, mcpManager *mcp.Manager) *tools.ToolRegistry {
|
|
registry := tools.NewToolRegistry()
|
|
|
|
// File system tools
|
|
registry.Register(tools.NewReadFileTool(workspace, restrict))
|
|
registry.Register(tools.NewWriteFileTool(workspace, restrict))
|
|
registry.Register(tools.NewListDirTool(workspace, restrict))
|
|
registry.Register(tools.NewEditFileTool(workspace, restrict))
|
|
registry.Register(tools.NewAppendFileTool(workspace, restrict))
|
|
|
|
// Shell execution
|
|
registry.Register(tools.NewExecTool(workspace, restrict))
|
|
|
|
if searchTool := tools.NewWebSearchTool(tools.WebSearchToolOptions{
|
|
BraveAPIKey: cfg.Tools.Web.Brave.APIKey,
|
|
BraveMaxResults: cfg.Tools.Web.Brave.MaxResults,
|
|
BraveEnabled: cfg.Tools.Web.Brave.Enabled,
|
|
DuckDuckGoMaxResults: cfg.Tools.Web.DuckDuckGo.MaxResults,
|
|
DuckDuckGoEnabled: cfg.Tools.Web.DuckDuckGo.Enabled,
|
|
}); searchTool != nil {
|
|
registry.Register(searchTool)
|
|
}
|
|
registry.Register(tools.NewWebFetchTool(50000))
|
|
|
|
// Hardware tools (I2C, SPI) - Linux only, returns error on other platforms
|
|
registry.Register(tools.NewI2CTool())
|
|
registry.Register(tools.NewSPITool())
|
|
|
|
// Message tool - available to both agent and subagent
|
|
// Subagent uses it to communicate directly with user
|
|
messageTool := tools.NewMessageTool()
|
|
messageTool.SetSendCallback(func(channel, chatID, content string) error {
|
|
msgBus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: content,
|
|
})
|
|
return nil
|
|
})
|
|
registry.Register(messageTool)
|
|
|
|
// Register MCP tools from all connected servers
|
|
if mcpManager != nil {
|
|
servers := mcpManager.GetServers()
|
|
for serverName, conn := range servers {
|
|
for _, tool := range conn.Tools {
|
|
mcpTool := tools.NewMCPTool(mcpManager, serverName, tool)
|
|
registry.Register(mcpTool)
|
|
logger.DebugCF("agent", "Registered MCP tool",
|
|
map[string]interface{}{
|
|
"server": serverName,
|
|
"tool": tool.Name,
|
|
"name": mcpTool.Name(),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return registry
|
|
}
|
|
|
|
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
|
|
workspace := cfg.WorkspacePath()
|
|
os.MkdirAll(workspace, 0755)
|
|
|
|
restrict := cfg.Agents.Defaults.RestrictToWorkspace
|
|
|
|
// Create MCP Manager (actual server loading deferred to Run())
|
|
// This ensures MCP connections use the agent's lifecycle context
|
|
mcpManager := mcp.NewManager()
|
|
|
|
// Create tool registry for main agent
|
|
toolsRegistry := createToolRegistry(workspace, restrict, cfg, msgBus, mcpManager)
|
|
|
|
// Create subagent manager with its own tool registry
|
|
subagentManager := tools.NewSubagentManager(provider, cfg.Agents.Defaults.Model, workspace, msgBus)
|
|
subagentTools := createToolRegistry(workspace, restrict, cfg, msgBus, mcpManager)
|
|
// Subagent doesn't need spawn/subagent tools to avoid recursion
|
|
subagentManager.SetTools(subagentTools)
|
|
|
|
// Register spawn tool (for main agent)
|
|
spawnTool := tools.NewSpawnTool(subagentManager)
|
|
toolsRegistry.Register(spawnTool)
|
|
|
|
// Register subagent tool (synchronous execution)
|
|
subagentTool := tools.NewSubagentTool(subagentManager)
|
|
toolsRegistry.Register(subagentTool)
|
|
|
|
sessionsManager := session.NewSessionManager(filepath.Join(workspace, "sessions"))
|
|
|
|
// Create state manager for atomic state persistence
|
|
stateManager := state.NewManager(workspace)
|
|
|
|
// Create context builder and set tools registry
|
|
contextBuilder := NewContextBuilder(workspace)
|
|
contextBuilder.SetToolsRegistry(toolsRegistry)
|
|
|
|
return &AgentLoop{
|
|
bus: msgBus,
|
|
provider: provider,
|
|
workspace: workspace,
|
|
model: cfg.Agents.Defaults.Model,
|
|
contextWindow: cfg.Agents.Defaults.MaxTokens, // Restore context window for summarization
|
|
maxIterations: cfg.Agents.Defaults.MaxToolIterations,
|
|
sessions: sessionsManager,
|
|
state: stateManager,
|
|
contextBuilder: contextBuilder,
|
|
tools: toolsRegistry,
|
|
mcpManager: mcpManager,
|
|
mcpConfig: cfg, // Store config for lazy initialization in Run()
|
|
summarizing: sync.Map{},
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) Run(ctx context.Context) error {
|
|
al.running.Store(true)
|
|
|
|
// Initialize MCP servers using the agent's lifecycle context
|
|
// This ensures MCP connections are cancelled when the agent stops
|
|
al.mcpInitOnce.Do(func() {
|
|
if err := al.mcpManager.LoadFromConfig(ctx, al.mcpConfig); err != nil {
|
|
logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available",
|
|
map[string]interface{}{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
})
|
|
|
|
for al.running.Load() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
msg, ok := al.bus.ConsumeInbound(ctx)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
response, err := al.processMessage(ctx, msg)
|
|
if err != nil {
|
|
response = fmt.Sprintf("Error processing message: %v", err)
|
|
}
|
|
|
|
if response != "" {
|
|
// Check if the message tool already sent a response during this round.
|
|
// If so, skip publishing to avoid duplicate messages to the user.
|
|
alreadySent := false
|
|
if tool, ok := al.tools.Get("message"); ok {
|
|
if mt, ok := tool.(*tools.MessageTool); ok {
|
|
alreadySent = mt.HasSentInRound()
|
|
}
|
|
}
|
|
|
|
if !alreadySent {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: msg.Channel,
|
|
ChatID: msg.ChatID,
|
|
Content: response,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (al *AgentLoop) Stop() {
|
|
al.running.Store(false)
|
|
|
|
// Clean up MCP connections
|
|
if al.mcpManager != nil {
|
|
if err := al.mcpManager.Close(); err != nil {
|
|
logger.ErrorCF("agent", "Failed to close MCP manager",
|
|
map[string]interface{}{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
|
|
al.tools.Register(tool)
|
|
}
|
|
|
|
func (al *AgentLoop) SetChannelManager(cm *channels.Manager) {
|
|
al.channelManager = cm
|
|
}
|
|
|
|
// RecordLastChannel records the last active channel for this workspace.
|
|
// This uses the atomic state save mechanism to prevent data loss on crash.
|
|
func (al *AgentLoop) RecordLastChannel(channel string) error {
|
|
return al.state.SetLastChannel(channel)
|
|
}
|
|
|
|
// RecordLastChatID records the last active chat ID for this workspace.
|
|
// This uses the atomic state save mechanism to prevent data loss on crash.
|
|
func (al *AgentLoop) RecordLastChatID(chatID string) error {
|
|
return al.state.SetLastChatID(chatID)
|
|
}
|
|
|
|
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
|
|
return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct")
|
|
}
|
|
|
|
func (al *AgentLoop) ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) {
|
|
msg := bus.InboundMessage{
|
|
Channel: channel,
|
|
SenderID: "cron",
|
|
ChatID: chatID,
|
|
Content: content,
|
|
SessionKey: sessionKey,
|
|
}
|
|
|
|
return al.processMessage(ctx, msg)
|
|
}
|
|
|
|
// ProcessHeartbeat processes a heartbeat request without session history.
|
|
// Each heartbeat is independent and doesn't accumulate context.
|
|
func (al *AgentLoop) ProcessHeartbeat(ctx context.Context, content, channel, chatID string) (string, error) {
|
|
return al.runAgentLoop(ctx, processOptions{
|
|
SessionKey: "heartbeat",
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
UserMessage: content,
|
|
DefaultResponse: "I've completed processing but have no response to give.",
|
|
EnableSummary: false,
|
|
SendResponse: false,
|
|
NoHistory: true, // Don't load session history for heartbeat
|
|
})
|
|
}
|
|
|
|
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
|
|
// Add message preview to log (show full content for error messages)
|
|
var logContent string
|
|
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
|
|
logContent = msg.Content // Full content for errors
|
|
} else {
|
|
logContent = utils.Truncate(msg.Content, 80)
|
|
}
|
|
logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, logContent),
|
|
map[string]interface{}{
|
|
"channel": msg.Channel,
|
|
"chat_id": msg.ChatID,
|
|
"sender_id": msg.SenderID,
|
|
"session_key": msg.SessionKey,
|
|
})
|
|
|
|
// Route system messages to processSystemMessage
|
|
if msg.Channel == "system" {
|
|
return al.processSystemMessage(ctx, msg)
|
|
}
|
|
|
|
// Check for commands
|
|
if response, handled := al.handleCommand(ctx, msg); handled {
|
|
return response, nil
|
|
}
|
|
|
|
// Process as user message
|
|
return al.runAgentLoop(ctx, processOptions{
|
|
SessionKey: msg.SessionKey,
|
|
Channel: msg.Channel,
|
|
ChatID: msg.ChatID,
|
|
UserMessage: msg.Content,
|
|
DefaultResponse: "I've completed processing but have no response to give.",
|
|
EnableSummary: true,
|
|
SendResponse: false,
|
|
})
|
|
}
|
|
|
|
func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
|
|
// Verify this is a system message
|
|
if msg.Channel != "system" {
|
|
return "", fmt.Errorf("processSystemMessage called with non-system message channel: %s", msg.Channel)
|
|
}
|
|
|
|
logger.InfoCF("agent", "Processing system message",
|
|
map[string]interface{}{
|
|
"sender_id": msg.SenderID,
|
|
"chat_id": msg.ChatID,
|
|
})
|
|
|
|
// Parse origin channel from chat_id (format: "channel:chat_id")
|
|
var originChannel string
|
|
if idx := strings.Index(msg.ChatID, ":"); idx > 0 {
|
|
originChannel = msg.ChatID[:idx]
|
|
} else {
|
|
// Fallback
|
|
originChannel = "cli"
|
|
}
|
|
|
|
// Extract subagent result from message content
|
|
// Format: "Task 'label' completed.\n\nResult:\n<actual content>"
|
|
content := msg.Content
|
|
if idx := strings.Index(content, "Result:\n"); idx >= 0 {
|
|
content = content[idx+8:] // Extract just the result part
|
|
}
|
|
|
|
// Skip internal channels - only log, don't send to user
|
|
if constants.IsInternalChannel(originChannel) {
|
|
logger.InfoCF("agent", "Subagent completed (internal channel)",
|
|
map[string]interface{}{
|
|
"sender_id": msg.SenderID,
|
|
"content_len": len(content),
|
|
"channel": originChannel,
|
|
})
|
|
return "", nil
|
|
}
|
|
|
|
// Agent acts as dispatcher only - subagent handles user interaction via message tool
|
|
// Don't forward result here, subagent should use message tool to communicate with user
|
|
logger.InfoCF("agent", "Subagent completed",
|
|
map[string]interface{}{
|
|
"sender_id": msg.SenderID,
|
|
"channel": originChannel,
|
|
"content_len": len(content),
|
|
})
|
|
|
|
// Agent only logs, does not respond to user
|
|
return "", nil
|
|
}
|
|
|
|
// runAgentLoop is the core message processing logic.
|
|
// It handles context building, LLM calls, tool execution, and response handling.
|
|
func (al *AgentLoop) runAgentLoop(ctx context.Context, opts processOptions) (string, error) {
|
|
// 0. Record last channel for heartbeat notifications (skip internal channels)
|
|
if opts.Channel != "" && opts.ChatID != "" {
|
|
// Don't record internal channels (cli, system, subagent)
|
|
if !constants.IsInternalChannel(opts.Channel) {
|
|
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
|
|
if err := al.RecordLastChannel(channelKey); err != nil {
|
|
logger.WarnCF("agent", "Failed to record last channel: %v", map[string]interface{}{"error": err.Error()})
|
|
}
|
|
}
|
|
}
|
|
|
|
// 1. Update tool contexts
|
|
al.updateToolContexts(opts.Channel, opts.ChatID)
|
|
|
|
// 2. Build messages (skip history for heartbeat)
|
|
var history []providers.Message
|
|
var summary string
|
|
if !opts.NoHistory {
|
|
history = al.sessions.GetHistory(opts.SessionKey)
|
|
summary = al.sessions.GetSummary(opts.SessionKey)
|
|
}
|
|
messages := al.contextBuilder.BuildMessages(
|
|
history,
|
|
summary,
|
|
opts.UserMessage,
|
|
nil,
|
|
opts.Channel,
|
|
opts.ChatID,
|
|
)
|
|
|
|
// 3. Save user message to session
|
|
al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
|
|
|
|
// 4. Run LLM iteration loop
|
|
finalContent, iteration, err := al.runLLMIteration(ctx, messages, opts)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// If last tool had ForUser content and we already sent it, we might not need to send final response
|
|
// This is controlled by the tool's Silent flag and ForUser content
|
|
|
|
// 5. Handle empty response
|
|
if finalContent == "" {
|
|
finalContent = opts.DefaultResponse
|
|
}
|
|
|
|
// 6. Save final assistant message to session
|
|
al.sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
|
|
al.sessions.Save(opts.SessionKey)
|
|
|
|
// 7. Optional: summarization
|
|
if opts.EnableSummary {
|
|
al.maybeSummarize(opts.SessionKey, opts.Channel, opts.ChatID)
|
|
}
|
|
|
|
// 8. Optional: send response via bus
|
|
if opts.SendResponse {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: finalContent,
|
|
})
|
|
}
|
|
|
|
// 9. Log response
|
|
responsePreview := utils.Truncate(finalContent, 120)
|
|
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
|
|
map[string]interface{}{
|
|
"session_key": opts.SessionKey,
|
|
"iterations": iteration,
|
|
"final_length": len(finalContent),
|
|
})
|
|
|
|
return finalContent, nil
|
|
}
|
|
|
|
// runLLMIteration executes the LLM call loop with tool handling.
|
|
// Returns the final content, iteration count, and any error.
|
|
func (al *AgentLoop) runLLMIteration(ctx context.Context, messages []providers.Message, opts processOptions) (string, int, error) {
|
|
iteration := 0
|
|
var finalContent string
|
|
|
|
for iteration < al.maxIterations {
|
|
iteration++
|
|
|
|
logger.DebugCF("agent", "LLM iteration",
|
|
map[string]interface{}{
|
|
"iteration": iteration,
|
|
"max": al.maxIterations,
|
|
})
|
|
|
|
// Build tool definitions
|
|
providerToolDefs := al.tools.ToProviderDefs()
|
|
|
|
// Log LLM request details
|
|
logger.DebugCF("agent", "LLM request",
|
|
map[string]interface{}{
|
|
"iteration": iteration,
|
|
"model": al.model,
|
|
"messages_count": len(messages),
|
|
"tools_count": len(providerToolDefs),
|
|
"max_tokens": 8192,
|
|
"temperature": 0.7,
|
|
"system_prompt_len": len(messages[0].Content),
|
|
})
|
|
|
|
// Log full messages (detailed)
|
|
logger.DebugCF("agent", "Full LLM request",
|
|
map[string]interface{}{
|
|
"iteration": iteration,
|
|
"messages_json": formatMessagesForLog(messages),
|
|
"tools_json": formatToolsForLog(providerToolDefs),
|
|
})
|
|
|
|
var response *providers.LLMResponse
|
|
var err error
|
|
|
|
// Retry loop for context/token errors
|
|
maxRetries := 2
|
|
for retry := 0; retry <= maxRetries; retry++ {
|
|
response, err = al.provider.Chat(ctx, messages, providerToolDefs, al.model, map[string]interface{}{
|
|
"max_tokens": 8192,
|
|
"temperature": 0.7,
|
|
})
|
|
|
|
if err == nil {
|
|
break // Success
|
|
}
|
|
|
|
errMsg := strings.ToLower(err.Error())
|
|
// Check for context window errors (provider specific, but usually contain "token" or "invalid")
|
|
isContextError := strings.Contains(errMsg, "token") ||
|
|
strings.Contains(errMsg, "context") ||
|
|
strings.Contains(errMsg, "invalidparameter") ||
|
|
strings.Contains(errMsg, "length")
|
|
|
|
if isContextError && retry < maxRetries {
|
|
logger.WarnCF("agent", "Context window error detected, attempting compression", map[string]interface{}{
|
|
"error": err.Error(),
|
|
"retry": retry,
|
|
})
|
|
|
|
// Notify user on first retry only
|
|
if retry == 0 && !constants.IsInternalChannel(opts.Channel) && opts.SendResponse {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: "⚠️ Context window exceeded. Compressing history and retrying...",
|
|
})
|
|
}
|
|
|
|
// Force compression
|
|
al.forceCompression(opts.SessionKey)
|
|
|
|
// Rebuild messages with compressed history
|
|
// Note: We need to reload history from session manager because forceCompression changed it
|
|
newHistory := al.sessions.GetHistory(opts.SessionKey)
|
|
newSummary := al.sessions.GetSummary(opts.SessionKey)
|
|
|
|
// Re-create messages for the next attempt
|
|
// We keep the current user message (opts.UserMessage) effectively
|
|
messages = al.contextBuilder.BuildMessages(
|
|
newHistory,
|
|
newSummary,
|
|
opts.UserMessage,
|
|
nil,
|
|
opts.Channel,
|
|
opts.ChatID,
|
|
)
|
|
|
|
// Important: If we are in the middle of a tool loop (iteration > 1),
|
|
// rebuilding messages from session history might duplicate the flow or miss context
|
|
// if intermediate steps weren't saved correctly.
|
|
// However, al.sessions.AddFullMessage is called after every tool execution,
|
|
// so GetHistory should reflect the current state including partial tool execution.
|
|
// But we need to ensure we don't duplicate the user message which is appended in BuildMessages.
|
|
// BuildMessages(history...) takes the stored history and appends the *current* user message.
|
|
// If iteration > 1, the "current user message" was already added to history in step 3 of runAgentLoop.
|
|
// So if we pass opts.UserMessage again, we might duplicate it?
|
|
// Actually, step 3 is: al.sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
|
|
// So GetHistory ALREADY contains the user message!
|
|
|
|
// CORRECTION:
|
|
// BuildMessages combines: [System] + [History] + [CurrentMessage]
|
|
// But Step 3 added CurrentMessage to History.
|
|
// So if we use GetHistory now, it has the user message.
|
|
// If we pass opts.UserMessage to BuildMessages, it adds it AGAIN.
|
|
|
|
// For retry in the middle of a loop, we should rely on what's in the session.
|
|
// BUT checking BuildMessages implementation:
|
|
// It appends history... then appends currentMessage.
|
|
|
|
// Logic fix for retry:
|
|
// If iteration == 1, opts.UserMessage corresponds to the user input.
|
|
// If iteration > 1, we are processing tool results. The "messages" passed to Chat
|
|
// already accumulated tool outputs.
|
|
// Rebuilding from session history is safest because it persists state.
|
|
// Start fresh with rebuilt history.
|
|
|
|
// Special case: standard BuildMessages appends "currentMessage".
|
|
// If we are strictly retrying the *LLM call*, we want the exact same state as before but compressed.
|
|
// However, the "messages" argument passed to runLLMIteration is constructed by the caller.
|
|
// If we rebuild from Session, we need to know if "currentMessage" should be appended or is already in history.
|
|
|
|
// In runAgentLoop:
|
|
// 3. sessions.AddMessage(userMsg)
|
|
// 4. runLLMIteration(..., UserMessage)
|
|
|
|
// So History contains the user message.
|
|
// BuildMessages typically appends the user message as a *new* pending message.
|
|
// Wait, standard BuildMessages usage in runAgentLoop:
|
|
// messages := BuildMessages(history (has old), UserMessage)
|
|
// THEN AddMessage(UserMessage).
|
|
// So "history" passed to BuildMessages does NOT contain the current UserMessage yet.
|
|
|
|
// But here, inside the loop, we have already saved it.
|
|
// So GetHistory() includes the current user message.
|
|
// If we call BuildMessages(GetHistory(), UserMessage), we get duplicates.
|
|
|
|
// Hack/Fix:
|
|
// If we are retrying, we rebuild from Session History ONLY.
|
|
// We pass empty string as "currentMessage" to BuildMessages
|
|
// because the "current message" is already saved in history (step 3).
|
|
|
|
messages = al.contextBuilder.BuildMessages(
|
|
newHistory,
|
|
newSummary,
|
|
"", // Empty because history already contains the relevant messages
|
|
nil,
|
|
opts.Channel,
|
|
opts.ChatID,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
// Real error or success, break loop
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
logger.ErrorCF("agent", "LLM call failed",
|
|
map[string]interface{}{
|
|
"iteration": iteration,
|
|
"error": err.Error(),
|
|
})
|
|
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
|
|
}
|
|
|
|
// Check if no tool calls - we're done
|
|
if len(response.ToolCalls) == 0 {
|
|
finalContent = response.Content
|
|
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
|
|
map[string]interface{}{
|
|
"iteration": iteration,
|
|
"content_chars": len(finalContent),
|
|
})
|
|
break
|
|
}
|
|
|
|
// Log tool calls
|
|
toolNames := make([]string, 0, len(response.ToolCalls))
|
|
for _, tc := range response.ToolCalls {
|
|
toolNames = append(toolNames, tc.Name)
|
|
}
|
|
logger.InfoCF("agent", "LLM requested tool calls",
|
|
map[string]interface{}{
|
|
"tools": toolNames,
|
|
"count": len(response.ToolCalls),
|
|
"iteration": iteration,
|
|
})
|
|
|
|
// Build assistant message with tool calls
|
|
assistantMsg := providers.Message{
|
|
Role: "assistant",
|
|
Content: response.Content,
|
|
}
|
|
for _, tc := range response.ToolCalls {
|
|
argumentsJSON, _ := json.Marshal(tc.Arguments)
|
|
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
|
|
ID: tc.ID,
|
|
Type: "function",
|
|
Function: &providers.FunctionCall{
|
|
Name: tc.Name,
|
|
Arguments: string(argumentsJSON),
|
|
},
|
|
})
|
|
}
|
|
messages = append(messages, assistantMsg)
|
|
|
|
// Save assistant message with tool calls to session
|
|
al.sessions.AddFullMessage(opts.SessionKey, assistantMsg)
|
|
|
|
// Execute tool calls
|
|
for _, tc := range response.ToolCalls {
|
|
// Log tool call with arguments preview
|
|
argsJSON, _ := json.Marshal(tc.Arguments)
|
|
argsPreview := utils.Truncate(string(argsJSON), 200)
|
|
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
|
map[string]interface{}{
|
|
"tool": tc.Name,
|
|
"iteration": iteration,
|
|
})
|
|
|
|
// Create async callback for tools that implement AsyncTool
|
|
// NOTE: Following openclaw's design, async tools do NOT send results directly to users.
|
|
// Instead, they notify the agent via PublishInbound, and the agent decides
|
|
// whether to forward the result to the user (in processSystemMessage).
|
|
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
|
|
// Log the async completion but don't send directly to user
|
|
// The agent will handle user notification via processSystemMessage
|
|
if !result.Silent && result.ForUser != "" {
|
|
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
|
|
map[string]interface{}{
|
|
"tool": tc.Name,
|
|
"content_len": len(result.ForUser),
|
|
})
|
|
}
|
|
}
|
|
|
|
toolResult := al.tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, opts.Channel, opts.ChatID, asyncCallback)
|
|
|
|
// Send ForUser content to user immediately if not Silent
|
|
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: toolResult.ForUser,
|
|
})
|
|
logger.DebugCF("agent", "Sent tool result to user",
|
|
map[string]interface{}{
|
|
"tool": tc.Name,
|
|
"content_len": len(toolResult.ForUser),
|
|
})
|
|
}
|
|
|
|
// Determine content for LLM based on tool result
|
|
contentForLLM := toolResult.ForLLM
|
|
if contentForLLM == "" && toolResult.Err != nil {
|
|
contentForLLM = toolResult.Err.Error()
|
|
}
|
|
|
|
toolResultMsg := providers.Message{
|
|
Role: "tool",
|
|
Content: contentForLLM,
|
|
ToolCallID: tc.ID,
|
|
}
|
|
messages = append(messages, toolResultMsg)
|
|
|
|
// Save tool result message to session
|
|
al.sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
|
|
}
|
|
}
|
|
|
|
return finalContent, iteration, nil
|
|
}
|
|
|
|
// updateToolContexts updates the context for tools that need channel/chatID info.
|
|
func (al *AgentLoop) updateToolContexts(channel, chatID string) {
|
|
// Use ContextualTool interface instead of type assertions
|
|
if tool, ok := al.tools.Get("message"); ok {
|
|
if mt, ok := tool.(tools.ContextualTool); ok {
|
|
mt.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
if tool, ok := al.tools.Get("spawn"); ok {
|
|
if st, ok := tool.(tools.ContextualTool); ok {
|
|
st.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
if tool, ok := al.tools.Get("subagent"); ok {
|
|
if st, ok := tool.(tools.ContextualTool); ok {
|
|
st.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// maybeSummarize triggers summarization if the session history exceeds thresholds.
|
|
func (al *AgentLoop) maybeSummarize(sessionKey, channel, chatID string) {
|
|
newHistory := al.sessions.GetHistory(sessionKey)
|
|
tokenEstimate := al.estimateTokens(newHistory)
|
|
threshold := al.contextWindow * 75 / 100
|
|
|
|
if len(newHistory) > 20 || tokenEstimate > threshold {
|
|
if _, loading := al.summarizing.LoadOrStore(sessionKey, true); !loading {
|
|
go func() {
|
|
defer al.summarizing.Delete(sessionKey)
|
|
// Notify user about optimization if not an internal channel
|
|
if !constants.IsInternalChannel(channel) {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: "⚠️ Memory threshold reached. Optimizing conversation history...",
|
|
})
|
|
}
|
|
al.summarizeSession(sessionKey)
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// forceCompression aggressively reduces context when the limit is hit.
|
|
// It drops the oldest 50% of messages (keeping system prompt and last user message).
|
|
func (al *AgentLoop) forceCompression(sessionKey string) {
|
|
history := al.sessions.GetHistory(sessionKey)
|
|
if len(history) <= 4 {
|
|
return
|
|
}
|
|
|
|
// Keep system prompt (usually [0]) and the very last message (user's trigger)
|
|
// We want to drop the oldest half of the *conversation*
|
|
// Assuming [0] is system, [1:] is conversation
|
|
conversation := history[1 : len(history)-1]
|
|
if len(conversation) == 0 {
|
|
return
|
|
}
|
|
|
|
// Helper to find the mid-point of the conversation
|
|
mid := len(conversation) / 2
|
|
|
|
// New history structure:
|
|
// 1. System Prompt
|
|
// 2. [Summary of dropped part] - synthesized
|
|
// 3. Second half of conversation
|
|
// 4. Last message
|
|
|
|
// Simplified approach for emergency: Drop first half of conversation
|
|
// and rely on existing summary if present, or create a placeholder.
|
|
|
|
droppedCount := mid
|
|
keptConversation := conversation[mid:]
|
|
|
|
newHistory := make([]providers.Message, 0)
|
|
newHistory = append(newHistory, history[0]) // System prompt
|
|
|
|
// Add a note about compression
|
|
compressionNote := fmt.Sprintf("[System: Emergency compression dropped %d oldest messages due to context limit]", droppedCount)
|
|
// If there was an existing summary, we might lose it if it was in the dropped part (which is just messages).
|
|
// The summary is stored separately in session.Summary, so it persists!
|
|
// We just need to ensure the user knows there's a gap.
|
|
|
|
// We only modify the messages list here
|
|
newHistory = append(newHistory, providers.Message{
|
|
Role: "system",
|
|
Content: compressionNote,
|
|
})
|
|
|
|
newHistory = append(newHistory, keptConversation...)
|
|
newHistory = append(newHistory, history[len(history)-1]) // Last message
|
|
|
|
// Update session
|
|
al.sessions.SetHistory(sessionKey, newHistory)
|
|
al.sessions.Save(sessionKey)
|
|
|
|
logger.WarnCF("agent", "Forced compression executed", map[string]interface{}{
|
|
"session_key": sessionKey,
|
|
"dropped_msgs": droppedCount,
|
|
"new_count": len(newHistory),
|
|
})
|
|
}
|
|
|
|
// GetStartupInfo returns information about loaded tools and skills for logging.
|
|
func (al *AgentLoop) GetStartupInfo() map[string]interface{} {
|
|
info := make(map[string]interface{})
|
|
|
|
// Tools info
|
|
tools := al.tools.List()
|
|
info["tools"] = map[string]interface{}{
|
|
"count": len(tools),
|
|
"names": tools,
|
|
}
|
|
|
|
// Skills info
|
|
info["skills"] = al.contextBuilder.GetSkillsInfo()
|
|
|
|
return info
|
|
}
|
|
|
|
// formatMessagesForLog formats messages for logging
|
|
func formatMessagesForLog(messages []providers.Message) string {
|
|
if len(messages) == 0 {
|
|
return "[]"
|
|
}
|
|
|
|
var result string
|
|
result += "[\n"
|
|
for i, msg := range messages {
|
|
result += fmt.Sprintf(" [%d] Role: %s\n", i, msg.Role)
|
|
if len(msg.ToolCalls) > 0 {
|
|
result += " ToolCalls:\n"
|
|
for _, tc := range msg.ToolCalls {
|
|
result += fmt.Sprintf(" - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name)
|
|
if tc.Function != nil {
|
|
result += fmt.Sprintf(" Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200))
|
|
}
|
|
}
|
|
}
|
|
if msg.Content != "" {
|
|
content := utils.Truncate(msg.Content, 200)
|
|
result += fmt.Sprintf(" Content: %s\n", content)
|
|
}
|
|
if msg.ToolCallID != "" {
|
|
result += fmt.Sprintf(" ToolCallID: %s\n", msg.ToolCallID)
|
|
}
|
|
result += "\n"
|
|
}
|
|
result += "]"
|
|
return result
|
|
}
|
|
|
|
// formatToolsForLog formats tool definitions for logging
|
|
func formatToolsForLog(tools []providers.ToolDefinition) string {
|
|
if len(tools) == 0 {
|
|
return "[]"
|
|
}
|
|
|
|
var result string
|
|
result += "[\n"
|
|
for i, tool := range tools {
|
|
result += fmt.Sprintf(" [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name)
|
|
result += fmt.Sprintf(" Description: %s\n", tool.Function.Description)
|
|
if len(tool.Function.Parameters) > 0 {
|
|
result += fmt.Sprintf(" Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200))
|
|
}
|
|
}
|
|
result += "]"
|
|
return result
|
|
}
|
|
|
|
// summarizeSession summarizes the conversation history for a session.
|
|
func (al *AgentLoop) summarizeSession(sessionKey string) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
|
defer cancel()
|
|
|
|
history := al.sessions.GetHistory(sessionKey)
|
|
summary := al.sessions.GetSummary(sessionKey)
|
|
|
|
// Keep last 4 messages for continuity
|
|
if len(history) <= 4 {
|
|
return
|
|
}
|
|
|
|
toSummarize := history[:len(history)-4]
|
|
|
|
// Oversized Message Guard
|
|
// Skip messages larger than 50% of context window to prevent summarizer overflow
|
|
maxMessageTokens := al.contextWindow / 2
|
|
validMessages := make([]providers.Message, 0)
|
|
omitted := false
|
|
|
|
for _, m := range toSummarize {
|
|
if m.Role != "user" && m.Role != "assistant" {
|
|
continue
|
|
}
|
|
// Estimate tokens for this message
|
|
msgTokens := len(m.Content) / 2 // Use safer estimate here too (2.5 -> 2 for integer division safety)
|
|
if msgTokens > maxMessageTokens {
|
|
omitted = true
|
|
continue
|
|
}
|
|
validMessages = append(validMessages, m)
|
|
}
|
|
|
|
if len(validMessages) == 0 {
|
|
return
|
|
}
|
|
|
|
// Multi-Part Summarization
|
|
// Split into two parts if history is significant
|
|
var finalSummary string
|
|
if len(validMessages) > 10 {
|
|
mid := len(validMessages) / 2
|
|
part1 := validMessages[:mid]
|
|
part2 := validMessages[mid:]
|
|
|
|
s1, _ := al.summarizeBatch(ctx, part1, "")
|
|
s2, _ := al.summarizeBatch(ctx, part2, "")
|
|
|
|
// Merge them
|
|
mergePrompt := fmt.Sprintf("Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s", s1, s2)
|
|
resp, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: mergePrompt}}, nil, al.model, map[string]interface{}{
|
|
"max_tokens": 1024,
|
|
"temperature": 0.3,
|
|
})
|
|
if err == nil {
|
|
finalSummary = resp.Content
|
|
} else {
|
|
finalSummary = s1 + " " + s2
|
|
}
|
|
} else {
|
|
finalSummary, _ = al.summarizeBatch(ctx, validMessages, summary)
|
|
}
|
|
|
|
if omitted && finalSummary != "" {
|
|
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
|
|
}
|
|
|
|
if finalSummary != "" {
|
|
al.sessions.SetSummary(sessionKey, finalSummary)
|
|
al.sessions.TruncateHistory(sessionKey, 4)
|
|
al.sessions.Save(sessionKey)
|
|
}
|
|
}
|
|
|
|
// summarizeBatch summarizes a batch of messages.
|
|
func (al *AgentLoop) summarizeBatch(ctx context.Context, batch []providers.Message, existingSummary string) (string, error) {
|
|
prompt := "Provide a concise summary of this conversation segment, preserving core context and key points.\n"
|
|
if existingSummary != "" {
|
|
prompt += "Existing context: " + existingSummary + "\n"
|
|
}
|
|
prompt += "\nCONVERSATION:\n"
|
|
for _, m := range batch {
|
|
prompt += fmt.Sprintf("%s: %s\n", m.Role, m.Content)
|
|
}
|
|
|
|
response, err := al.provider.Chat(ctx, []providers.Message{{Role: "user", Content: prompt}}, nil, al.model, map[string]interface{}{
|
|
"max_tokens": 1024,
|
|
"temperature": 0.3,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return response.Content, nil
|
|
}
|
|
|
|
// estimateTokens estimates the number of tokens in a message list.
|
|
// Uses a safe heuristic of 2.5 characters per token to account for CJK and other
|
|
// overheads better than the previous 3 chars/token.
|
|
func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
|
|
totalChars := 0
|
|
for _, m := range messages {
|
|
totalChars += utf8.RuneCountInString(m.Content)
|
|
}
|
|
// 2.5 chars per token = totalChars * 2 / 5
|
|
return totalChars * 2 / 5
|
|
}
|
|
|
|
func (al *AgentLoop) handleCommand(ctx context.Context, msg bus.InboundMessage) (string, bool) {
|
|
content := strings.TrimSpace(msg.Content)
|
|
if !strings.HasPrefix(content, "/") {
|
|
return "", false
|
|
}
|
|
|
|
parts := strings.Fields(content)
|
|
if len(parts) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
cmd := parts[0]
|
|
args := parts[1:]
|
|
|
|
switch cmd {
|
|
case "/show":
|
|
if len(args) < 1 {
|
|
return "Usage: /show [model|channel]", true
|
|
}
|
|
switch args[0] {
|
|
case "model":
|
|
return fmt.Sprintf("Current model: %s", al.model), true
|
|
case "channel":
|
|
return fmt.Sprintf("Current channel: %s", msg.Channel), true
|
|
default:
|
|
return fmt.Sprintf("Unknown show target: %s", args[0]), true
|
|
}
|
|
|
|
case "/list":
|
|
if len(args) < 1 {
|
|
return "Usage: /list [models|channels]", true
|
|
}
|
|
switch args[0] {
|
|
case "models":
|
|
// TODO: Fetch available models dynamically if possible
|
|
return "Available models: glm-4.7, claude-3-5-sonnet, gpt-4o (configured in config.json/env)", true
|
|
case "channels":
|
|
if al.channelManager == nil {
|
|
return "Channel manager not initialized", true
|
|
}
|
|
channels := al.channelManager.GetEnabledChannels()
|
|
if len(channels) == 0 {
|
|
return "No channels enabled", true
|
|
}
|
|
return fmt.Sprintf("Enabled channels: %s", strings.Join(channels, ", ")), true
|
|
default:
|
|
return fmt.Sprintf("Unknown list target: %s", args[0]), true
|
|
}
|
|
|
|
case "/switch":
|
|
if len(args) < 3 || args[1] != "to" {
|
|
return "Usage: /switch [model|channel] to <name>", true
|
|
}
|
|
target := args[0]
|
|
value := args[2]
|
|
|
|
switch target {
|
|
case "model":
|
|
oldModel := al.model
|
|
al.model = value
|
|
return fmt.Sprintf("Switched model from %s to %s", oldModel, value), true
|
|
case "channel":
|
|
// This changes the 'default' channel for some operations, or effectively redirects output?
|
|
// For now, let's just validate if the channel exists
|
|
if al.channelManager == nil {
|
|
return "Channel manager not initialized", true
|
|
}
|
|
if _, exists := al.channelManager.GetChannel(value); !exists && value != "cli" {
|
|
return fmt.Sprintf("Channel '%s' not found or not enabled", value), true
|
|
}
|
|
|
|
// If message came from CLI, maybe we want to redirect CLI output to this channel?
|
|
// That would require state persistence about "redirected channel"
|
|
// For now, just acknowledged.
|
|
return fmt.Sprintf("Switched target channel to %s (Note: this currently only validates existence)", value), true
|
|
default:
|
|
return fmt.Sprintf("Unknown switch target: %s", target), true
|
|
}
|
|
}
|
|
|
|
return "", false
|
|
}
|