fix(agent): initialize MCP in direct agent mode (#1361)

This commit is contained in:
Congregalis
2026-03-12 01:06:48 +08:00
committed by GitHub
parent 4a8a2e9c23
commit 9b0a48ac6d
3 changed files with 252 additions and 114 deletions
+18 -114
View File
@@ -25,7 +25,6 @@ import (
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/mcp"
"github.com/sipeed/picoclaw/pkg/media"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
@@ -48,6 +47,7 @@ type AgentLoop struct {
mediaStore media.MediaStore
transcriber voice.Transcriber
cmdRegistry *commands.Registry
mcp mcpRuntime
}
// processOptions configures how a message is processed
@@ -239,119 +239,8 @@ func registerSharedTools(
func (al *AgentLoop) Run(ctx context.Context) error {
al.running.Store(true)
// Initialize MCP servers for all agents
if al.cfg.Tools.IsToolEnabled("mcp") {
mcpManager := mcp.NewManager()
// Ensure MCP connections are cleaned up on exit, regardless of initialization success
// This fixes resource leak when LoadFromMCPConfig partially succeeds then fails
defer func() {
if err := mcpManager.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close MCP manager",
map[string]any{
"error": err.Error(),
})
}
}()
defaultAgent := al.registry.GetDefaultAgent()
var workspacePath string
if defaultAgent != nil && defaultAgent.Workspace != "" {
workspacePath = defaultAgent.Workspace
} else {
workspacePath = al.cfg.WorkspacePath()
}
if err := mcpManager.LoadFromMCPConfig(ctx, al.cfg.Tools.MCP, workspacePath); err != nil {
logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available",
map[string]any{
"error": err.Error(),
})
} else {
// Register MCP tools for all agents
servers := mcpManager.GetServers()
uniqueTools := 0
totalRegistrations := 0
agentIDs := al.registry.ListAgentIDs()
agentCount := len(agentIDs)
for serverName, conn := range servers {
uniqueTools += len(conn.Tools)
for _, tool := range conn.Tools {
for _, agentID := range agentIDs {
agent, ok := al.registry.GetAgent(agentID)
if !ok {
continue
}
mcpTool := tools.NewMCPTool(mcpManager, serverName, tool)
if al.cfg.Tools.MCP.Discovery.Enabled {
agent.Tools.RegisterHidden(mcpTool)
} else {
agent.Tools.Register(mcpTool)
}
totalRegistrations++
logger.DebugCF("agent", "Registered MCP tool",
map[string]any{
"agent_id": agentID,
"server": serverName,
"tool": tool.Name,
"name": mcpTool.Name(),
})
}
}
}
logger.InfoCF("agent", "MCP tools registered successfully",
map[string]any{
"server_count": len(servers),
"unique_tools": uniqueTools,
"total_registrations": totalRegistrations,
"agent_count": agentCount,
})
// Initializes Discovery Tools only if enabled by configuration
if al.cfg.Tools.MCP.Enabled && al.cfg.Tools.MCP.Discovery.Enabled {
useBM25 := al.cfg.Tools.MCP.Discovery.UseBM25
useRegex := al.cfg.Tools.MCP.Discovery.UseRegex
// Fail fast: If discovery is enabled but no search method is turned on
if !useBM25 && !useRegex {
return fmt.Errorf(
"tool discovery is enabled but neither 'use_bm25' nor 'use_regex' is set to true in the configuration",
)
}
ttl := al.cfg.Tools.MCP.Discovery.TTL
if ttl <= 0 {
ttl = 5 // Default value
}
maxSearchResults := al.cfg.Tools.MCP.Discovery.MaxSearchResults
if maxSearchResults <= 0 {
maxSearchResults = 5 // Default value
}
logger.InfoCF("agent", "Initializing tool discovery", map[string]any{
"bm25": useBM25, "regex": useRegex, "ttl": ttl, "max_results": maxSearchResults,
})
for _, agentID := range agentIDs {
agent, ok := al.registry.GetAgent(agentID)
if !ok {
continue
}
if useRegex {
agent.Tools.Register(tools.NewRegexSearchTool(agent.Tools, ttl, maxSearchResults))
}
if useBM25 {
agent.Tools.Register(tools.NewBM25SearchTool(agent.Tools, ttl, maxSearchResults))
}
}
}
}
if err := al.ensureMCPInitialized(ctx); err != nil {
return err
}
for al.running.Load() {
@@ -431,6 +320,17 @@ func (al *AgentLoop) Stop() {
// Close releases resources held by agent session stores. Call after Stop.
func (al *AgentLoop) Close() {
mcpManager := al.mcp.takeManager()
if mcpManager != nil {
if err := mcpManager.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close MCP manager",
map[string]any{
"error": err.Error(),
})
}
}
al.registry.Close()
}
@@ -619,6 +519,10 @@ func (al *AgentLoop) ProcessDirectWithChannel(
ctx context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
if err := al.ensureMCPInitialized(ctx); err != nil {
return "", err
}
msg := bus.InboundMessage{
Channel: channel,
SenderID: "cron",
+184
View File
@@ -0,0 +1,184 @@
// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package agent
import (
"context"
"fmt"
"sync"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/mcp"
"github.com/sipeed/picoclaw/pkg/tools"
)
type mcpRuntime struct {
initOnce sync.Once
mu sync.Mutex
manager *mcp.Manager
initErr error
}
func (r *mcpRuntime) setManager(manager *mcp.Manager) {
r.mu.Lock()
r.manager = manager
r.initErr = nil
r.mu.Unlock()
}
func (r *mcpRuntime) setInitErr(err error) {
r.mu.Lock()
r.initErr = err
r.mu.Unlock()
}
func (r *mcpRuntime) getInitErr() error {
r.mu.Lock()
defer r.mu.Unlock()
return r.initErr
}
func (r *mcpRuntime) takeManager() *mcp.Manager {
r.mu.Lock()
defer r.mu.Unlock()
manager := r.manager
r.manager = nil
return manager
}
func (r *mcpRuntime) hasManager() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.manager != nil
}
// ensureMCPInitialized loads MCP servers/tools once so both Run() and direct
// agent mode share the same initialization path.
func (al *AgentLoop) ensureMCPInitialized(ctx context.Context) error {
if !al.cfg.Tools.IsToolEnabled("mcp") {
return nil
}
al.mcp.initOnce.Do(func() {
mcpManager := mcp.NewManager()
defaultAgent := al.registry.GetDefaultAgent()
workspacePath := al.cfg.WorkspacePath()
if defaultAgent != nil && defaultAgent.Workspace != "" {
workspacePath = defaultAgent.Workspace
}
if err := mcpManager.LoadFromMCPConfig(ctx, al.cfg.Tools.MCP, workspacePath); err != nil {
logger.WarnCF("agent", "Failed to load MCP servers, MCP tools will not be available",
map[string]any{
"error": err.Error(),
})
if closeErr := mcpManager.Close(); closeErr != nil {
logger.ErrorCF("agent", "Failed to close MCP manager",
map[string]any{
"error": closeErr.Error(),
})
}
return
}
// Register MCP tools for all agents
servers := mcpManager.GetServers()
uniqueTools := 0
totalRegistrations := 0
agentIDs := al.registry.ListAgentIDs()
agentCount := len(agentIDs)
for serverName, conn := range servers {
uniqueTools += len(conn.Tools)
for _, tool := range conn.Tools {
for _, agentID := range agentIDs {
agent, ok := al.registry.GetAgent(agentID)
if !ok {
continue
}
mcpTool := tools.NewMCPTool(mcpManager, serverName, tool)
if al.cfg.Tools.MCP.Discovery.Enabled {
agent.Tools.RegisterHidden(mcpTool)
} else {
agent.Tools.Register(mcpTool)
}
totalRegistrations++
logger.DebugCF("agent", "Registered MCP tool",
map[string]any{
"agent_id": agentID,
"server": serverName,
"tool": tool.Name,
"name": mcpTool.Name(),
})
}
}
}
logger.InfoCF("agent", "MCP tools registered successfully",
map[string]any{
"server_count": len(servers),
"unique_tools": uniqueTools,
"total_registrations": totalRegistrations,
"agent_count": agentCount,
})
// Initializes Discovery Tools only if enabled by configuration
if al.cfg.Tools.MCP.Enabled && al.cfg.Tools.MCP.Discovery.Enabled {
useBM25 := al.cfg.Tools.MCP.Discovery.UseBM25
useRegex := al.cfg.Tools.MCP.Discovery.UseRegex
// Fail fast: If discovery is enabled but no search method is turned on
if !useBM25 && !useRegex {
al.mcp.setInitErr(fmt.Errorf(
"tool discovery is enabled but neither 'use_bm25' nor 'use_regex' is set to true in the configuration",
))
if closeErr := mcpManager.Close(); closeErr != nil {
logger.ErrorCF("agent", "Failed to close MCP manager",
map[string]any{
"error": closeErr.Error(),
})
}
return
}
ttl := al.cfg.Tools.MCP.Discovery.TTL
if ttl <= 0 {
ttl = 5 // Default value
}
maxSearchResults := al.cfg.Tools.MCP.Discovery.MaxSearchResults
if maxSearchResults <= 0 {
maxSearchResults = 5 // Default value
}
logger.InfoCF("agent", "Initializing tool discovery", map[string]any{
"bm25": useBM25, "regex": useRegex, "ttl": ttl, "max_results": maxSearchResults,
})
for _, agentID := range agentIDs {
agent, ok := al.registry.GetAgent(agentID)
if !ok {
continue
}
if useRegex {
agent.Tools.Register(tools.NewRegexSearchTool(agent.Tools, ttl, maxSearchResults))
}
if useBM25 {
agent.Tools.Register(tools.NewBM25SearchTool(agent.Tools, ttl, maxSearchResults))
}
}
}
al.mcp.setManager(mcpManager)
})
return al.mcp.getInitErr()
}
+50
View File
@@ -770,6 +770,56 @@ func TestAgentLoop_ContextExhaustionRetry(t *testing.T) {
}
}
func TestProcessDirectWithChannel_InitializesMCPInAgentMode(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
Tools: config.ToolsConfig{
MCP: config.MCPConfig{
ToolConfig: config.ToolConfig{
Enabled: true,
},
},
},
}
msgBus := bus.NewMessageBus()
provider := &mockProvider{}
al := NewAgentLoop(cfg, msgBus, provider)
defer al.Close()
if al.mcp.hasManager() {
t.Fatal("expected MCP manager to be nil before first direct processing")
}
_, err = al.ProcessDirectWithChannel(
context.Background(),
"hello",
"session-1",
"cli",
"direct",
)
if err != nil {
t.Fatalf("ProcessDirectWithChannel failed: %v", err)
}
if !al.mcp.hasManager() {
t.Fatal("expected MCP manager to be initialized in direct agent mode")
}
}
func TestTargetReasoningChannelID_AllChannels(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {