Files
picoclaw/pkg/agent/loop.go
T
shikihane c368b5b359 feat(feishu,tools): add outbound media delivery via send_file tool (#1156)
* feat(feishu): implement SendMedia and add send_file tool

Add outbound media support for the Feishu channel so the agent can send
images and files to users via the MediaStore pipeline.

Feishu channel:
- SendMedia dispatches media parts as image or file uploads
- sendImage uploads via Image.Create then sends image message
- sendFile uploads via File.Create then sends file message
- feishuFileType maps extensions to Feishu file_type values

send_file tool:
- New tool lets the LLM send a local file to the current chat
- Validates path, registers file in MediaStore, returns media ref
- Agent loop wires tool registration, MediaStore propagation, and
  context updates

Tested on Radxa Cubie A7A (arm64) with Feishu websocket channel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(agent): publish outbound media regardless of SendResponse flag

The SendResponse flag controls whether the agent loop publishes the
final text response (callers that publish it themselves set this to
false). However, the media publish path was also gated behind this
flag, which meant tool-produced media was silently dropped for normal
channel messages.

Media should be published immediately when a tool returns media refs,
independent of how the text response is delivered.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(tools): use magic-bytes MIME detection and add file size limit to send_file

- Replace hardcoded extension-to-MIME map with h2non/filetype (magic
  bytes) + mime.TypeByExtension fallback, consistent with the vision
  pipeline in resolveMediaRefs
- Add configurable max file size check (defaults to config.DefaultMaxMediaSize,
  20 MB) to prevent oversized uploads
- Add tests for magic-bytes detection, extension fallback, size limit,
  and default max size

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor(agent): add ForEachTool to AgentRegistry for cross-agent tool lookup

Extract the pattern of iterating agents to find a named tool into
AgentRegistry.ForEachTool, simplifying SetMediaStore propagation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(agent,tools): adapt send_file to ctx-based channel injection after upstream refactor

Replace ContextualTool interface (removed upstream) with direct ctx
reading in SendFileTool.Execute, using ToolChannel/ToolChatID helpers.
Remove updateToolContexts which is no longer needed since ExecuteWithContext
already injects channel/chatID into ctx for all tools.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat(tools): support toggling send_file tool via config

Add SendFileConfig with Enabled field to ToolsConfig, defaulting to
true. Wrap send_file tool registration in loop.go with the config
check, consistent with the pattern used by other tools.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 19:42:52 +08:00

1673 lines
50 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"unicode/utf8"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/commands"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/mcp"
"github.com/sipeed/picoclaw/pkg/media"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/skills"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/utils"
"github.com/sipeed/picoclaw/pkg/voice"
)
type AgentLoop struct {
bus *bus.MessageBus
cfg *config.Config
registry *AgentRegistry
state *state.Manager
running atomic.Bool
summarizing sync.Map
fallback *providers.FallbackChain
channelManager *channels.Manager
mediaStore media.MediaStore
transcriber voice.Transcriber
cmdRegistry *commands.Registry
}
// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
UserMessage string // User message content (may include prefix)
Media []string // media:// refs from inbound message
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
}
const (
defaultResponse = "I've completed processing but have no response to give. Increase `max_tool_iterations` in config.json."
sessionKeyAgentPrefix = "agent:"
metadataKeyAccountID = "account_id"
metadataKeyGuildID = "guild_id"
metadataKeyTeamID = "team_id"
metadataKeyParentPeerKind = "parent_peer_kind"
metadataKeyParentPeerID = "parent_peer_id"
)
func NewAgentLoop(
cfg *config.Config,
msgBus *bus.MessageBus,
provider providers.LLMProvider,
) *AgentLoop {
registry := NewAgentRegistry(cfg, provider)
// Register shared tools to all agents
registerSharedTools(cfg, msgBus, registry, provider)
// Set up shared fallback chain
cooldown := providers.NewCooldownTracker()
fallbackChain := providers.NewFallbackChain(cooldown)
// Create state manager using default agent's workspace for channel recording
defaultAgent := registry.GetDefaultAgent()
var stateManager *state.Manager
if defaultAgent != nil {
stateManager = state.NewManager(defaultAgent.Workspace)
}
al := &AgentLoop{
bus: msgBus,
cfg: cfg,
registry: registry,
state: stateManager,
summarizing: sync.Map{},
fallback: fallbackChain,
cmdRegistry: commands.NewRegistry(commands.BuiltinDefinitions()),
}
return al
}
// registerSharedTools registers tools that are shared across all agents (web, message, spawn).
func registerSharedTools(
cfg *config.Config,
msgBus *bus.MessageBus,
registry *AgentRegistry,
provider providers.LLMProvider,
) {
for _, agentID := range registry.ListAgentIDs() {
agent, ok := registry.GetAgent(agentID)
if !ok {
continue
}
// Web tools
if cfg.Tools.IsToolEnabled("web") {
searchTool, err := tools.NewWebSearchTool(tools.WebSearchToolOptions{
BraveAPIKey: cfg.Tools.Web.Brave.APIKey,
BraveMaxResults: cfg.Tools.Web.Brave.MaxResults,
BraveEnabled: cfg.Tools.Web.Brave.Enabled,
TavilyAPIKey: cfg.Tools.Web.Tavily.APIKey,
TavilyBaseURL: cfg.Tools.Web.Tavily.BaseURL,
TavilyMaxResults: cfg.Tools.Web.Tavily.MaxResults,
TavilyEnabled: cfg.Tools.Web.Tavily.Enabled,
DuckDuckGoMaxResults: cfg.Tools.Web.DuckDuckGo.MaxResults,
DuckDuckGoEnabled: cfg.Tools.Web.DuckDuckGo.Enabled,
PerplexityAPIKey: cfg.Tools.Web.Perplexity.APIKey,
PerplexityMaxResults: cfg.Tools.Web.Perplexity.MaxResults,
PerplexityEnabled: cfg.Tools.Web.Perplexity.Enabled,
SearXNGBaseURL: cfg.Tools.Web.SearXNG.BaseURL,
SearXNGMaxResults: cfg.Tools.Web.SearXNG.MaxResults,
SearXNGEnabled: cfg.Tools.Web.SearXNG.Enabled,
GLMSearchAPIKey: cfg.Tools.Web.GLMSearch.APIKey,
GLMSearchBaseURL: cfg.Tools.Web.GLMSearch.BaseURL,
GLMSearchEngine: cfg.Tools.Web.GLMSearch.SearchEngine,
GLMSearchMaxResults: cfg.Tools.Web.GLMSearch.MaxResults,
GLMSearchEnabled: cfg.Tools.Web.GLMSearch.Enabled,
Proxy: cfg.Tools.Web.Proxy,
})
if err != nil {
logger.ErrorCF("agent", "Failed to create web search tool", map[string]any{"error": err.Error()})
} else if searchTool != nil {
agent.Tools.Register(searchTool)
}
}
if cfg.Tools.IsToolEnabled("web_fetch") {
fetchTool, err := tools.NewWebFetchToolWithProxy(50000, cfg.Tools.Web.Proxy, cfg.Tools.Web.FetchLimitBytes)
if err != nil {
logger.ErrorCF("agent", "Failed to create web fetch tool", map[string]any{"error": err.Error()})
} else {
agent.Tools.Register(fetchTool)
}
}
// Hardware tools (I2C, SPI) - Linux only, returns error on other platforms
if cfg.Tools.IsToolEnabled("i2c") {
agent.Tools.Register(tools.NewI2CTool())
}
if cfg.Tools.IsToolEnabled("spi") {
agent.Tools.Register(tools.NewSPITool())
}
// Message tool
if cfg.Tools.IsToolEnabled("message") {
messageTool := tools.NewMessageTool()
messageTool.SetSendCallback(func(channel, chatID, content string) error {
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
return msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: content,
})
})
agent.Tools.Register(messageTool)
}
// Send file tool (outbound media via MediaStore — store injected later by SetMediaStore)
if cfg.Tools.IsToolEnabled("send_file") {
sendFileTool := tools.NewSendFileTool(
agent.Workspace,
cfg.Agents.Defaults.RestrictToWorkspace,
cfg.Agents.Defaults.GetMaxMediaSize(),
nil,
)
agent.Tools.Register(sendFileTool)
}
// Skill discovery and installation tools
skills_enabled := cfg.Tools.IsToolEnabled("skills")
find_skills_enable := cfg.Tools.IsToolEnabled("find_skills")
install_skills_enable := cfg.Tools.IsToolEnabled("install_skill")
if skills_enabled && (find_skills_enable || install_skills_enable) {
registryMgr := skills.NewRegistryManagerFromConfig(skills.RegistryConfig{
MaxConcurrentSearches: cfg.Tools.Skills.MaxConcurrentSearches,
ClawHub: skills.ClawHubConfig(cfg.Tools.Skills.Registries.ClawHub),
})
if find_skills_enable {
searchCache := skills.NewSearchCache(
cfg.Tools.Skills.SearchCache.MaxSize,
time.Duration(cfg.Tools.Skills.SearchCache.TTLSeconds)*time.Second,
)
agent.Tools.Register(tools.NewFindSkillsTool(registryMgr, searchCache))
}
if install_skills_enable {
agent.Tools.Register(tools.NewInstallSkillTool(registryMgr, agent.Workspace))
}
}
// Spawn tool with allowlist checker
if cfg.Tools.IsToolEnabled("spawn") {
if cfg.Tools.IsToolEnabled("subagent") {
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace, msgBus)
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)
spawnTool := tools.NewSpawnTool(subagentManager)
currentAgentID := agentID
spawnTool.SetAllowlistChecker(func(targetAgentID string) bool {
return registry.CanSpawnSubagent(currentAgentID, targetAgentID)
})
agent.Tools.Register(spawnTool)
} else {
logger.WarnCF("agent", "spawn tool requires subagent to be enabled", nil)
}
}
}
}
func (al *AgentLoop) Run(ctx context.Context) error {
al.running.Store(true)
// Initialize MCP servers for all agents
if al.cfg.Tools.IsToolEnabled("mcp") {
mcpManager := mcp.NewManager()
// Ensure MCP connections are cleaned up on exit, regardless of initialization success
// This fixes resource leak when LoadFromMCPConfig partially succeeds then fails
defer func() {
if err := mcpManager.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close MCP manager",
map[string]any{
"error": err.Error(),
})
}
}()
defaultAgent := al.registry.GetDefaultAgent()
var workspacePath string
if defaultAgent != nil && defaultAgent.Workspace != "" {
workspacePath = defaultAgent.Workspace
} else {
workspacePath = al.cfg.WorkspacePath()
}
if err := mcpManager.LoadFromMCPConfig(ctx, al.cfg.Tools.MCP, workspacePath); err != nil {
logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available",
map[string]any{
"error": err.Error(),
})
} else {
// Register MCP tools for all agents
servers := mcpManager.GetServers()
uniqueTools := 0
totalRegistrations := 0
agentIDs := al.registry.ListAgentIDs()
agentCount := len(agentIDs)
for serverName, conn := range servers {
uniqueTools += len(conn.Tools)
for _, tool := range conn.Tools {
for _, agentID := range agentIDs {
agent, ok := al.registry.GetAgent(agentID)
if !ok {
continue
}
mcpTool := tools.NewMCPTool(mcpManager, serverName, tool)
agent.Tools.Register(mcpTool)
totalRegistrations++
logger.DebugCF("agent", "Registered MCP tool",
map[string]any{
"agent_id": agentID,
"server": serverName,
"tool": tool.Name,
"name": mcpTool.Name(),
})
}
}
}
logger.InfoCF("agent", "MCP tools registered successfully",
map[string]any{
"server_count": len(servers),
"unique_tools": uniqueTools,
"total_registrations": totalRegistrations,
"agent_count": agentCount,
})
}
}
for al.running.Load() {
select {
case <-ctx.Done():
return nil
default:
msg, ok := al.bus.ConsumeInbound(ctx)
if !ok {
continue
}
// Process message
func() {
// TODO: Re-enable media cleanup after inbound media is properly consumed by the agent.
// Currently disabled because files are deleted before the LLM can access their content.
// defer func() {
// if al.mediaStore != nil && msg.MediaScope != "" {
// if releaseErr := al.mediaStore.ReleaseAll(msg.MediaScope); releaseErr != nil {
// logger.WarnCF("agent", "Failed to release media", map[string]any{
// "scope": msg.MediaScope,
// "error": releaseErr.Error(),
// })
// }
// }
// }()
response, err := al.processMessage(ctx, msg)
if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
}
if response != "" {
// Check if the message tool already sent a response during this round.
// If so, skip publishing to avoid duplicate messages to the user.
// Use default agent's tools to check (message tool is shared).
alreadySent := false
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent != nil {
if tool, ok := defaultAgent.Tools.Get("message"); ok {
if mt, ok := tool.(*tools.MessageTool); ok {
alreadySent = mt.HasSentInRound()
}
}
}
if !alreadySent {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
})
logger.InfoCF("agent", "Published outbound response",
map[string]any{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"content_len": len(response),
})
} else {
logger.DebugCF(
"agent",
"Skipped outbound (message tool already sent)",
map[string]any{"channel": msg.Channel},
)
}
}
}()
}
}
return nil
}
func (al *AgentLoop) Stop() {
al.running.Store(false)
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
for _, agentID := range al.registry.ListAgentIDs() {
if agent, ok := al.registry.GetAgent(agentID); ok {
agent.Tools.Register(tool)
}
}
}
func (al *AgentLoop) SetChannelManager(cm *channels.Manager) {
al.channelManager = cm
}
// SetMediaStore injects a MediaStore for media lifecycle management.
func (al *AgentLoop) SetMediaStore(s media.MediaStore) {
al.mediaStore = s
// Propagate store to send_file tools in all agents.
al.registry.ForEachTool("send_file", func(t tools.Tool) {
if sf, ok := t.(*tools.SendFileTool); ok {
sf.SetMediaStore(s)
}
})
}
// SetTranscriber injects a voice transcriber for agent-level audio transcription.
func (al *AgentLoop) SetTranscriber(t voice.Transcriber) {
al.transcriber = t
}
var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)
// transcribeAudioInMessage resolves audio media refs, transcribes them, and
// replaces audio annotations in msg.Content with the transcribed text.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) bus.InboundMessage {
if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 {
return msg
}
// Transcribe each audio media ref in order.
var transcriptions []string
for _, ref := range msg.Media {
path, meta, err := al.mediaStore.ResolveWithMeta(ref)
if err != nil {
logger.WarnCF("voice", "Failed to resolve media ref", map[string]any{"ref": ref, "error": err})
continue
}
if !utils.IsAudioFile(meta.Filename, meta.ContentType) {
continue
}
result, err := al.transcriber.Transcribe(ctx, path)
if err != nil {
logger.WarnCF("voice", "Transcription failed", map[string]any{"ref": ref, "error": err})
transcriptions = append(transcriptions, "")
continue
}
transcriptions = append(transcriptions, result.Text)
}
if len(transcriptions) == 0 {
return msg
}
// Replace audio annotations sequentially with transcriptions.
idx := 0
newContent := audioAnnotationRe.ReplaceAllStringFunc(msg.Content, func(match string) string {
if idx >= len(transcriptions) {
return match
}
text := transcriptions[idx]
idx++
return "[voice: " + text + "]"
})
// Append any remaining transcriptions not matched by an annotation.
for ; idx < len(transcriptions); idx++ {
newContent += "\n[voice: " + transcriptions[idx] + "]"
}
msg.Content = newContent
return msg
}
// inferMediaType determines the media type ("image", "audio", "video", "file")
// from a filename and MIME content type.
func inferMediaType(filename, contentType string) string {
ct := strings.ToLower(contentType)
fn := strings.ToLower(filename)
if strings.HasPrefix(ct, "image/") {
return "image"
}
if strings.HasPrefix(ct, "audio/") || ct == "application/ogg" {
return "audio"
}
if strings.HasPrefix(ct, "video/") {
return "video"
}
// Fallback: infer from extension
ext := filepath.Ext(fn)
switch ext {
case ".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg":
return "image"
case ".mp3", ".wav", ".ogg", ".m4a", ".flac", ".aac", ".wma", ".opus":
return "audio"
case ".mp4", ".avi", ".mov", ".webm", ".mkv":
return "video"
}
return "file"
}
// RecordLastChannel records the last active channel for this workspace.
// This uses the atomic state save mechanism to prevent data loss on crash.
func (al *AgentLoop) RecordLastChannel(channel string) error {
if al.state == nil {
return nil
}
return al.state.SetLastChannel(channel)
}
// RecordLastChatID records the last active chat ID for this workspace.
// This uses the atomic state save mechanism to prevent data loss on crash.
func (al *AgentLoop) RecordLastChatID(chatID string) error {
if al.state == nil {
return nil
}
return al.state.SetLastChatID(chatID)
}
func (al *AgentLoop) ProcessDirect(
ctx context.Context,
content, sessionKey string,
) (string, error) {
return al.ProcessDirectWithChannel(ctx, content, sessionKey, "cli", "direct")
}
func (al *AgentLoop) ProcessDirectWithChannel(
ctx context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
msg := bus.InboundMessage{
Channel: channel,
SenderID: "cron",
ChatID: chatID,
Content: content,
SessionKey: sessionKey,
}
return al.processMessage(ctx, msg)
}
// ProcessHeartbeat processes a heartbeat request without session history.
// Each heartbeat is independent and doesn't accumulate context.
func (al *AgentLoop) ProcessHeartbeat(
ctx context.Context,
content, channel, chatID string,
) (string, error) {
agent := al.registry.GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for heartbeat")
}
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: "heartbeat",
Channel: channel,
ChatID: chatID,
UserMessage: content,
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
NoHistory: true, // Don't load session history for heartbeat
})
}
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
// Add message preview to log (show full content for error messages)
var logContent string
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
logContent = msg.Content // Full content for errors
} else {
logContent = utils.Truncate(msg.Content, 80)
}
logger.InfoCF(
"agent",
fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, logContent),
map[string]any{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"sender_id": msg.SenderID,
"session_key": msg.SessionKey,
},
)
msg = al.transcribeAudioInMessage(ctx, msg)
// Route system messages to processSystemMessage
if msg.Channel == "system" {
return al.processSystemMessage(ctx, msg)
}
route, agent, routeErr := al.resolveMessageRoute(msg)
// Commands are checked before requiring a successful route.
// Global commands (/help, /show, /switch) work even when routing fails;
// context-dependent commands check their own Runtime fields and report
// "unavailable" when the required capability is nil.
if response, handled := al.handleCommand(ctx, msg, agent); handled {
return response, nil
}
if routeErr != nil {
return "", routeErr
}
// Reset message-tool state for this round so we don't skip publishing due to a previous round.
if tool, ok := agent.Tools.Get("message"); ok {
if resetter, ok := tool.(interface{ ResetSentInRound() }); ok {
resetter.ResetSentInRound()
}
}
// Resolve session key from route, while preserving explicit agent-scoped keys.
scopeKey := resolveScopeKey(route, msg.SessionKey)
sessionKey := scopeKey
logger.InfoCF("agent", "Routed message",
map[string]any{
"agent_id": agent.ID,
"scope_key": scopeKey,
"session_key": sessionKey,
"matched_by": route.MatchedBy,
"route_agent": route.AgentID,
"route_channel": route.Channel,
})
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: msg.Channel,
ChatID: msg.ChatID,
UserMessage: msg.Content,
Media: msg.Media,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
})
}
func (al *AgentLoop) resolveMessageRoute(msg bus.InboundMessage) (routing.ResolvedRoute, *AgentInstance, error) {
route := al.registry.ResolveRoute(routing.RouteInput{
Channel: msg.Channel,
AccountID: inboundMetadata(msg, metadataKeyAccountID),
Peer: extractPeer(msg),
ParentPeer: extractParentPeer(msg),
GuildID: inboundMetadata(msg, metadataKeyGuildID),
TeamID: inboundMetadata(msg, metadataKeyTeamID),
})
agent, ok := al.registry.GetAgent(route.AgentID)
if !ok {
agent = al.registry.GetDefaultAgent()
}
if agent == nil {
return routing.ResolvedRoute{}, nil, fmt.Errorf("no agent available for route (agent_id=%s)", route.AgentID)
}
return route, agent, nil
}
func resolveScopeKey(route routing.ResolvedRoute, msgSessionKey string) string {
if msgSessionKey != "" && strings.HasPrefix(msgSessionKey, sessionKeyAgentPrefix) {
return msgSessionKey
}
return route.SessionKey
}
func (al *AgentLoop) processSystemMessage(
ctx context.Context,
msg bus.InboundMessage,
) (string, error) {
if msg.Channel != "system" {
return "", fmt.Errorf(
"processSystemMessage called with non-system message channel: %s",
msg.Channel,
)
}
logger.InfoCF("agent", "Processing system message",
map[string]any{
"sender_id": msg.SenderID,
"chat_id": msg.ChatID,
})
// Parse origin channel from chat_id (format: "channel:chat_id")
var originChannel, originChatID string
if idx := strings.Index(msg.ChatID, ":"); idx > 0 {
originChannel = msg.ChatID[:idx]
originChatID = msg.ChatID[idx+1:]
} else {
originChannel = "cli"
originChatID = msg.ChatID
}
// Extract subagent result from message content
// Format: "Task 'label' completed.\n\nResult:\n<actual content>"
content := msg.Content
if idx := strings.Index(content, "Result:\n"); idx >= 0 {
content = content[idx+8:] // Extract just the result part
}
// Skip internal channels - only log, don't send to user
if constants.IsInternalChannel(originChannel) {
logger.InfoCF("agent", "Subagent completed (internal channel)",
map[string]any{
"sender_id": msg.SenderID,
"content_len": len(content),
"channel": originChannel,
})
return "", nil
}
// Use default agent for system messages
agent := al.registry.GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for system message")
}
// Use the origin session for context
sessionKey := routing.BuildAgentMainSessionKey(agent.ID)
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: originChannel,
ChatID: originChatID,
UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content),
DefaultResponse: "Background task completed.",
EnableSummary: false,
SendResponse: true,
})
}
// runAgentLoop is the core message processing logic.
func (al *AgentLoop) runAgentLoop(
ctx context.Context,
agent *AgentInstance,
opts processOptions,
) (string, error) {
// 0. Record last channel for heartbeat notifications (skip internal channels)
if opts.Channel != "" && opts.ChatID != "" {
// Don't record internal channels (cli, system, subagent)
if !constants.IsInternalChannel(opts.Channel) {
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
if err := al.RecordLastChannel(channelKey); err != nil {
logger.WarnCF(
"agent",
"Failed to record last channel",
map[string]any{"error": err.Error()},
)
}
}
}
// 1. Build messages (skip history for heartbeat)
var history []providers.Message
var summary string
if !opts.NoHistory {
history = agent.Sessions.GetHistory(opts.SessionKey)
summary = agent.Sessions.GetSummary(opts.SessionKey)
}
messages := agent.ContextBuilder.BuildMessages(
history,
summary,
opts.UserMessage,
opts.Media,
opts.Channel,
opts.ChatID,
)
// Resolve media:// refs to base64 data URLs (streaming)
maxMediaSize := al.cfg.Agents.Defaults.GetMaxMediaSize()
messages = resolveMediaRefs(messages, al.mediaStore, maxMediaSize)
// 2. Save user message to session
agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
// 3. Run LLM iteration loop
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts)
if err != nil {
return "", err
}
// If last tool had ForUser content and we already sent it, we might not need to send final response
// This is controlled by the tool's Silent flag and ForUser content
// 4. Handle empty response
if finalContent == "" {
finalContent = opts.DefaultResponse
}
// 5. Save final assistant message to session
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
agent.Sessions.Save(opts.SessionKey)
// 6. Optional: summarization
if opts.EnableSummary {
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
}
// 7. Optional: send response via bus
if opts.SendResponse {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: finalContent,
})
}
// 8. Log response
responsePreview := utils.Truncate(finalContent, 120)
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
map[string]any{
"agent_id": agent.ID,
"session_key": opts.SessionKey,
"iterations": iteration,
"final_length": len(finalContent),
})
return finalContent, nil
}
func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string) {
if al.channelManager == nil {
return ""
}
if ch, ok := al.channelManager.GetChannel(channelName); ok {
return ch.ReasoningChannelID()
}
return ""
}
func (al *AgentLoop) handleReasoning(
ctx context.Context,
reasoningContent, channelName, channelID string,
) {
if reasoningContent == "" || channelName == "" || channelID == "" {
return
}
// Check context cancellation before attempting to publish,
// since PublishOutbound's select may race between send and ctx.Done().
if ctx.Err() != nil {
return
}
// Use a short timeout so the goroutine does not block indefinitely when
// the outbound bus is full. Reasoning output is best-effort; dropping it
// is acceptable to avoid goroutine accumulation.
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()
if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channelName,
ChatID: channelID,
Content: reasoningContent,
}); err != nil {
// Treat context.DeadlineExceeded / context.Canceled as expected
// (bus full under load, or parent canceled). Check the error
// itself rather than ctx.Err(), because pubCtx may time out
// (5 s) while the parent ctx is still active.
// Also treat ErrBusClosed as expected — it occurs during normal
// shutdown when the bus is closed before all goroutines finish.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
}
}
}
// runLLMIteration executes the LLM call loop with tool handling.
func (al *AgentLoop) runLLMIteration(
ctx context.Context,
agent *AgentInstance,
messages []providers.Message,
opts processOptions,
) (string, int, error) {
iteration := 0
var finalContent string
// Determine effective model tier for this conversation turn.
// selectCandidates evaluates routing once and the decision is sticky for
// all tool-follow-up iterations within the same turn so that a multi-step
// tool chain doesn't switch models mid-way through.
activeCandidates, activeModel := al.selectCandidates(agent, opts.UserMessage, messages)
for iteration < agent.MaxIterations {
iteration++
logger.DebugCF("agent", "LLM iteration",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"max": agent.MaxIterations,
})
// Build tool definitions
providerToolDefs := agent.Tools.ToProviderDefs()
// Log LLM request details
logger.DebugCF("agent", "LLM request",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"model": activeModel,
"messages_count": len(messages),
"tools_count": len(providerToolDefs),
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"system_prompt_len": len(messages[0].Content),
})
// Log full messages (detailed)
logger.DebugCF("agent", "Full LLM request",
map[string]any{
"iteration": iteration,
"messages_json": formatMessagesForLog(messages),
"tools_json": formatToolsForLog(providerToolDefs),
})
// Call LLM with fallback chain if multiple candidates are configured.
var response *providers.LLMResponse
var err error
llmOpts := map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
}
// parseThinkingLevel guarantees ThinkingOff for empty/unknown values,
// so checking != ThinkingOff is sufficient.
if agent.ThinkingLevel != ThinkingOff {
if tc, ok := agent.Provider.(providers.ThinkingCapable); ok && tc.SupportsThinking() {
llmOpts["thinking_level"] = string(agent.ThinkingLevel)
} else {
logger.WarnCF("agent", "thinking_level is set but current provider does not support it, ignoring",
map[string]any{"agent_id": agent.ID, "thinking_level": string(agent.ThinkingLevel)})
}
}
callLLM := func() (*providers.LLMResponse, error) {
if len(activeCandidates) > 1 && al.fallback != nil {
fbResult, fbErr := al.fallback.Execute(
ctx,
activeCandidates,
func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) {
return agent.Provider.Chat(ctx, messages, providerToolDefs, model, llmOpts)
},
)
if fbErr != nil {
return nil, fbErr
}
if fbResult.Provider != "" && len(fbResult.Attempts) > 0 {
logger.InfoCF(
"agent",
fmt.Sprintf("Fallback: succeeded with %s/%s after %d attempts",
fbResult.Provider, fbResult.Model, len(fbResult.Attempts)+1),
map[string]any{"agent_id": agent.ID, "iteration": iteration},
)
}
return fbResult.Response, nil
}
return agent.Provider.Chat(ctx, messages, providerToolDefs, activeModel, llmOpts)
}
// Retry loop for context/token errors
maxRetries := 2
for retry := 0; retry <= maxRetries; retry++ {
response, err = callLLM()
if err == nil {
break
}
errMsg := strings.ToLower(err.Error())
// Check if this is a network/HTTP timeout — not a context window error.
isTimeoutError := errors.Is(err, context.DeadlineExceeded) ||
strings.Contains(errMsg, "deadline exceeded") ||
strings.Contains(errMsg, "client.timeout") ||
strings.Contains(errMsg, "timed out") ||
strings.Contains(errMsg, "timeout exceeded")
// Detect real context window / token limit errors, excluding network timeouts.
isContextError := !isTimeoutError && (strings.Contains(errMsg, "context_length_exceeded") ||
strings.Contains(errMsg, "context window") ||
strings.Contains(errMsg, "maximum context length") ||
strings.Contains(errMsg, "token limit") ||
strings.Contains(errMsg, "too many tokens") ||
strings.Contains(errMsg, "max_tokens") ||
strings.Contains(errMsg, "invalidparameter") ||
strings.Contains(errMsg, "prompt is too long") ||
strings.Contains(errMsg, "request too large"))
if isTimeoutError && retry < maxRetries {
backoff := time.Duration(retry+1) * 5 * time.Second
logger.WarnCF("agent", "Timeout error, retrying after backoff", map[string]any{
"error": err.Error(),
"retry": retry,
"backoff": backoff.String(),
})
time.Sleep(backoff)
continue
}
if isContextError && retry < maxRetries {
logger.WarnCF(
"agent",
"Context window error detected, attempting compression",
map[string]any{
"error": err.Error(),
"retry": retry,
},
)
if retry == 0 && !constants.IsInternalChannel(opts.Channel) {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: "Context window exceeded. Compressing history and retrying...",
})
}
al.forceCompression(agent, opts.SessionKey)
newHistory := agent.Sessions.GetHistory(opts.SessionKey)
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
messages = agent.ContextBuilder.BuildMessages(
newHistory, newSummary, "",
nil, opts.Channel, opts.ChatID,
)
continue
}
break
}
if err != nil {
logger.ErrorCF("agent", "LLM call failed",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"error": err.Error(),
})
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
}
go al.handleReasoning(
ctx,
response.Reasoning,
opts.Channel,
al.targetReasoningChannelID(opts.Channel),
)
logger.DebugCF("agent", "LLM response",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"content_chars": len(response.Content),
"tool_calls": len(response.ToolCalls),
"reasoning": response.Reasoning,
"target_channel": al.targetReasoningChannelID(opts.Channel),
"channel": opts.Channel,
})
// Check if no tool calls - we're done
if len(response.ToolCalls) == 0 {
finalContent = response.Content
logger.InfoCF("agent", "LLM response without tool calls (direct answer)",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"content_chars": len(finalContent),
})
break
}
normalizedToolCalls := make([]providers.ToolCall, 0, len(response.ToolCalls))
for _, tc := range response.ToolCalls {
normalizedToolCalls = append(normalizedToolCalls, providers.NormalizeToolCall(tc))
}
// Log tool calls
toolNames := make([]string, 0, len(normalizedToolCalls))
for _, tc := range normalizedToolCalls {
toolNames = append(toolNames, tc.Name)
}
logger.InfoCF("agent", "LLM requested tool calls",
map[string]any{
"agent_id": agent.ID,
"tools": toolNames,
"count": len(normalizedToolCalls),
"iteration": iteration,
})
// Build assistant message with tool calls
assistantMsg := providers.Message{
Role: "assistant",
Content: response.Content,
ReasoningContent: response.ReasoningContent,
}
for _, tc := range normalizedToolCalls {
argumentsJSON, _ := json.Marshal(tc.Arguments)
// Copy ExtraContent to ensure thought_signature is persisted for Gemini 3
extraContent := tc.ExtraContent
thoughtSignature := ""
if tc.Function != nil {
thoughtSignature = tc.Function.ThoughtSignature
}
assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, providers.ToolCall{
ID: tc.ID,
Type: "function",
Name: tc.Name,
Function: &providers.FunctionCall{
Name: tc.Name,
Arguments: string(argumentsJSON),
ThoughtSignature: thoughtSignature,
},
ExtraContent: extraContent,
ThoughtSignature: thoughtSignature,
})
}
messages = append(messages, assistantMsg)
// Save assistant message with tool calls to session
agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg)
// Execute tool calls in parallel
type indexedAgentResult struct {
result *tools.ToolResult
tc providers.ToolCall
}
agentResults := make([]indexedAgentResult, len(normalizedToolCalls))
var wg sync.WaitGroup
for i, tc := range normalizedToolCalls {
agentResults[i].tc = tc
wg.Add(1)
go func(idx int, tc providers.ToolCall) {
defer wg.Done()
argsJSON, _ := json.Marshal(tc.Arguments)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
map[string]any{
"agent_id": agent.ID,
"tool": tc.Name,
"iteration": iteration,
})
// Create async callback for tools that implement AsyncExecutor
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
if !result.Silent && result.ForUser != "" {
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
map[string]any{
"tool": tc.Name,
"content_len": len(result.ForUser),
})
}
}
toolResult := agent.Tools.ExecuteWithContext(
ctx,
tc.Name,
tc.Arguments,
opts.Channel,
opts.ChatID,
asyncCallback,
)
agentResults[idx].result = toolResult
}(i, tc)
}
wg.Wait()
// Process results in original order (send to user, save to session)
for _, r := range agentResults {
// Send ForUser content to user immediately if not Silent
if !r.result.Silent && r.result.ForUser != "" && opts.SendResponse {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: r.result.ForUser,
})
logger.DebugCF("agent", "Sent tool result to user",
map[string]any{
"tool": r.tc.Name,
"content_len": len(r.result.ForUser),
})
}
// If tool returned media refs, publish them as outbound media
if len(r.result.Media) > 0 {
parts := make([]bus.MediaPart, 0, len(r.result.Media))
for _, ref := range r.result.Media {
part := bus.MediaPart{Ref: ref}
if al.mediaStore != nil {
if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil {
part.Filename = meta.Filename
part.ContentType = meta.ContentType
part.Type = inferMediaType(meta.Filename, meta.ContentType)
}
}
parts = append(parts, part)
}
al.bus.PublishOutboundMedia(ctx, bus.OutboundMediaMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Parts: parts,
})
}
// Determine content for LLM based on tool result
contentForLLM := r.result.ForLLM
if contentForLLM == "" && r.result.Err != nil {
contentForLLM = r.result.Err.Error()
}
toolResultMsg := providers.Message{
Role: "tool",
Content: contentForLLM,
ToolCallID: r.tc.ID,
}
messages = append(messages, toolResultMsg)
// Save tool result message to session
agent.Sessions.AddFullMessage(opts.SessionKey, toolResultMsg)
}
}
return finalContent, iteration, nil
}
// selectCandidates returns the model candidates and resolved model name to use
// for a conversation turn. When model routing is configured and the incoming
// message scores below the complexity threshold, it returns the light model
// candidates instead of the primary ones.
//
// The returned (candidates, model) pair is used for all LLM calls within one
// turn — tool follow-up iterations use the same tier as the initial call so
// that a multi-step tool chain doesn't switch models mid-way.
func (al *AgentLoop) selectCandidates(
agent *AgentInstance,
userMsg string,
history []providers.Message,
) (candidates []providers.FallbackCandidate, model string) {
if agent.Router == nil || len(agent.LightCandidates) == 0 {
return agent.Candidates, agent.Model
}
_, usedLight, score := agent.Router.SelectModel(userMsg, history, agent.Model)
if !usedLight {
logger.DebugCF("agent", "Model routing: primary model selected",
map[string]any{
"agent_id": agent.ID,
"score": score,
"threshold": agent.Router.Threshold(),
})
return agent.Candidates, agent.Model
}
logger.InfoCF("agent", "Model routing: light model selected",
map[string]any{
"agent_id": agent.ID,
"light_model": agent.Router.LightModel(),
"score": score,
"threshold": agent.Router.Threshold(),
})
return agent.LightCandidates, agent.Router.LightModel()
}
// maybeSummarize triggers summarization if the session history exceeds thresholds.
func (al *AgentLoop) maybeSummarize(agent *AgentInstance, sessionKey, channel, chatID string) {
newHistory := agent.Sessions.GetHistory(sessionKey)
tokenEstimate := al.estimateTokens(newHistory)
threshold := agent.ContextWindow * agent.SummarizeTokenPercent / 100
if len(newHistory) > agent.SummarizeMessageThreshold || tokenEstimate > threshold {
summarizeKey := agent.ID + ":" + sessionKey
if _, loading := al.summarizing.LoadOrStore(summarizeKey, true); !loading {
go func() {
defer al.summarizing.Delete(summarizeKey)
logger.Debug("Memory threshold reached. Optimizing conversation history...")
al.summarizeSession(agent, sessionKey)
}()
}
}
}
// forceCompression aggressively reduces context when the limit is hit.
// It drops the oldest 50% of messages (keeping system prompt and last user message).
func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
history := agent.Sessions.GetHistory(sessionKey)
if len(history) <= 4 {
return
}
// Keep system prompt (usually [0]) and the very last message (user's trigger)
// We want to drop the oldest half of the *conversation*
// Assuming [0] is system, [1:] is conversation
conversation := history[1 : len(history)-1]
if len(conversation) == 0 {
return
}
// Helper to find the mid-point of the conversation
mid := len(conversation) / 2
// New history structure:
// 1. System Prompt (with compression note appended)
// 2. Second half of conversation
// 3. Last message
droppedCount := mid
keptConversation := conversation[mid:]
newHistory := make([]providers.Message, 0, 1+len(keptConversation)+1)
// Append compression note to the original system prompt instead of adding a new system message
// This avoids having two consecutive system messages which some APIs (like Zhipu) reject
compressionNote := fmt.Sprintf(
"\n\n[System Note: Emergency compression dropped %d oldest messages due to context limit]",
droppedCount,
)
enhancedSystemPrompt := history[0]
enhancedSystemPrompt.Content = enhancedSystemPrompt.Content + compressionNote
newHistory = append(newHistory, enhancedSystemPrompt)
newHistory = append(newHistory, keptConversation...)
newHistory = append(newHistory, history[len(history)-1]) // Last message
// Update session
agent.Sessions.SetHistory(sessionKey, newHistory)
agent.Sessions.Save(sessionKey)
logger.WarnCF("agent", "Forced compression executed", map[string]any{
"session_key": sessionKey,
"dropped_msgs": droppedCount,
"new_count": len(newHistory),
})
}
// GetStartupInfo returns information about loaded tools and skills for logging.
func (al *AgentLoop) GetStartupInfo() map[string]any {
info := make(map[string]any)
agent := al.registry.GetDefaultAgent()
if agent == nil {
return info
}
// Tools info
toolsList := agent.Tools.List()
info["tools"] = map[string]any{
"count": len(toolsList),
"names": toolsList,
}
// Skills info
info["skills"] = agent.ContextBuilder.GetSkillsInfo()
// Agents info
info["agents"] = map[string]any{
"count": len(al.registry.ListAgentIDs()),
"ids": al.registry.ListAgentIDs(),
}
return info
}
// formatMessagesForLog formats messages for logging
func formatMessagesForLog(messages []providers.Message) string {
if len(messages) == 0 {
return "[]"
}
var sb strings.Builder
sb.WriteString("[\n")
for i, msg := range messages {
fmt.Fprintf(&sb, " [%d] Role: %s\n", i, msg.Role)
if len(msg.ToolCalls) > 0 {
sb.WriteString(" ToolCalls:\n")
for _, tc := range msg.ToolCalls {
fmt.Fprintf(&sb, " - ID: %s, Type: %s, Name: %s\n", tc.ID, tc.Type, tc.Name)
if tc.Function != nil {
fmt.Fprintf(
&sb,
" Arguments: %s\n",
utils.Truncate(tc.Function.Arguments, 200),
)
}
}
}
if msg.Content != "" {
content := utils.Truncate(msg.Content, 200)
fmt.Fprintf(&sb, " Content: %s\n", content)
}
if msg.ToolCallID != "" {
fmt.Fprintf(&sb, " ToolCallID: %s\n", msg.ToolCallID)
}
sb.WriteString("\n")
}
sb.WriteString("]")
return sb.String()
}
// formatToolsForLog formats tool definitions for logging
func formatToolsForLog(toolDefs []providers.ToolDefinition) string {
if len(toolDefs) == 0 {
return "[]"
}
var sb strings.Builder
sb.WriteString("[\n")
for i, tool := range toolDefs {
fmt.Fprintf(&sb, " [%d] Type: %s, Name: %s\n", i, tool.Type, tool.Function.Name)
fmt.Fprintf(&sb, " Description: %s\n", tool.Function.Description)
if len(tool.Function.Parameters) > 0 {
fmt.Fprintf(
&sb,
" Parameters: %s\n",
utils.Truncate(fmt.Sprintf("%v", tool.Function.Parameters), 200),
)
}
}
sb.WriteString("]")
return sb.String()
}
// summarizeSession summarizes the conversation history for a session.
func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
history := agent.Sessions.GetHistory(sessionKey)
summary := agent.Sessions.GetSummary(sessionKey)
// Keep last 4 messages for continuity
if len(history) <= 4 {
return
}
toSummarize := history[:len(history)-4]
// Oversized Message Guard
maxMessageTokens := agent.ContextWindow / 2
validMessages := make([]providers.Message, 0)
omitted := false
for _, m := range toSummarize {
if m.Role != "user" && m.Role != "assistant" {
continue
}
msgTokens := len(m.Content) / 2
if msgTokens > maxMessageTokens {
omitted = true
continue
}
validMessages = append(validMessages, m)
}
if len(validMessages) == 0 {
return
}
// Multi-Part Summarization
var finalSummary string
if len(validMessages) > 10 {
mid := len(validMessages) / 2
part1 := validMessages[:mid]
part2 := validMessages[mid:]
s1, _ := al.summarizeBatch(ctx, agent, part1, "")
s2, _ := al.summarizeBatch(ctx, agent, part2, "")
mergePrompt := fmt.Sprintf(
"Merge these two conversation summaries into one cohesive summary:\n\n1: %s\n\n2: %s",
s1,
s2,
)
resp, err := agent.Provider.Chat(
ctx,
[]providers.Message{{Role: "user", Content: mergePrompt}},
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err == nil {
finalSummary = resp.Content
} else {
finalSummary = s1 + " " + s2
}
} else {
finalSummary, _ = al.summarizeBatch(ctx, agent, validMessages, summary)
}
if omitted && finalSummary != "" {
finalSummary += "\n[Note: Some oversized messages were omitted from this summary for efficiency.]"
}
if finalSummary != "" {
agent.Sessions.SetSummary(sessionKey, finalSummary)
agent.Sessions.TruncateHistory(sessionKey, 4)
agent.Sessions.Save(sessionKey)
}
}
// summarizeBatch summarizes a batch of messages.
func (al *AgentLoop) summarizeBatch(
ctx context.Context,
agent *AgentInstance,
batch []providers.Message,
existingSummary string,
) (string, error) {
var sb strings.Builder
sb.WriteString(
"Provide a concise summary of this conversation segment, preserving core context and key points.\n",
)
if existingSummary != "" {
sb.WriteString("Existing context: ")
sb.WriteString(existingSummary)
sb.WriteString("\n")
}
sb.WriteString("\nCONVERSATION:\n")
for _, m := range batch {
fmt.Fprintf(&sb, "%s: %s\n", m.Role, m.Content)
}
prompt := sb.String()
response, err := agent.Provider.Chat(
ctx,
[]providers.Message{{Role: "user", Content: prompt}},
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err != nil {
return "", err
}
return response.Content, nil
}
// estimateTokens estimates the number of tokens in a message list.
// Uses a safe heuristic of 2.5 characters per token to account for CJK and other
// overheads better than the previous 3 chars/token.
func (al *AgentLoop) estimateTokens(messages []providers.Message) int {
totalChars := 0
for _, m := range messages {
totalChars += utf8.RuneCountInString(m.Content)
}
// 2.5 chars per token = totalChars * 2 / 5
return totalChars * 2 / 5
}
func (al *AgentLoop) handleCommand(
ctx context.Context,
msg bus.InboundMessage,
agent *AgentInstance,
) (string, bool) {
if !commands.HasCommandPrefix(msg.Content) {
return "", false
}
if al.cmdRegistry == nil {
return "", false
}
rt := al.buildCommandsRuntime(agent)
executor := commands.NewExecutor(al.cmdRegistry, rt)
var commandReply string
result := executor.Execute(ctx, commands.Request{
Channel: msg.Channel,
ChatID: msg.ChatID,
SenderID: msg.SenderID,
Text: msg.Content,
Reply: func(text string) error {
commandReply = text
return nil
},
})
switch result.Outcome {
case commands.OutcomeHandled:
if result.Err != nil {
return mapCommandError(result), true
}
if commandReply != "" {
return commandReply, true
}
return "", true
default: // OutcomePassthrough — let the message fall through to LLM
return "", false
}
}
func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance) *commands.Runtime {
rt := &commands.Runtime{
Config: al.cfg,
ListAgentIDs: al.registry.ListAgentIDs,
ListDefinitions: al.cmdRegistry.Definitions,
GetEnabledChannels: func() []string {
if al.channelManager == nil {
return nil
}
return al.channelManager.GetEnabledChannels()
},
SwitchChannel: func(value string) error {
if al.channelManager == nil {
return fmt.Errorf("channel manager not initialized")
}
if _, exists := al.channelManager.GetChannel(value); !exists && value != "cli" {
return fmt.Errorf("channel '%s' not found or not enabled", value)
}
return nil
},
}
if agent != nil {
rt.GetModelInfo = func() (string, string) {
return agent.Model, al.cfg.Agents.Defaults.Provider
}
rt.SwitchModel = func(value string) (string, error) {
oldModel := agent.Model
agent.Model = value
return oldModel, nil
}
}
return rt
}
func mapCommandError(result commands.ExecuteResult) string {
if result.Command == "" {
return fmt.Sprintf("Failed to execute command: %v", result.Err)
}
return fmt.Sprintf("Failed to execute /%s: %v", result.Command, result.Err)
}
// extractPeer extracts the routing peer from the inbound message's structured Peer field.
func extractPeer(msg bus.InboundMessage) *routing.RoutePeer {
if msg.Peer.Kind == "" {
return nil
}
peerID := msg.Peer.ID
if peerID == "" {
if msg.Peer.Kind == "direct" {
peerID = msg.SenderID
} else {
peerID = msg.ChatID
}
}
return &routing.RoutePeer{Kind: msg.Peer.Kind, ID: peerID}
}
func inboundMetadata(msg bus.InboundMessage, key string) string {
if msg.Metadata == nil {
return ""
}
return msg.Metadata[key]
}
// extractParentPeer extracts the parent peer (reply-to) from inbound message metadata.
func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer {
parentKind := inboundMetadata(msg, metadataKeyParentPeerKind)
parentID := inboundMetadata(msg, metadataKeyParentPeerID)
if parentKind == "" || parentID == "" {
return nil
}
return &routing.RoutePeer{Kind: parentKind, ID: parentID}
}