diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index bee8d91a7..28e549ce0 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -25,7 +25,6 @@ import ( "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/constants" "github.com/sipeed/picoclaw/pkg/logger" - "github.com/sipeed/picoclaw/pkg/mcp" "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" @@ -48,6 +47,7 @@ type AgentLoop struct { mediaStore media.MediaStore transcriber voice.Transcriber cmdRegistry *commands.Registry + mcp mcpRuntime } // processOptions configures how a message is processed @@ -239,119 +239,8 @@ func registerSharedTools( func (al *AgentLoop) Run(ctx context.Context) error { al.running.Store(true) - - // Initialize MCP servers for all agents - if al.cfg.Tools.IsToolEnabled("mcp") { - mcpManager := mcp.NewManager() - // Ensure MCP connections are cleaned up on exit, regardless of initialization success - // This fixes resource leak when LoadFromMCPConfig partially succeeds then fails - defer func() { - if err := mcpManager.Close(); err != nil { - logger.ErrorCF("agent", "Failed to close MCP manager", - map[string]any{ - "error": err.Error(), - }) - } - }() - - defaultAgent := al.registry.GetDefaultAgent() - var workspacePath string - if defaultAgent != nil && defaultAgent.Workspace != "" { - workspacePath = defaultAgent.Workspace - } else { - workspacePath = al.cfg.WorkspacePath() - } - - if err := mcpManager.LoadFromMCPConfig(ctx, al.cfg.Tools.MCP, workspacePath); err != nil { - logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available", - map[string]any{ - "error": err.Error(), - }) - } else { - // Register MCP tools for all agents - servers := mcpManager.GetServers() - uniqueTools := 0 - totalRegistrations := 0 - agentIDs := al.registry.ListAgentIDs() - agentCount := len(agentIDs) - - for serverName, conn := range servers { - uniqueTools += len(conn.Tools) - for _, tool := range conn.Tools { - for _, agentID := range agentIDs { - agent, ok := al.registry.GetAgent(agentID) - if !ok { - continue - } - - mcpTool := tools.NewMCPTool(mcpManager, serverName, tool) - - if al.cfg.Tools.MCP.Discovery.Enabled { - agent.Tools.RegisterHidden(mcpTool) - } else { - agent.Tools.Register(mcpTool) - } - - totalRegistrations++ - logger.DebugCF("agent", "Registered MCP tool", - map[string]any{ - "agent_id": agentID, - "server": serverName, - "tool": tool.Name, - "name": mcpTool.Name(), - }) - } - } - } - logger.InfoCF("agent", "MCP tools registered successfully", - map[string]any{ - "server_count": len(servers), - "unique_tools": uniqueTools, - "total_registrations": totalRegistrations, - "agent_count": agentCount, - }) - - // Initializes Discovery Tools only if enabled by configuration - if al.cfg.Tools.MCP.Enabled && al.cfg.Tools.MCP.Discovery.Enabled { - useBM25 := al.cfg.Tools.MCP.Discovery.UseBM25 - useRegex := al.cfg.Tools.MCP.Discovery.UseRegex - - // Fail fast: If discovery is enabled but no search method is turned on - if !useBM25 && !useRegex { - return fmt.Errorf( - "tool discovery is enabled but neither 'use_bm25' nor 'use_regex' is set to true in the configuration", - ) - } - - ttl := al.cfg.Tools.MCP.Discovery.TTL - if ttl <= 0 { - ttl = 5 // Default value - } - - maxSearchResults := al.cfg.Tools.MCP.Discovery.MaxSearchResults - if maxSearchResults <= 0 { - maxSearchResults = 5 // Default value - } - - logger.InfoCF("agent", "Initializing tool discovery", map[string]any{ - "bm25": useBM25, "regex": useRegex, "ttl": ttl, "max_results": maxSearchResults, - }) - - for _, agentID := range agentIDs { - agent, ok := al.registry.GetAgent(agentID) - if !ok { - continue - } - - if useRegex { - agent.Tools.Register(tools.NewRegexSearchTool(agent.Tools, ttl, maxSearchResults)) - } - if useBM25 { - agent.Tools.Register(tools.NewBM25SearchTool(agent.Tools, ttl, maxSearchResults)) - } - } - } - } + if err := al.ensureMCPInitialized(ctx); err != nil { + return err } for al.running.Load() { @@ -431,6 +320,17 @@ func (al *AgentLoop) Stop() { // Close releases resources held by agent session stores. Call after Stop. func (al *AgentLoop) Close() { + mcpManager := al.mcp.takeManager() + + if mcpManager != nil { + if err := mcpManager.Close(); err != nil { + logger.ErrorCF("agent", "Failed to close MCP manager", + map[string]any{ + "error": err.Error(), + }) + } + } + al.registry.Close() } @@ -619,6 +519,10 @@ func (al *AgentLoop) ProcessDirectWithChannel( ctx context.Context, content, sessionKey, channel, chatID string, ) (string, error) { + if err := al.ensureMCPInitialized(ctx); err != nil { + return "", err + } + msg := bus.InboundMessage{ Channel: channel, SenderID: "cron", diff --git a/pkg/agent/loop_mcp.go b/pkg/agent/loop_mcp.go new file mode 100644 index 000000000..2795db52a --- /dev/null +++ b/pkg/agent/loop_mcp.go @@ -0,0 +1,184 @@ +// PicoClaw - Ultra-lightweight personal AI agent +// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot +// License: MIT +// +// Copyright (c) 2026 PicoClaw contributors + +package agent + +import ( + "context" + "fmt" + "sync" + + "github.com/sipeed/picoclaw/pkg/logger" + "github.com/sipeed/picoclaw/pkg/mcp" + "github.com/sipeed/picoclaw/pkg/tools" +) + +type mcpRuntime struct { + initOnce sync.Once + mu sync.Mutex + manager *mcp.Manager + initErr error +} + +func (r *mcpRuntime) setManager(manager *mcp.Manager) { + r.mu.Lock() + r.manager = manager + r.initErr = nil + r.mu.Unlock() +} + +func (r *mcpRuntime) setInitErr(err error) { + r.mu.Lock() + r.initErr = err + r.mu.Unlock() +} + +func (r *mcpRuntime) getInitErr() error { + r.mu.Lock() + defer r.mu.Unlock() + return r.initErr +} + +func (r *mcpRuntime) takeManager() *mcp.Manager { + r.mu.Lock() + defer r.mu.Unlock() + manager := r.manager + r.manager = nil + return manager +} + +func (r *mcpRuntime) hasManager() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.manager != nil +} + +// ensureMCPInitialized loads MCP servers/tools once so both Run() and direct +// agent mode share the same initialization path. +func (al *AgentLoop) ensureMCPInitialized(ctx context.Context) error { + if !al.cfg.Tools.IsToolEnabled("mcp") { + return nil + } + + al.mcp.initOnce.Do(func() { + mcpManager := mcp.NewManager() + + defaultAgent := al.registry.GetDefaultAgent() + workspacePath := al.cfg.WorkspacePath() + if defaultAgent != nil && defaultAgent.Workspace != "" { + workspacePath = defaultAgent.Workspace + } + + if err := mcpManager.LoadFromMCPConfig(ctx, al.cfg.Tools.MCP, workspacePath); err != nil { + logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available", + map[string]any{ + "error": err.Error(), + }) + if closeErr := mcpManager.Close(); closeErr != nil { + logger.ErrorCF("agent", "Failed to close MCP manager", + map[string]any{ + "error": closeErr.Error(), + }) + } + return + } + + // Register MCP tools for all agents + servers := mcpManager.GetServers() + uniqueTools := 0 + totalRegistrations := 0 + agentIDs := al.registry.ListAgentIDs() + agentCount := len(agentIDs) + + for serverName, conn := range servers { + uniqueTools += len(conn.Tools) + for _, tool := range conn.Tools { + for _, agentID := range agentIDs { + agent, ok := al.registry.GetAgent(agentID) + if !ok { + continue + } + + mcpTool := tools.NewMCPTool(mcpManager, serverName, tool) + + if al.cfg.Tools.MCP.Discovery.Enabled { + agent.Tools.RegisterHidden(mcpTool) + } else { + agent.Tools.Register(mcpTool) + } + + totalRegistrations++ + logger.DebugCF("agent", "Registered MCP tool", + map[string]any{ + "agent_id": agentID, + "server": serverName, + "tool": tool.Name, + "name": mcpTool.Name(), + }) + } + } + } + logger.InfoCF("agent", "MCP tools registered successfully", + map[string]any{ + "server_count": len(servers), + "unique_tools": uniqueTools, + "total_registrations": totalRegistrations, + "agent_count": agentCount, + }) + + // Initializes Discovery Tools only if enabled by configuration + if al.cfg.Tools.MCP.Enabled && al.cfg.Tools.MCP.Discovery.Enabled { + useBM25 := al.cfg.Tools.MCP.Discovery.UseBM25 + useRegex := al.cfg.Tools.MCP.Discovery.UseRegex + + // Fail fast: If discovery is enabled but no search method is turned on + if !useBM25 && !useRegex { + al.mcp.setInitErr(fmt.Errorf( + "tool discovery is enabled but neither 'use_bm25' nor 'use_regex' is set to true in the configuration", + )) + if closeErr := mcpManager.Close(); closeErr != nil { + logger.ErrorCF("agent", "Failed to close MCP manager", + map[string]any{ + "error": closeErr.Error(), + }) + } + return + } + + ttl := al.cfg.Tools.MCP.Discovery.TTL + if ttl <= 0 { + ttl = 5 // Default value + } + + maxSearchResults := al.cfg.Tools.MCP.Discovery.MaxSearchResults + if maxSearchResults <= 0 { + maxSearchResults = 5 // Default value + } + + logger.InfoCF("agent", "Initializing tool discovery", map[string]any{ + "bm25": useBM25, "regex": useRegex, "ttl": ttl, "max_results": maxSearchResults, + }) + + for _, agentID := range agentIDs { + agent, ok := al.registry.GetAgent(agentID) + if !ok { + continue + } + + if useRegex { + agent.Tools.Register(tools.NewRegexSearchTool(agent.Tools, ttl, maxSearchResults)) + } + if useBM25 { + agent.Tools.Register(tools.NewBM25SearchTool(agent.Tools, ttl, maxSearchResults)) + } + } + } + + al.mcp.setManager(mcpManager) + }) + + return al.mcp.getInitErr() +} diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 2e456fa60..cab82e176 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -770,6 +770,56 @@ func TestAgentLoop_ContextExhaustionRetry(t *testing.T) { } } +func TestProcessDirectWithChannel_InitializesMCPInAgentMode(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + Model: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + Tools: config.ToolsConfig{ + MCP: config.MCPConfig{ + ToolConfig: config.ToolConfig{ + Enabled: true, + }, + }, + }, + } + + msgBus := bus.NewMessageBus() + provider := &mockProvider{} + al := NewAgentLoop(cfg, msgBus, provider) + defer al.Close() + + if al.mcp.hasManager() { + t.Fatal("expected MCP manager to be nil before first direct processing") + } + + _, err = al.ProcessDirectWithChannel( + context.Background(), + "hello", + "session-1", + "cli", + "direct", + ) + if err != nil { + t.Fatalf("ProcessDirectWithChannel failed: %v", err) + } + + if !al.mcp.hasManager() { + t.Fatal("expected MCP manager to be initialized in direct agent mode") + } +} + func TestTargetReasoningChannelID_AllChannels(t *testing.T) { tmpDir, err := os.MkdirTemp("", "agent-test-*") if err != nil {