mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
740cdcaeaf
* fix: remove redundant tools definitions from system prompt Tools are already provided to the LLM via JSON schema through ToProviderDefs(), so the text-based tools section in the system prompt is redundant. This removes the buildToolsSection() logic and the tools field from ContextBuilder, reducing system prompt length while maintaining the "ALWAYS use tools" rule reminder. Fixes #731 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: correct spelling 'initialized' (was 'initialised') --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1154 lines
34 KiB
Go
1154 lines
34 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/channels"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/providers"
|
|
"github.com/sipeed/picoclaw/pkg/routing"
|
|
"github.com/sipeed/picoclaw/pkg/skills"
|
|
"github.com/sipeed/picoclaw/pkg/state"
|
|
"github.com/sipeed/picoclaw/pkg/tools"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
type AgentLoop struct {
|
|
bus *bus.MessageBus
|
|
cfg *config.Config
|
|
registry *AgentRegistry
|
|
state *state.Manager
|
|
running atomic.Bool
|
|
summarizing sync.Map
|
|
fallback *providers.FallbackChain
|
|
channelManager *channels.Manager
|
|
}
|
|
|
|
// processOptions bundles the per-call settings consumed by runAgentLoop.
type processOptions struct {
	// SessionKey selects the session whose history and summary are used.
	SessionKey string
	// Channel is the target channel for tool execution and replies.
	Channel string
	// ChatID is the target chat for tool execution and replies.
	ChatID string
	// UserMessage is the user message content (may include a prefix).
	UserMessage string
	// DefaultResponse replaces an empty final LLM answer.
	DefaultResponse string
	// EnableSummary allows summarization to be triggered after the turn.
	EnableSummary bool
	// SendResponse publishes the final answer on the bus when true.
	SendResponse bool
	// NoHistory skips loading session history and summary (used by heartbeat).
	NoHistory bool
}
|
|
|
|
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
|
|
registry := NewAgentRegistry(cfg, provider)
|
|
|
|
// Register shared tools to all agents
|
|
registerSharedTools(cfg, msgBus, registry, provider)
|
|
|
|
// Set up shared fallback chain
|
|
cooldown := providers.NewCooldownTracker()
|
|
fallbackChain := providers.NewFallbackChain(cooldown)
|
|
|
|
// Create state manager using default agent's workspace for channel recording
|
|
defaultAgent := registry.GetDefaultAgent()
|
|
var stateManager *state.Manager
|
|
if defaultAgent != nil {
|
|
stateManager = state.NewManager(defaultAgent.Workspace)
|
|
}
|
|
|
|
return &AgentLoop{
|
|
bus: msgBus,
|
|
cfg: cfg,
|
|
registry: registry,
|
|
state: stateManager,
|
|
summarizing: sync.Map{},
|
|
fallback: fallbackChain,
|
|
}
|
|
}
|
|
|
|
// registerSharedTools registers tools that are shared across all agents (web, message, spawn).
|
|
func registerSharedTools(
|
|
cfg *config.Config,
|
|
msgBus *bus.MessageBus,
|
|
registry *AgentRegistry,
|
|
provider providers.LLMProvider,
|
|
) {
|
|
for _, agentID := range registry.ListAgentIDs() {
|
|
agent, ok := registry.GetAgent(agentID)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Web tools
|
|
if searchTool := tools.NewWebSearchTool(tools.WebSearchToolOptions{
|
|
BraveAPIKey: cfg.Tools.Web.Brave.APIKey,
|
|
BraveMaxResults: cfg.Tools.Web.Brave.MaxResults,
|
|
BraveEnabled: cfg.Tools.Web.Brave.Enabled,
|
|
TavilyAPIKey: cfg.Tools.Web.Tavily.APIKey,
|
|
TavilyBaseURL: cfg.Tools.Web.Tavily.BaseURL,
|
|
TavilyMaxResults: cfg.Tools.Web.Tavily.MaxResults,
|
|
TavilyEnabled: cfg.Tools.Web.Tavily.Enabled,
|
|
DuckDuckGoMaxResults: cfg.Tools.Web.DuckDuckGo.MaxResults,
|
|
DuckDuckGoEnabled: cfg.Tools.Web.DuckDuckGo.Enabled,
|
|
PerplexityAPIKey: cfg.Tools.Web.Perplexity.APIKey,
|
|
PerplexityMaxResults: cfg.Tools.Web.Perplexity.MaxResults,
|
|
PerplexityEnabled: cfg.Tools.Web.Perplexity.Enabled,
|
|
Proxy: cfg.Tools.Web.Proxy,
|
|
}); searchTool != nil {
|
|
agent.Tools.Register(searchTool)
|
|
}
|
|
agent.Tools.Register(tools.NewWebFetchToolWithProxy(50000, cfg.Tools.Web.Proxy))
|
|
|
|
// Hardware tools (I2C, SPI) - Linux only, returns error on other platforms
|
|
agent.Tools.Register(tools.NewI2CTool())
|
|
agent.Tools.Register(tools.NewSPITool())
|
|
|
|
// Message tool
|
|
messageTool := tools.NewMessageTool()
|
|
messageTool.SetSendCallback(func(channel, chatID, content string) error {
|
|
msgBus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: content,
|
|
})
|
|
return nil
|
|
})
|
|
agent.Tools.Register(messageTool)
|
|
|
|
// Skill discovery and installation tools
|
|
registryMgr := skills.NewRegistryManagerFromConfig(skills.RegistryConfig{
|
|
MaxConcurrentSearches: cfg.Tools.Skills.MaxConcurrentSearches,
|
|
ClawHub: skills.ClawHubConfig(cfg.Tools.Skills.Registries.ClawHub),
|
|
})
|
|
searchCache := skills.NewSearchCache(
|
|
cfg.Tools.Skills.SearchCache.MaxSize,
|
|
time.Duration(cfg.Tools.Skills.SearchCache.TTLSeconds)*time.Second,
|
|
)
|
|
agent.Tools.Register(tools.NewFindSkillsTool(registryMgr, searchCache))
|
|
agent.Tools.Register(tools.NewInstallSkillTool(registryMgr, agent.Workspace))
|
|
|
|
// Spawn tool with allowlist checker
|
|
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace, msgBus)
|
|
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)
|
|
spawnTool := tools.NewSpawnTool(subagentManager)
|
|
currentAgentID := agentID
|
|
spawnTool.SetAllowlistChecker(func(targetAgentID string) bool {
|
|
return registry.CanSpawnSubagent(currentAgentID, targetAgentID)
|
|
})
|
|
agent.Tools.Register(spawnTool)
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) Run(ctx context.Context) error {
|
|
al.running.Store(true)
|
|
|
|
for al.running.Load() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
msg, ok := al.bus.ConsumeInbound(ctx)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
response, err := al.processMessage(ctx, msg)
|
|
if err != nil {
|
|
response = fmt.Sprintf("Error processing message: %v", err)
|
|
}
|
|
|
|
if response != "" {
|
|
// Check if the message tool already sent a response during this round.
|
|
// If so, skip publishing to avoid duplicate messages to the user.
|
|
// Use default agent's tools to check (message tool is shared).
|
|
alreadySent := false
|
|
defaultAgent := al.registry.GetDefaultAgent()
|
|
if defaultAgent != nil {
|
|
if tool, ok := defaultAgent.Tools.Get("message"); ok {
|
|
if mt, ok := tool.(*tools.MessageTool); ok {
|
|
alreadySent = mt.HasSentInRound()
|
|
}
|
|
}
|
|
}
|
|
|
|
if !alreadySent {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: msg.Channel,
|
|
ChatID: msg.ChatID,
|
|
Content: response,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (al *AgentLoop) Stop() {
|
|
al.running.Store(false)
|
|
}
|
|
|
|
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
|
|
for _, agentID := range al.registry.ListAgentIDs() {
|
|
if agent, ok := al.registry.GetAgent(agentID); ok {
|
|
agent.Tools.Register(tool)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) SetChannelManager(cm *channels.Manager) {
|
|
al.channelManager = cm
|
|
}
|
|
|
|
// RecordLastChannel records the last active channel for this workspace.
|
|
// This uses the atomic state save mechanism to prevent data loss on crash.
|
|
func (al *AgentLoop) RecordLastChannel(channel string) error {
|
|
if al.state == nil {
|
|
return nil
|
|
}
|
|
return al.state.SetLastChannel(channel)
|
|
}
|
|
|
|
// RecordLastChatID records the last active chat ID for this workspace.
|
|
// This uses the atomic state save mechanism to prevent data loss on crash.
|
|
func (al *AgentLoop) RecordLastChatID(chatID string) error {
|
|
if al.state == nil {
|
|
return nil
|
|
}
|
|
return al.state.SetLastChatID(chatID)
|
|
}
|
|
|
|
func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey string) (string, error) {
|
|
return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct")
|
|
}
|
|
|
|
func (al *AgentLoop) ProcessDirectWithChannel(
|
|
ctx context.Context,
|
|
content, sessionKey, channel, chatID string,
|
|
) (string, error) {
|
|
msg := bus.InboundMessage{
|
|
Channel: channel,
|
|
SenderID: "cron",
|
|
ChatID: chatID,
|
|
Content: content,
|
|
SessionKey: sessionKey,
|
|
}
|
|
|
|
return al.processMessage(ctx, msg)
|
|
}
|
|
|
|
// ProcessHeartbeat processes a heartbeat request without session history.
|
|
// Each heartbeat is independent and doesn't accumulate context.
|
|
func (al *AgentLoop) ProcessHeartbeat(ctx context.Context, content, channel, chatID string) (string, error) {
|
|
agent := al.registry.GetDefaultAgent()
|
|
return al.runAgentLoop(ctx, agent, processOptions{
|
|
SessionKey: "heartbeat",
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
UserMessage: content,
|
|
DefaultResponse: "I've completed processing but have no response to give.",
|
|
EnableSummary: false,
|
|
SendResponse: false,
|
|
NoHistory: true, // Don't load session history for heartbeat
|
|
})
|
|
}
|
|
|
|
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
|
|
// Add message preview to log (show full content for error messages)
|
|
var logContent string
|
|
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
|
|
logContent = msg.Content // Full content for errors
|
|
} else {
|
|
logContent = utils.Truncate(msg.Content, 80)
|
|
}
|
|
logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, logContent),
|
|
map[string]any{
|
|
"channel": msg.Channel,
|
|
"chat_id": msg.ChatID,
|
|
"sender_id": msg.SenderID,
|
|
"session_key": msg.SessionKey,
|
|
})
|
|
|
|
// Route system messages to processSystemMessage
|
|
if msg.Channel == "system" {
|
|
return al.processSystemMessage(ctx, msg)
|
|
}
|
|
|
|
// Check for commands
|
|
if response, handled := al.handleCommand(ctx, msg); handled {
|
|
return response, nil
|
|
}
|
|
|
|
// Route to determine agent and session key
|
|
route := al.registry.ResolveRoute(routing.RouteInput{
|
|
Channel: msg.Channel,
|
|
AccountID: msg.Metadata["account_id"],
|
|
Peer: extractPeer(msg),
|
|
ParentPeer: extractParentPeer(msg),
|
|
GuildID: msg.Metadata["guild_id"],
|
|
TeamID: msg.Metadata["team_id"],
|
|
})
|
|
|
|
agent, ok := al.registry.GetAgent(route.AgentID)
|
|
if !ok {
|
|
agent = al.registry.GetDefaultAgent()
|
|
}
|
|
|
|
// Use routed session key, but honor pre-set agent-scoped keys (for ProcessDirect/cron)
|
|
sessionKey := route.SessionKey
|
|
if msg.SessionKey != "" && strings.HasPrefix(msg.SessionKey, "agent:") {
|
|
sessionKey = msg.SessionKey
|
|
}
|
|
|
|
logger.InfoCF("agent", "Routed message",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"session_key": sessionKey,
|
|
"matched_by": route.MatchedBy,
|
|
})
|
|
|
|
return al.runAgentLoop(ctx, agent, processOptions{
|
|
SessionKey: sessionKey,
|
|
Channel: msg.Channel,
|
|
ChatID: msg.ChatID,
|
|
UserMessage: msg.Content,
|
|
DefaultResponse: "I've completed processing but have no response to give.",
|
|
EnableSummary: true,
|
|
SendResponse: false,
|
|
})
|
|
}
|
|
|
|
func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
|
|
if msg.Channel != "system" {
|
|
return "", fmt.Errorf("processSystemMessage called with non-system message channel: %s", msg.Channel)
|
|
}
|
|
|
|
logger.InfoCF("agent", "Processing system message",
|
|
map[string]any{
|
|
"sender_id": msg.SenderID,
|
|
"chat_id": msg.ChatID,
|
|
})
|
|
|
|
// Parse origin channel from chat_id (format: "channel:chat_id")
|
|
var originChannel, originChatID string
|
|
if idx := strings.Index(msg.ChatID, ":"); idx > 0 {
|
|
originChannel = msg.ChatID[:idx]
|
|
originChatID = msg.ChatID[idx+1:]
|
|
} else {
|
|
originChannel = "cli"
|
|
originChatID = msg.ChatID
|
|
}
|
|
|
|
// Extract subagent result from message content
|
|
// Format: "Task 'label' completed.\n\nResult:\n<actual content>"
|
|
content := msg.Content
|
|
if idx := strings.Index(content, "Result:\n"); idx >= 0 {
|
|
content = content[idx+8:] // Extract just the result part
|
|
}
|
|
|
|
// Skip internal channels - only log, don't send to user
|
|
if constants.IsInternalChannel(originChannel) {
|
|
logger.InfoCF("agent", "Subagent completed (internal channel)",
|
|
map[string]any{
|
|
"sender_id": msg.SenderID,
|
|
"content_len": len(content),
|
|
"channel": originChannel,
|
|
})
|
|
return "", nil
|
|
}
|
|
|
|
// Use default agent for system messages
|
|
agent := al.registry.GetDefaultAgent()
|
|
|
|
// Use the origin session for context
|
|
sessionKey := routing.BuildAgentMainSessionKey(agent.ID)
|
|
|
|
return al.runAgentLoop(ctx, agent, processOptions{
|
|
SessionKey: sessionKey,
|
|
Channel: originChannel,
|
|
ChatID: originChatID,
|
|
UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content),
|
|
DefaultResponse: "Background task completed.",
|
|
EnableSummary: false,
|
|
SendResponse: true,
|
|
})
|
|
}
|
|
|
|
// runAgentLoop is the core message processing logic.
|
|
func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opts processOptions) (string, error) {
|
|
// 0. Record last channel for heartbeat notifications (skip internal channels)
|
|
if opts.Channel != "" && opts.ChatID != "" {
|
|
// Don't record internal channels (cli, system, subagent)
|
|
if !constants.IsInternalChannel(opts.Channel) {
|
|
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
|
|
if err := al.RecordLastChannel(channelKey); err != nil {
|
|
logger.WarnCF("agent", "Failed to record last channel", map[string]any{"error": err.Error()})
|
|
}
|
|
}
|
|
}
|
|
|
|
// 1. Update tool contexts
|
|
al.updateToolContexts(agent, opts.Channel, opts.ChatID)
|
|
|
|
// 2. Build messages (skip history for heartbeat)
|
|
var history []providers.Message
|
|
var summary string
|
|
if !opts.NoHistory {
|
|
history = agent.Sessions.GetHistory(opts.SessionKey)
|
|
summary = agent.Sessions.GetSummary(opts.SessionKey)
|
|
}
|
|
messages := agent.ContextBuilder.BuildMessages(
|
|
history,
|
|
summary,
|
|
opts.UserMessage,
|
|
nil,
|
|
opts.Channel,
|
|
opts.ChatID,
|
|
)
|
|
|
|
// 3. Save user message to session
|
|
agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
|
|
|
|
// 4. Run LLM iteration loop
|
|
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// If last tool had ForUser content and we already sent it, we might not need to send final response
|
|
// This is controlled by the tool's Silent flag and ForUser content
|
|
|
|
// 5. Handle empty response
|
|
if finalContent == "" {
|
|
finalContent = opts.DefaultResponse
|
|
}
|
|
|
|
// 6. Save final assistant message to session
|
|
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
|
|
agent.Sessions.Save(opts.SessionKey)
|
|
|
|
// 7. Optional: summarization
|
|
if opts.EnableSummary {
|
|
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
|
|
}
|
|
|
|
// 8. Optional: send response via bus
|
|
if opts.SendResponse {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: finalContent,
|
|
})
|
|
}
|
|
|
|
// 9. Log response
|
|
responsePreview := utils.Truncate(finalContent, 120)
|
|
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"session_key": opts.SessionKey,
|
|
"iterations": iteration,
|
|
"final_length": len(finalContent),
|
|
})
|
|
|
|
return finalContent, nil
|
|
}
|
|
|
|
// runLLMIteration executes the LLM call loop with tool handling.
|
|
func (al *AgentLoop) runLLMIteration(
|
|
ctx context.Context,
|
|
agent *AgentInstance,
|
|
messages []providers.Message,
|
|
opts processOptions,
|
|
) (string, int, error) {
|
|
iteration := 0
|
|
var finalContent string
|
|
|
|
for iteration < agent.MaxIterations {
|
|
iteration++
|
|
|
|
logger.DebugCF("agent", "LLM iteration",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"iteration": iteration,
|
|
"max": agent.MaxIterations,
|
|
})
|
|
|
|
// Build tool definitions
|
|
providerToolDefs := agent.Tools.ToProviderDefs()
|
|
|
|
// Log LLM request details
|
|
logger.DebugCF("agent", "LLM request",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"iteration": iteration,
|
|
"model": agent.Model,
|
|
"messages_count": len(messages),
|
|
"tools_count": len(providerToolDefs),
|
|
"max_tokens": agent.MaxTokens,
|
|
"temperature": agent.Temperature,
|
|
"system_prompt_len": len(messages[0].Content),
|
|
})
|
|
|
|
// Log full messages (detailed)
|
|
logger.DebugCF("agent", "Full LLM request",
|
|
map[string]any{
|
|
"iteration": iteration,
|
|
"messages_json": formatMessagesForLog(messages),
|
|
"tools_json": formatToolsForLog(providerToolDefs),
|
|
})
|
|
|
|
// Call LLM with fallback chain if candidates are configured.
|
|
var response *providers.LLMResponse
|
|
var err error
|
|
|
|
callLLM := func() (*providers.LLMResponse, error) {
|
|
if len(agent.Candidates) > 1 && al.fallback != nil {
|
|
fbResult, fbErr := al.fallback.Execute(ctx, agent.Candidates,
|
|
func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) {
|
|
return agent.Provider.Chat(ctx, messages, providerToolDefs, model, map[string]any{
|
|
"max_tokens": agent.MaxTokens,
|
|
"temperature": agent.Temperature,
|
|
"prompt_cache_key": agent.ID,
|
|
})
|
|
},
|
|
)
|
|
if fbErr != nil {
|
|
return nil, fbErr
|
|
}
|
|
if fbResult.Provider != "" && len(fbResult.Attempts) > 0 {
|
|
logger.InfoCF("agent", fmt.Sprintf("Fallback: succeeded with %s/%s after %d attempts",
|
|
fbResult.Provider, fbResult.Model, len(fbResult.Attempts)+1),
|
|
map[string]any{"agent_id": agent.ID, "iteration": iteration})
|
|
}
|
|
return fbResult.Response, nil
|
|
}
|
|
return agent.Provider.Chat(ctx, messages, providerToolDefs, agent.Model, map[string]any{
|
|
"max_tokens": agent.MaxTokens,
|
|
"temperature": agent.Temperature,
|
|
"prompt_cache_key": agent.ID,
|
|
})
|
|
}
|
|
|
|
// Retry loop for context/token errors
|
|
maxRetries := 2
|
|
for retry := 0; retry <= maxRetries; retry++ {
|
|
response, err = callLLM()
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
errMsg := strings.ToLower(err.Error())
|
|
isContextError := strings.Contains(errMsg, "token") ||
|
|
strings.Contains(errMsg, "context") ||
|
|
strings.Contains(errMsg, "invalidparameter") ||
|
|
strings.Contains(errMsg, "length")
|
|
|
|
if isContextError && retry < maxRetries {
|
|
logger.WarnCF("agent", "Context window error detected, attempting compression", map[string]any{
|
|
"error": err.Error(),
|
|
"retry": retry,
|
|
})
|
|
|
|
if retry == 0 && !constants.IsInternalChannel(opts.Channel) {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: "Context window exceeded. Compressing history and retrying...",
|
|
})
|
|
}
|
|
|
|
al.forceCompression(agent, opts.SessionKey)
|
|
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
|
|
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
|
|
messages = agent.ContextBuilder.BuildMessages(
|
|
newHistory, newSummary, "",
|
|
nil, opts.Channel, opts.ChatID,
|
|
)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
logger.ErrorCF("agent", "LLM call failed",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"iteration": iteration,
|
|
"error": err.Error(),
|
|
})
|
|
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
|
|
}
|
|
|
|
// Check if no tool calls - we're done
|
|
if len(response.ToolCalls) == 0 {
|
|
finalContent = response.Content
|
|
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"iteration": iteration,
|
|
"content_chars": len(finalContent),
|
|
})
|
|
break
|
|
}
|
|
|
|
normalizedToolCalls := make([]providers.ToolCall, 0, len(response.ToolCalls))
|
|
for _, tc := range response.ToolCalls {
|
|
normalizedToolCalls = append(normalizedToolCalls, providers.NormalizeToolCall(tc))
|
|
}
|
|
|
|
// Log tool calls
|
|
toolNames := make([]string, 0, len(normalizedToolCalls))
|
|
for _, tc := range normalizedToolCalls {
|
|
toolNames = append(toolNames, tc.Name)
|
|
}
|
|
logger.InfoCF("agent", "LLM requested tool calls",
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"tools": toolNames,
|
|
"count": len(normalizedToolCalls),
|
|
"iteration": iteration,
|
|
})
|
|
|
|
// Build assistant message with tool calls
|
|
assistantMsg := providers.Message{
|
|
Role: "assistant",
|
|
Content: response.Content,
|
|
ReasoningContent: response.ReasoningContent,
|
|
}
|
|
for _, tc := range normalizedToolCalls {
|
|
argumentsJSON, _ := json.Marshal(tc.Arguments)
|
|
// Copy ExtraContent to ensure thought_signature is persisted for Gemini 3
|
|
extraContent := tc.ExtraContent
|
|
thoughtSignature := ""
|
|
if tc.Function != nil {
|
|
thoughtSignature = tc.Function.ThoughtSignature
|
|
}
|
|
|
|
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
|
|
ID: tc.ID,
|
|
Type: "function",
|
|
Name: tc.Name,
|
|
Function: &providers.FunctionCall{
|
|
Name: tc.Name,
|
|
Arguments: string(argumentsJSON),
|
|
ThoughtSignature: thoughtSignature,
|
|
},
|
|
ExtraContent: extraContent,
|
|
ThoughtSignature: thoughtSignature,
|
|
})
|
|
}
|
|
messages = append(messages, assistantMsg)
|
|
|
|
// Save assistant message with tool calls to session
|
|
agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg)
|
|
|
|
// Execute tool calls
|
|
for _, tc := range normalizedToolCalls {
|
|
argsJSON, _ := json.Marshal(tc.Arguments)
|
|
argsPreview := utils.Truncate(string(argsJSON), 200)
|
|
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
|
map[string]any{
|
|
"agent_id": agent.ID,
|
|
"tool": tc.Name,
|
|
"iteration": iteration,
|
|
})
|
|
|
|
// Create async callback for tools that implement AsyncTool
|
|
// NOTE: Following openclaw's design, async tools do NOT send results directly to users.
|
|
// Instead, they notify the agent via PublishInbound, and the agent decides
|
|
// whether to forward the result to the user (in processSystemMessage).
|
|
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
|
|
// Log the async completion but don't send directly to user
|
|
// The agent will handle user notification via processSystemMessage
|
|
if !result.Silent && result.ForUser != "" {
|
|
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
|
|
map[string]any{
|
|
"tool": tc.Name,
|
|
"content_len": len(result.ForUser),
|
|
})
|
|
}
|
|
}
|
|
|
|
toolResult := agent.Tools.ExecuteWithContext(
|
|
ctx,
|
|
tc.Name,
|
|
tc.Arguments,
|
|
opts.Channel,
|
|
opts.ChatID,
|
|
asyncCallback,
|
|
)
|
|
|
|
// Send ForUser content to user immediately if not Silent
|
|
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: opts.Channel,
|
|
ChatID: opts.ChatID,
|
|
Content: toolResult.ForUser,
|
|
})
|
|
logger.DebugCF("agent", "Sent tool result to user",
|
|
map[string]any{
|
|
"tool": tc.Name,
|
|
"content_len": len(toolResult.ForUser),
|
|
})
|
|
}
|
|
|
|
// Determine content for LLM based on tool result
|
|
contentForLLM := toolResult.ForLLM
|
|
if contentForLLM == "" && toolResult.Err != nil {
|
|
contentForLLM = toolResult.Err.Error()
|
|
}
|
|
|
|
toolResultMsg := providers.Message{
|
|
Role: "tool",
|
|
Content: contentForLLM,
|
|
ToolCallID: tc.ID,
|
|
}
|
|
messages = append(messages, toolResultMsg)
|
|
|
|
// Save tool result message to session
|
|
agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
|
|
}
|
|
}
|
|
|
|
return finalContent, iteration, nil
|
|
}
|
|
|
|
// updateToolContexts updates the context for tools that need channel/chatID info.
|
|
func (al *AgentLoop) updateToolContexts(agent *AgentInstance, channel, chatID string) {
|
|
// Use ContextualTool interface instead of type assertions
|
|
if tool, ok := agent.Tools.Get("message"); ok {
|
|
if mt, ok := tool.(tools.ContextualTool); ok {
|
|
mt.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
if tool, ok := agent.Tools.Get("spawn"); ok {
|
|
if st, ok := tool.(tools.ContextualTool); ok {
|
|
st.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
if tool, ok := agent.Tools.Get("subagent"); ok {
|
|
if st, ok := tool.(tools.ContextualTool); ok {
|
|
st.SetContext(channel, chatID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// maybeSummarize triggers summarization if the session history exceeds thresholds.
|
|
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) {
|
|
newHistory := agent.Sessions.GetHistory(sessionKey)
|
|
tokenEstimate := al.estimateTokens(newHistory)
|
|
threshold := agent.ContextWindow * 75 / 100
|
|
|
|
if len(newHistory) > 20 || tokenEstimate > threshold {
|
|
summarizeKey := agent.ID + ":" + sessionKey
|
|
if _, loading := al.summarizing.LoadOrStore(summarizeKey, true); !loading {
|
|
go func() {
|
|
defer al.summarizing.Delete(summarizeKey)
|
|
if !constants.IsInternalChannel(channel) {
|
|
al.bus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: "Memory threshold reached. Optimizing conversation history...",
|
|
})
|
|
}
|
|
al.summarizeSession(agent, sessionKey)
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// forceCompression aggressively reduces context when the limit is hit.
|
|
// It drops the oldest 50% of messages (keeping system prompt and last user message).
|
|
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
|
|
history := agent.Sessions.GetHistory(sessionKey)
|
|
if len(history) <= 4 {
|
|
return
|
|
}
|
|
|
|
// Keep system prompt (usually [0]) and the very last message (user's trigger)
|
|
// We want to drop the oldest half of the *conversation*
|
|
// Assuming [0] is system, [1:] is conversation
|
|
conversation := history[1 : len(history)-1]
|
|
if len(conversation) == 0 {
|
|
return
|
|
}
|
|
|
|
// Helper to find the mid-point of the conversation
|
|
mid := len(conversation) / 2
|
|
|
|
// New history structure:
|
|
// 1. System Prompt (with compression note appended)
|
|
// 2. Second half of conversation
|
|
// 3. Last message
|
|
|
|
droppedCount := mid
|
|
keptConversation := conversation[mid:]
|
|
|
|
newHistory := make([]providers.Message, 0, 1+len(keptConversation)+1)
|
|
|
|
// Append compression note to the original system prompt instead of adding a new system message
|
|
// This avoids having two consecutive system messages which some APIs (like Zhipu) reject
|
|
compressionNote := fmt.Sprintf(
|
|
"\n\n[System Note: Emergency compression dropped %d oldest messages due to context limit]",
|
|
droppedCount,
|
|
)
|
|
enhancedSystemPrompt := history[0]
|
|
enhancedSystemPrompt.Content = enhancedSystemPrompt.Content + compressionNote
|
|
newHistory = append(newHistory, enhancedSystemPrompt)
|
|
|
|
newHistory = append(newHistory, keptConversation...)
|
|
newHistory = append(newHistory, history[len(history)-1]) // Last message
|
|
|
|
// Update session
|
|
agent.Sessions.SetHistory(sessionKey, newHistory)
|
|
agent.Sessions.Save(sessionKey)
|
|
|
|
logger.WarnCF("agent", "Forced compression executed", map[string]any{
|
|
"session_key": sessionKey,
|
|
"dropped_msgs": droppedCount,
|
|
"new_count": len(newHistory),
|
|
})
|
|
}
|
|
|
|
// GetStartupInfo returns information about loaded tools and skills for logging.
|
|
func (al *AgentLoop) GetStartupInfo() map[string]any {
|
|
info := make(map[string]any)
|
|
|
|
agent := al.registry.GetDefaultAgent()
|
|
if agent == nil {
|
|
return info
|
|
}
|
|
|
|
// Tools info
|
|
toolsList := agent.Tools.List()
|
|
info["tools"] = map[string]any{
|
|
"count": len(toolsList),
|
|
"names": toolsList,
|
|
}
|
|
|
|
// Skills info
|
|
info["skills"] = agent.ContextBuilder.GetSkillsInfo()
|
|
|
|
// Agents info
|
|
info["agents"] = map[string]any{
|
|
"count": len(al.registry.ListAgentIDs()),
|
|
"ids": al.registry.ListAgentIDs(),
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// formatMessagesForLog formats messages for logging
|
|
func formatMessagesForLog(messages []providers.Message) string {
|
|
if len(messages) == 0 {
|
|
return "[]"
|
|
}
|
|
|
|
var sb strings.Builder
|
|
sb.WriteString("[\n")
|
|
for i, msg := range messages {
|
|
fmt.Fprintf(&sb, " [%d] Role: %s\n", i, msg.Role)
|
|
if len(msg.ToolCalls) > 0 {
|
|
sb.WriteString(" ToolCalls:\n")
|
|
for _, tc := range msg.ToolCalls {
|
|
fmt.Fprintf(&sb, " - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name)
|
|
if tc.Function != nil {
|
|
fmt.Fprintf(&sb, " Arguments: %s\n", utils.Truncate(tc.Function.Arguments, 200))
|
|
}
|
|
}
|
|
}
|
|
if msg.Content != "" {
|
|
content := utils.Truncate(msg.Content, 200)
|
|
fmt.Fprintf(&sb, " Content: %s\n", content)
|
|
}
|
|
if msg.ToolCallID != "" {
|
|
fmt.Fprintf(&sb, " ToolCallID: %s\n", msg.ToolCallID)
|
|
}
|
|
sb.WriteString("\n")
|
|
}
|
|
sb.WriteString("]")
|
|
return sb.String()
|
|
}
|
|
|
|
// formatToolsForLog formats tool definitions for logging
|
|
func formatToolsForLog(toolDefs []providers.ToolDefinition) string {
|
|
if len(toolDefs) == 0 {
|
|
return "[]"
|
|
}
|
|
|
|
var sb strings.Builder
|
|
sb.WriteString("[\n")
|
|
for i, tool := range toolDefs {
|
|
fmt.Fprintf(&sb, " [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name)
|
|
fmt.Fprintf(&sb, " Description: %s\n", tool.Function.Description)
|
|
if len(tool.Function.Parameters) > 0 {
|
|
fmt.Fprintf(&sb, " Parameters: %s\n", utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200))
|
|
}
|
|
}
|
|
sb.WriteString("]")
|
|
return sb.String()
|
|
}
|
|
|
|
// summarizeSession summarizes the conversation history for a session.
|
|
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
|
defer cancel()
|
|
|
|
history := agent.Sessions.GetHistory(sessionKey)
|
|
summary := agent.Sessions.GetSummary(sessionKey)
|
|
|
|
// Keep last 4 messages for continuity
|
|
if len(history) <= 4 {
|
|
return
|
|
}
|
|
|
|
toSummarize := history[:len(history)-4]
|
|
|
|
// Oversized Message Guard
|
|
maxMessageTokens := agent.ContextWindow / 2
|
|
validMessages := make([]providers.Message, 0)
|
|
omitted := false
|
|
|
|
for _, m := range toSummarize {
|
|
if m.Role != "user" && m.Role != "assistant" {
|
|
continue
|
|
}
|
|
msgTokens := len(m.Content) / 2
|
|
if msgTokens > maxMessageTokens {
|
|
omitted = true
|
|
continue
|
|
}
|
|
validMessages = append(validMessages, m)
|
|
}
|
|
|
|
if len(validMessages) == 0 {
|
|
return
|
|
}
|
|
|
|
// Multi-Part Summarization
|
|
var finalSummary string
|
|
if len(validMessages) > 10 {
|
|
mid := len(validMessages) / 2
|
|
part1 := validMessages[:mid]
|
|
part2 := validMessages[mid:]
|
|
|
|
s1, _ := al.summarizeBatch(ctx, agent, part1, "")
|
|
s2, _ := al.summarizeBatch(ctx, agent, part2, "")
|
|
|
|
mergePrompt := fmt.Sprintf(
|
|
"Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s",
|
|
s1,
|
|
s2,
|
|
)
|
|
resp, err := agent.Provider.Chat(
|
|
ctx,
|
|
[]providers.Message{{Role: "user", Content: mergePrompt}},
|
|
nil,
|
|
agent.Model,
|
|
map[string]any{
|
|
"max_tokens": 1024,
|
|
"temperature": 0.3,
|
|
"prompt_cache_key": agent.ID,
|
|
},
|
|
)
|
|
if err == nil {
|
|
finalSummary = resp.Content
|
|
} else {
|
|
finalSummary = s1 + " " + s2
|
|
}
|
|
} else {
|
|
finalSummary, _ = al.summarizeBatch(ctx, agent, validMessages, summary)
|
|
}
|
|
|
|
if omitted && finalSummary != "" {
|
|
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
|
|
}
|
|
|
|
if finalSummary != "" {
|
|
agent.Sessions.SetSummary(sessionKey, finalSummary)
|
|
agent.Sessions.TruncateHistory(sessionKey, 4)
|
|
agent.Sessions.Save(sessionKey)
|
|
}
|
|
}
|
|
|
|
// summarizeBatch summarizes a batch of messages.
|
|
func (al *AgentLoop) summarizeBatch(
|
|
ctx context.Context,
|
|
agent *AgentInstance,
|
|
batch []providers.Message,
|
|
existingSummary string,
|
|
) (string, error) {
|
|
var sb strings.Builder
|
|
sb.WriteString("Provide a concise summary of this conversation segment, preserving core context and key points.\n")
|
|
if existingSummary != "" {
|
|
sb.WriteString("Existing context: ")
|
|
sb.WriteString(existingSummary)
|
|
sb.WriteString("\n")
|
|
}
|
|
sb.WriteString("\nCONVERSATION:\n")
|
|
for _, m := range batch {
|
|
fmt.Fprintf(&sb, "%s: %s\n", m.Role, m.Content)
|
|
}
|
|
prompt := sb.String()
|
|
|
|
response, err := agent.Provider.Chat(
|
|
ctx,
|
|
[]providers.Message{{Role: "user", Content: prompt}},
|
|
nil,
|
|
agent.Model,
|
|
map[string]any{
|
|
"max_tokens": 1024,
|
|
"temperature": 0.3,
|
|
"prompt_cache_key": agent.ID,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return response.Content, nil
|
|
}
|
|
|
|
// estimateTokens estimates the number of tokens in a message list.
|
|
// Uses a safe heuristic of 2.5 characters per token to account for CJK and other
|
|
// overheads better than the previous 3 chars/token.
|
|
func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
|
|
totalChars := 0
|
|
for _, m := range messages {
|
|
totalChars += utf8.RuneCountInString(m.Content)
|
|
}
|
|
// 2.5 chars per token = totalChars * 2 / 5
|
|
return totalChars * 2 / 5
|
|
}
|
|
|
|
func (al *AgentLoop) handleCommand(ctx context.Context, msg bus.InboundMessage) (string, bool) {
|
|
content := strings.TrimSpace(msg.Content)
|
|
if !strings.HasPrefix(content, "/") {
|
|
return "", false
|
|
}
|
|
|
|
parts := strings.Fields(content)
|
|
if len(parts) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
cmd := parts[0]
|
|
args := parts[1:]
|
|
|
|
switch cmd {
|
|
case "/show":
|
|
if len(args) < 1 {
|
|
return "Usage: /show [model|channel|agents]", true
|
|
}
|
|
switch args[0] {
|
|
case "model":
|
|
defaultAgent := al.registry.GetDefaultAgent()
|
|
if defaultAgent == nil {
|
|
return "No default agent configured", true
|
|
}
|
|
return fmt.Sprintf("Current model: %s", defaultAgent.Model), true
|
|
case "channel":
|
|
return fmt.Sprintf("Current channel: %s", msg.Channel), true
|
|
case "agents":
|
|
agentIDs := al.registry.ListAgentIDs()
|
|
return fmt.Sprintf("Registered agents: %s", strings.Join(agentIDs, ", ")), true
|
|
default:
|
|
return fmt.Sprintf("Unknown show target: %s", args[0]), true
|
|
}
|
|
|
|
case "/list":
|
|
if len(args) < 1 {
|
|
return "Usage: /list [models|channels|agents]", true
|
|
}
|
|
switch args[0] {
|
|
case "models":
|
|
return "Available models: configured in config.json per agent", true
|
|
case "channels":
|
|
if al.channelManager == nil {
|
|
return "Channel manager not initialized", true
|
|
}
|
|
channels := al.channelManager.GetEnabledChannels()
|
|
if len(channels) == 0 {
|
|
return "No channels enabled", true
|
|
}
|
|
return fmt.Sprintf("Enabled channels: %s", strings.Join(channels, ", ")), true
|
|
case "agents":
|
|
agentIDs := al.registry.ListAgentIDs()
|
|
return fmt.Sprintf("Registered agents: %s", strings.Join(agentIDs, ", ")), true
|
|
default:
|
|
return fmt.Sprintf("Unknown list target: %s", args[0]), true
|
|
}
|
|
|
|
case "/switch":
|
|
if len(args) < 3 || args[1] != "to" {
|
|
return "Usage: /switch [model|channel] to <name>", true
|
|
}
|
|
target := args[0]
|
|
value := args[2]
|
|
|
|
switch target {
|
|
case "model":
|
|
defaultAgent := al.registry.GetDefaultAgent()
|
|
if defaultAgent == nil {
|
|
return "No default agent configured", true
|
|
}
|
|
oldModel := defaultAgent.Model
|
|
defaultAgent.Model = value
|
|
return fmt.Sprintf("Switched model from %s to %s", oldModel, value), true
|
|
case "channel":
|
|
if al.channelManager == nil {
|
|
return "Channel manager not initialized", true
|
|
}
|
|
if _, exists := al.channelManager.GetChannel(value); !exists && value != "cli" {
|
|
return fmt.Sprintf("Channel '%s' not found or not enabled", value), true
|
|
}
|
|
return fmt.Sprintf("Switched target channel to %s", value), true
|
|
default:
|
|
return fmt.Sprintf("Unknown switch target: %s", target), true
|
|
}
|
|
}
|
|
|
|
return "", false
|
|
}
|
|
|
|
// extractPeer extracts the routing peer from inbound message metadata.
|
|
func extractPeer(msg bus.InboundMessage) *routing.RoutePeer {
|
|
peerKind := msg.Metadata["peer_kind"]
|
|
if peerKind == "" {
|
|
return nil
|
|
}
|
|
peerID := msg.Metadata["peer_id"]
|
|
if peerID == "" {
|
|
if peerKind == "direct" {
|
|
peerID = msg.SenderID
|
|
} else {
|
|
peerID = msg.ChatID
|
|
}
|
|
}
|
|
return &routing.RoutePeer{Kind: peerKind, ID: peerID}
|
|
}
|
|
|
|
// extractParentPeer extracts the parent peer (reply-to) from inbound message metadata.
|
|
func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer {
|
|
parentKind := msg.Metadata["parent_peer_kind"]
|
|
parentID := msg.Metadata["parent_peer_id"]
|
|
if parentKind == "" || parentID == "" {
|
|
return nil
|
|
}
|
|
return &routing.RoutePeer{Kind: parentKind, ID: parentID}
|
|
}
|