Compare commits

...

20 Commits

Author SHA1 Message Date
Danieldd28 50c58a3462 chore: remove MCP-focused test files 2026-02-18 00:40:37 +07:00
Danieldd28 403e048821 feat: add MCP integration with context7 compatibility 2026-02-18 00:35:35 +07:00
Leandro Barbosa 4fde0175cf Merge pull request #227 from mattn/fix-shadowing-running
Fix shadowing field runnnig
2026-02-17 12:58:48 -03:00
zenix.huang 0d16525fab fix: codex tool call 2026-02-17 22:56:31 +08:00
zenix.huang 4cd3f99dd6 fix: remove max_tokens 2026-02-17 22:56:31 +08:00
lxowalle 920e30a241 fix:pr-272 reverted the changes from pr-227 (#361) 2026-02-17 21:31:54 +08:00
daming大铭 7b9b8104c8 Merge pull request #225 from yinwm/feat/cron-exec-timeout-config
feat(cron): add configurable execution timeout for cron jobs
2026-02-17 21:12:59 +08:00
yinwm 881999aceb refactor(shell): interpret zero timeout as unlimited execution
Replace unconditional WithTimeout usage with conditional context creation
based on timeout configuration. Zero values now bypass timeout enforcement,
using WithCancel for graceful cancellation while preserving existing timeout
behavior for positive values. Simplifies CronTool initialization by removing
unnecessary conditional timeout assignment.
2026-02-17 21:10:20 +08:00
Hua Audio f929268ab2 feat: Add Perplexity search provider integration (#138)
* feat: Add Perplexity search provider integration

- Add PerplexityConfig struct to config package
- Add PerplexitySearchProvider implementing SearchProvider interface
- Update WebSearchTool to support Perplexity with priority system (Perplexity > Brave > DuckDuckGo)
- Update agent loop to pass Perplexity config options
- Update config.example.json with Perplexity configuration template
- Uses Perplexity's 'sonar' model for web search capabilities

* Edit config example

* make fmt

---------

Co-authored-by: Hua <zhangmikoto@gmail.com>
2026-02-17 21:02:56 +08:00
yinwm 684e7413e1 Merge remote-tracking branch 'origin/main' into feat/cron-exec-timeout-config 2026-02-17 20:53:31 +08:00
zepan da79c201c7 1. fix typo 2026-02-17 18:03:02 +08:00
zepan 5fb2721d22 1. add android phone termux quick guide 2026-02-17 18:01:39 +08:00
zepan 951b05d255 1. add AI Code Generation selection in pr template 2026-02-17 17:15:40 +08:00
zepan ac4b16dfb4 1. rename doc to docs 2026-02-17 16:51:38 +08:00
zepan 0fadbcd340 1. add roadmap.md 2026-02-17 16:03:07 +08:00
Guoguo a961a2df87 fix(ci): use env var for release tag (#342)
Signed-off-by: Guoguo <i@qwq.trade>
2026-02-17 14:32:51 +08:00
zepan 57dac394c5 update pr template 2026-02-17 09:30:30 +08:00
Yasuhiro Matsumoto 7ce5b75178 Fix shadowing field runnnig 2026-02-16 00:47:17 +09:00
yinwm 40f90281e5 Merge remote-tracking branch 'upstream/main' into feat/cron-exec-timeout-config 2026-02-15 18:41:54 +08:00
yinwm 82856bc57a feat(cron): add configurable execution timeout for cron jobs
Add a new configuration option `exec_timeout_minutes` under the `tools.cron`
section to control the maximum execution time for cron jobs. The default
timeout is set to 5 minutes, which is appropriate for LLM operations.

The configuration can be set in the config file or via the
`PICOCLAW_TOOLS_CRON_EXEC_TIMEOUT_MINUTES` environment variable. A value of
0 disables the timeout entirely.

This change improves system reliability by preventing cron jobs from running
indefinitely in case of unexpected failures or hanging processes.
2026-02-15 18:41:39 +08:00
26 changed files with 1762 additions and 29 deletions
+37
View File
@@ -0,0 +1,37 @@
## 📝 Description
## 🗣️ Type of Change
- [ ] 🐞 Bug fix (non-breaking change which fixes an issue)
- [ ] ✨ New feature (non-breaking change which adds functionality)
- [ ] 📖 Documentation update
- [ ] ⚡ Code refactoring (no functional changes, no api changes)
## 🤖 AI Code Generation
- [ ] 🤖 Fully AI-generated (100% AI, 0% Human)
- [ ] 🛠️ Mostly AI-generated (AI draft, Human verified/modified)
- [ ] 👨‍💻 Mostly Human-written (Human lead, AI assisted or none)
## 🔗 Linked Issue
## 📚 Technical Context (Skip for Docs)
* **Reference:** [URL]
* **Reasoning:** ...
## 🧪 Test Environment & Hardware
- **Hardware:** [e.g. Raspberry Pi 5, Orange Pi, PC]
- **OS:** [e.g. Debian 12, Ubuntu 22.04]
- **Model/Provider:** [e.g. OpenAI GPT-4o, Kimi k2, DeepSeek-V3]
- **Channels:** [e.g. Discord, Telegram, Feishu, ...]
## 📸 Proof of Work (Optional for Docs)
<details>
<summary>Click to view Logs/Screenshots</summary>
</details>
## ☑️ Checklist
- [ ] My code/docs follow the style of this project.
- [ ] I have performed a self-review of my own changes.
- [ ] I have updated the documentation accordingly.
+4 -2
View File
@@ -32,11 +32,13 @@ jobs:
- name: Create and push tag
shell: bash
env:
RELEASE_TAG: ${{ inputs.tag }}
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git tag -a "${{ inputs.tag }}" -m "Release ${{ inputs.tag }}"
git push origin "${{ inputs.tag }}"
git tag -a "$RELEASE_TAG" -m "Release $RELEASE_TAG"
git push origin "$RELEASE_TAG"
release:
name: GoReleaser Release
+3
View File
@@ -39,6 +39,8 @@ ifeq ($(UNAME_S),Linux)
ARCH=amd64
else ifeq ($(UNAME_M),aarch64)
ARCH=arm64
else ifeq ($(UNAME_M),loongarch64)
ARCH=loong64
else ifeq ($(UNAME_M),riscv64)
ARCH=riscv64
else
@@ -84,6 +86,7 @@ build-all: generate
@mkdir -p $(BUILD_DIR)
GOOS=linux GOARCH=amd64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-linux-amd64 ./$(CMD_DIR)
GOOS=linux GOARCH=arm64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-linux-arm64 ./$(CMD_DIR)
GOOS=linux GOARCH=loong64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-linux-loong64 ./$(CMD_DIR)
GOOS=linux GOARCH=riscv64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-linux-riscv64 ./$(CMD_DIR)
GOOS=darwin GOARCH=arm64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-darwin-arm64 ./$(CMD_DIR)
GOOS=windows GOARCH=amd64 $(GO) build $(LDFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME)-windows-amd64.exe ./$(CMD_DIR)
+6
View File
@@ -195,6 +195,9 @@ picoclaw onboard
"api_key": "YOUR_BRAVE_API_KEY",
"max_results": 5
}
},
"cron": {
"exec_timeout_minutes": 5
}
},
"heartbeat": {
@@ -697,6 +700,9 @@ HEARTBEAT_OK 応答 ユーザーが直接結果を受け取る
"search": {
"apiKey": "BSA..."
}
},
"cron": {
"exec_timeout_minutes": 5
}
},
"heartbeat": {
+18 -1
View File
@@ -49,7 +49,7 @@
## 📢 News
2026-02-16 🎉 PicoClaw hit 12K stars in one week! Thank you all for your support! PicoClaw is growing faster than we ever imagined. Given the high volume of PRs, we urgently need community maintainers. Our volunteer roles and roadmap are officially posted [here](doc/picoclaw_community_roadmap_260216.md) —we cant wait to have you on board!
2026-02-16 🎉 PicoClaw hit 12K stars in one week! Thank you all for your support! PicoClaw is growing faster than we ever imagined. Given the high volume of PRs, we urgently need community maintainers. Our volunteer roles and roadmap are officially posted [here](docs/picoclaw_community_roadmap_260216.md) —we cant wait to have you on board!
2026-02-13 🎉 PicoClaw hit 5000 stars in 4days! Thank you for the community! There are so many PRs&issues come in (during Chinese New Year holidays), we are finalizing the Project Roadmap and setting up the Developer Group to accelerate PicoClaw's development.
🚀 Call to Action: Please submit your feature requests in GitHub Discussions. We will review and prioritize them during our upcoming weekly meeting.
@@ -99,6 +99,20 @@
</tr>
</table>
### 📱 Run on old Android Phones
Give your decade-old phone a second life! Turn it into a smart AI Assistant with PicoClaw. Quick Start:
1. **Install Termux** (Available on F-Droid or Google Play).
2. **Execute cmds**
```bash
# Note: Replace v0.1.1 with the latest version from the Releases page
wget https://github.com/sipeed/picoclaw/releases/download/v0.1.1/picoclaw-linux-arm64
chmod +x picoclaw-linux-arm64
pkg install proot
termux-chroot ./picoclaw-linux-arm64 onboard
```
And then follow the instructions in the "Quick Start" section to complete the configuration!
<img src="assets/termux.jpg" alt="PicoClaw" width="512">
### 🐜 Innovative Low-Footprint Deploy
PicoClaw can be deployed on almost any Linux device!
@@ -760,6 +774,9 @@ picoclaw agent -m "Hello"
"enabled": true,
"max_results": 5
}
},
"cron": {
"exec_timeout_minutes": 5
}
},
"heartbeat": {
+24 -1
View File
@@ -50,7 +50,7 @@
## 📢 新闻 (News)
2026-02-16 🎉 PicoClaw 在一周内突破了12K star! 感谢大家的关注!PicoClaw 的成长速度超乎我们预期. 由于PR数量的快速膨胀,我们亟需社区开发者参与维护. 我们需要的志愿者角色和roadmap已经发布到了[这里](doc/picoclaw_community_roadmap_260216.md), 期待你的参与!
2026-02-16 🎉 PicoClaw 在一周内突破了12K star! 感谢大家的关注!PicoClaw 的成长速度超乎我们预期. 由于PR数量的快速膨胀,我们亟需社区开发者参与维护. 我们需要的志愿者角色和roadmap已经发布到了[这里](docs/picoclaw_community_roadmap_260216.md), 期待你的参与!
2026-02-13 🎉 **PicoClaw 在 4 天内突破 5000 Stars** 感谢社区的支持!由于正值中国春节假期,PR 和 Issue 涌入较多,我们正在利用这段时间敲定 **项目路线图 (Roadmap)** 并组建 **开发者群组**,以便加速 PicoClaw 的开发。
🚀 **行动号召:** 请在 GitHub Discussions 中提交您的功能请求 (Feature Requests)。我们将在接下来的周会上进行审查和优先级排序。
@@ -100,6 +100,23 @@
</tr>
</table>
### 📱 在手机上轻松运行
picoclaw 可以将你10年前的老旧手机废物利用,变身成为你的AI助理!快速指南:
1. 先去应用商店下载安装Termux
2. 打开后执行指令
```bash
# 注意: 下面的v0.1.1 可以换为你实际看到的最新版本
wget https://github.com/sipeed/picoclaw/releases/download/v0.1.1/picoclaw-linux-arm64
chmod +x picoclaw-linux-arm64
pkg install proot
termux-chroot ./picoclaw-linux-arm64 onboard
```
然后跟随下面的“快速开始”章节继续配置picoclaw即可使用!
<img src="assets/termux.jpg" alt="PicoClaw" width="512">
### 🐜 创新的低占用部署
PicoClaw 几乎可以部署在任何 Linux 设备上!
@@ -219,6 +236,9 @@ picoclaw onboard
"api_key": "YOUR_BRAVE_API_KEY",
"max_results": 5
}
},
"cron": {
"exec_timeout_minutes": 5
}
}
}
@@ -627,6 +647,9 @@ picoclaw agent -m "你好"
"search": {
"api_key": "BSA..."
}
},
"cron": {
"exec_timeout_minutes": 5
}
},
"heartbeat": {
+116
View File
@@ -0,0 +1,116 @@
# 🦐 PicoClaw Roadmap
> **Vision**: To build the ultimate lightweight, secure, and fully autonomous AI Agent infrastructure.automate the mundane, unleash your creativity
---
## 🚀 1. Core Optimization: Extreme Lightweight
*Our defining characteristic. We fight software bloat to ensure PicoClaw runs smoothly on the smallest embedded devices.*
* [**Memory Footprint Reduction**](https://github.com/sipeed/picoclaw/issues/346)
* **Goal**: Run smoothly on 64MB RAM embedded boards (e.g., low-end RISC-V SBCs) with the core process consuming < 20MB.
* **Context**: RAM is expensive and scarce on edge devices. Memory optimization takes precedence over storage size.
* **Action**: Analyze memory growth between releases, remove redundant dependencies, and optimize data structures.
## 🛡️ 2. Security Hardening: Defense in Depth
*Paying off early technical debt. We invite security experts to help build a "Secure-by-Default" agent.*
* **Input Defense & Permission Control**
* **Prompt Injection Defense**: Harden JSON extraction logic to prevent LLM manipulation.
* **Tool Abuse Prevention**: Strict parameter validation to ensure generated commands stay within safe boundaries.
* **SSRF Protection**: Built-in blocklists for network tools to prevent accessing internal IPs (LAN/Metadata services).
* **Sandboxing & Isolation**
* **Filesystem Sandbox**: Restrict file R/W operations to specific directories only.
* **Context Isolation**: Prevent data leakage between different user sessions or channels.
* **Privacy Redaction**: Auto-redact sensitive info (API Keys, PII) from logs and standard outputs.
* **Authentication & Secrets**
* **Crypto Upgrade**: Adopt modern algorithms like `ChaCha20-Poly1305` for secret storage.
* **OAuth 2.0 Flow**: Deprecate hardcoded API keys in the CLI; move to secure OAuth flows.
## 🔌 3. Connectivity: Protocol-First Architecture
*Connect every model, reach every platform.*
* **Provider**
* [**Architecture Upgrade**](https://github.com/sipeed/picoclaw/issues/283): Refactor from "Vendor-based" to "Protocol-based" classification (e.g., OpenAI-compatible, Ollama-compatible). *(Status: In progress by @Daming, ETA 5 days)*
* **Local Models**: Deep integration with **Ollama**, **vLLM**, **LM Studio**, and **Mistral** (local inference).
* **Online Models**: Continued support for frontier closed-source models.
* **Channel**
* **IM Matrix**: QQ, WeChat (Work), DingTalk, Feishu (Lark), Telegram, Discord, WhatsApp, LINE, Slack, Email, KOOK, Signal, ...
* **Standards**: Support for the **OneBot** protocol.
* [**attachment**](https://github.com/sipeed/picoclaw/issues/348): Native handling of images, audio, and video attachments.
* **Skill Marketplace**
* [**Discovery skills**](https://github.com/sipeed/picoclaw/issues/287): Implement `find_skill` to automatically discover and install skills from the [GitHub Skills Repo] or other registries.
## 🧠 4. Advanced Capabilities: From Chatbot to Agentic AI
*Beyond conversation—focusing on action and collaboration.*
* **Operations**
* [**MCP Support**](https://github.com/sipeed/picoclaw/issues/290): Native support for the **Model Context Protocol (MCP)**.
* [**Browser Automation**](https://github.com/sipeed/picoclaw/issues/293): Headless browser control via CDP (Chrome DevTools Protocol) or ActionBook.
* [**Mobile Operation**](https://github.com/sipeed/picoclaw/issues/292): Android device control (similar to BotDrop).
* **Multi-Agent Collaboration**
* [**Basic Multi-Agent**](https://github.com/sipeed/picoclaw/issues/294) implement
* [**Model Routing**](https://github.com/sipeed/picoclaw/issues/295): "Smart Routing" — dispatch simple tasks to small/local models (fast/cheap) and complex tasks to SOTA models (smart).
* [**Swarm Mode**](https://github.com/sipeed/picoclaw/issues/284): Collaboration between multiple PicoClaw instances on the same network.
* [**AIEOS**](https://github.com/sipeed/picoclaw/issues/296): Exploring AI-Native Operating System interaction paradigms.
## 📚 5. Developer Experience (DevEx) & Documentation
*Lowering the barrier to entry so anyone can deploy in minutes.*
* [**QuickGuide (Zero-Config Start)**](https://github.com/sipeed/picoclaw/issues/350)
* Interactive CLI Wizard: If launched without config, automatically detect the environment and guide the user through Token/Network setup step-by-step.
* **Comprehensive Documentation**
* **Platform Guides**: Dedicated guides for Windows, macOS, Linux, and Android.
* **Step-by-Step Tutorials**: "Babysitter-level" guides for configuring Providers and Channels.
* **AI-Assisted Docs**: Using AI to auto-generate API references and code comments (with human verification to prevent hallucinations).
## 🤖 6. Engineering: AI-Powered Open Source
*Born from Vibe Coding, we continue to use AI to accelerate development.*
* **AI-Enhanced CI/CD**
* Integrate AI for automated Code Review, Linting, and PR Labeling.
* **Bot Noise Reduction**: Optimize bot interactions to keep PR timelines clean.
* **Issue Triage**: AI agents to analyze incoming issues and suggest preliminary fixes.
## 🎨 7. Brand & Community
* [**Logo Design**](https://github.com/sipeed/picoclaw/issues/297): We are looking for a **Mantis Shrimp (Stomatopoda)** logo design!
* *Concept*: Needs to reflect "Small but Mighty" and "Lightning Fast Strikes."
---
### 🤝 Call for Contributions
We welcome community contributions to any item on this roadmap! Please comment on the relevant Issue or submit a PR. Let's build the best Edge AI Agent together!
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 97 KiB

+4 -3
View File
@@ -562,7 +562,8 @@ func gatewayCmd() {
})
// Setup cron tool and service
cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath(), cfg.Agents.Defaults.RestrictToWorkspace)
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
cronService := setupCronTool(agentLoop, msgBus, cfg.WorkspacePath(), cfg.Agents.Defaults.RestrictToWorkspace, execTimeout)
heartbeatService := heartbeat.NewHeartbeatService(
cfg.WorkspacePath(),
@@ -987,14 +988,14 @@ func getConfigPath() string {
return filepath.Join(home, ".picoclaw", "config.json")
}
func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string, restrict bool) *cron.CronService {
func setupCronTool(agentLoop *agent.AgentLoop, msgBus *bus.MessageBus, workspace string, restrict bool, execTimeout time.Duration) *cron.CronService {
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")
// Create cron service
cronService := cron.NewCronService(cronStorePath, nil)
// Create and register CronTool
cronTool := tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict)
cronTool := tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout)
agentLoop.RegisterTool(cronTool)
// Set the onJob handler
+35 -2
View File
@@ -14,7 +14,9 @@
"enabled": false,
"token": "YOUR_TELEGRAM_BOT_TOKEN",
"proxy": "",
"allow_from": ["YOUR_USER_ID"]
"allow_from": [
"YOUR_USER_ID"
]
},
"discord": {
"enabled": false,
@@ -115,9 +117,40 @@
},
"tools": {
"web": {
"search": {
"brave": {
"enabled": false,
"api_key": "YOUR_BRAVE_API_KEY",
"max_results": 5
},
"perplexity": {
"enabled": false,
"api_key": "pplx-xxx",
"max_results": 5
}
},
"cron": {
"exec_timeout_minutes": 5
},
"mcp": {
"enabled": false,
"servers": {
"filesystem": {
"enabled": false,
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-filesystem",
"/tmp"
],
"protocol": "mcp",
"env": {},
"working_dir": "",
"init_timeout_seconds": 60,
"call_timeout_seconds": 30,
"max_response_bytes": 65536,
"include_tools": [],
"exclude_tools": []
}
}
}
},
+62 -4
View File
@@ -23,6 +23,7 @@ import (
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/mcp"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/state"
@@ -44,8 +45,12 @@ type AgentLoop struct {
running atomic.Bool
summarizing sync.Map // Tracks which sessions are currently being summarized
channelManager *channels.Manager
mcpManager *mcp.Manager
mcpCloseOnce sync.Once
}
const defaultWebFetchMaxChars = 50000
// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
@@ -60,7 +65,14 @@ type processOptions struct {
// createToolRegistry creates a tool registry with common tools.
// This is shared between main agent and subagents.
func createToolRegistry(workspace string, restrict bool, cfg *config.Config, msgBus *bus.MessageBus) *tools.ToolRegistry {
func createToolRegistry(
workspace string,
restrict bool,
cfg *config.Config,
msgBus *bus.MessageBus,
mcpManager *mcp.Manager,
discoveredMCPTools []mcp.RegisteredTool,
) *tools.ToolRegistry {
registry := tools.NewToolRegistry()
// File system tools
@@ -79,10 +91,15 @@ func createToolRegistry(workspace string, restrict bool, cfg *config.Config, msg
BraveEnabled: cfg.Tools.Web.Brave.Enabled,
DuckDuckGoMaxResults: cfg.Tools.Web.DuckDuckGo.MaxResults,
DuckDuckGoEnabled: cfg.Tools.Web.DuckDuckGo.Enabled,
PerplexityAPIKey: cfg.Tools.Web.Perplexity.APIKey,
PerplexityMaxResults: cfg.Tools.Web.Perplexity.MaxResults,
PerplexityEnabled: cfg.Tools.Web.Perplexity.Enabled,
}); searchTool != nil {
registry.Register(searchTool)
}
registry.Register(tools.NewWebFetchTool(50000))
registry.Register(tools.NewWebFetchTool(defaultWebFetchMaxChars))
tools.RegisterKnownMCPTools(registry, mcpManager, discoveredMCPTools)
// Hardware tools (I2C, SPI) - Linux only, returns error on other platforms
registry.Register(tools.NewI2CTool())
@@ -110,12 +127,35 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
restrict := cfg.Agents.Defaults.RestrictToWorkspace
var (
mcpManager *mcp.Manager
discoveredMCPTools []mcp.RegisteredTool
)
if cfg.Tools.MCP.Enabled {
bootstrap, err := bootstrapMCP(cfg.Tools.MCP)
if err != nil {
logger.WarnCF("agent", "MCP tool bootstrap failed",
map[string]interface{}{
"error": err.Error(),
})
} else if bootstrap != nil {
mcpManager = bootstrap.Manager
discoveredMCPTools = bootstrap.Tools
if len(discoveredMCPTools) > 0 {
logger.InfoCF("agent", "MCP tools registered",
map[string]interface{}{
"count": len(discoveredMCPTools),
})
}
}
}
// Create tool registry for main agent
toolsRegistry := createToolRegistry(workspace, restrict, cfg, msgBus)
toolsRegistry := createToolRegistry(workspace, restrict, cfg, msgBus, mcpManager, discoveredMCPTools)
// Create subagent manager with its own tool registry
subagentManager := tools.NewSubagentManager(provider, cfg.Agents.Defaults.Model, workspace, msgBus)
subagentTools := createToolRegistry(workspace, restrict, cfg, msgBus)
subagentTools := createToolRegistry(workspace, restrict, cfg, msgBus, mcpManager, discoveredMCPTools)
// Subagent doesn't need spawn/subagent tools to avoid recursion
subagentManager.SetTools(subagentTools)
@@ -148,11 +188,13 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
contextBuilder: contextBuilder,
tools: toolsRegistry,
summarizing: sync.Map{},
mcpManager: mcpManager,
}
}
func (al *AgentLoop) Run(ctx context.Context) error {
al.running.Store(true)
defer al.closeMCP()
for al.running.Load() {
select {
@@ -195,6 +237,22 @@ func (al *AgentLoop) Run(ctx context.Context) error {
func (al *AgentLoop) Stop() {
al.running.Store(false)
al.closeMCP()
}
func (al *AgentLoop) closeMCP() {
if al.mcpManager == nil {
return
}
al.mcpCloseOnce.Do(func() {
if err := al.mcpManager.Close(); err != nil {
logger.WarnCF("agent", "Failed to close MCP manager",
map[string]interface{}{
"error": err.Error(),
})
}
})
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
+110
View File
@@ -0,0 +1,110 @@
package agent
import (
"context"
"strings"
"time"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/mcp"
)
const (
mcpBootstrapMinTimeout = 10 * time.Second
mcpBootstrapMaxTimeout = 5 * time.Minute
mcpBootstrapGraceTimeout = 5 * time.Second
)
type mcpBootstrapResult struct {
Manager *mcp.Manager
Tools []mcp.RegisteredTool
}
func bootstrapMCP(cfg config.MCPToolsConfig) (*mcpBootstrapResult, error) {
serverConfigs := buildMCPServerConfigs(cfg)
if len(serverConfigs) == 0 {
return nil, nil
}
manager := mcp.NewManager(serverConfigs)
discoveryTimeout := calculateMCPDiscoveryTimeout(serverConfigs)
discoveryCtx, cancel := context.WithTimeout(context.Background(), discoveryTimeout)
defer cancel()
discoveredTools, err := manager.DiscoverTools(discoveryCtx)
if err != nil {
_ = manager.Close()
return nil, err
}
return &mcpBootstrapResult{
Manager: manager,
Tools: discoveredTools,
}, nil
}
func calculateMCPDiscoveryTimeout(serverConfigs map[string]mcp.ServerConfig) time.Duration {
maxInitTimeout := mcpBootstrapMinTimeout
for _, serverConfig := range serverConfigs {
initTimeout := serverConfig.InitTimeout()
if initTimeout > maxInitTimeout {
maxInitTimeout = initTimeout
}
}
timeout := maxInitTimeout + mcpBootstrapGraceTimeout
if timeout < mcpBootstrapMinTimeout {
return mcpBootstrapMinTimeout
}
if timeout > mcpBootstrapMaxTimeout {
return mcpBootstrapMaxTimeout
}
return timeout
}
func buildMCPServerConfigs(cfg config.MCPToolsConfig) map[string]mcp.ServerConfig {
servers := make(map[string]mcp.ServerConfig, len(cfg.Servers))
for serverName, serverCfg := range cfg.Servers {
if !serverCfg.Enabled {
continue
}
envCopy := make(map[string]string, len(serverCfg.Env))
for key, value := range serverCfg.Env {
envCopy[key] = value
}
servers[serverName] = mcp.ServerConfig{
Name: serverName,
Command: serverCfg.Command,
Args: append([]string{}, serverCfg.Args...),
Env: envCopy,
WorkingDir: serverCfg.WorkingDir,
Protocol: inferMCPProtocol(serverCfg.Protocol, serverCfg.Command),
InitTimeoutSeconds: serverCfg.InitTimeoutSeconds,
CallTimeoutSeconds: serverCfg.CallTimeoutSeconds,
MaxResponseBytes: serverCfg.MaxResponseBytes,
IncludeTools: append([]string{}, serverCfg.IncludeTools...),
ExcludeTools: append([]string{}, serverCfg.ExcludeTools...),
}
}
return servers
}
func inferMCPProtocol(configuredProtocol, command string) string {
if protocol := strings.TrimSpace(configuredProtocol); protocol != "" {
return protocol
}
// Context7 currently emits JSON-RPC messages as JSONL on stdio,
// so defaulting avoids long startup waits when protocol is omitted.
if strings.Contains(strings.ToLower(command), "context7-mcp") {
return mcp.ProtocolJSONLines
}
return ""
}
-2
View File
@@ -18,7 +18,6 @@ type MaixCamChannel struct {
listener net.Listener
clients map[net.Conn]bool
clientsMux sync.RWMutex
running bool
}
type MaixCamMessage struct {
@@ -35,7 +34,6 @@ func NewMaixCamChannel(cfg config.MaixCamConfig, bus *bus.MessageBus) (*MaixCamC
BaseChannel: base,
config: cfg,
clients: make(map[net.Conn]bool),
running: false,
}, nil
}
+103 -2
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"github.com/caarlos0/env/v11"
@@ -51,7 +52,10 @@ type Config struct {
Tools ToolsConfig `json:"tools"`
Heartbeat HeartbeatConfig `json:"heartbeat"`
Devices DevicesConfig `json:"devices"`
mu sync.RWMutex
// MCPServers is a compatibility alias for configs using top-level "mcpServers".
// Canonical config remains tools.mcp.servers.
MCPServers map[string]LegacyMCPServerConfig `json:"mcpServers,omitempty"`
mu sync.RWMutex
}
type AgentsConfig struct {
@@ -206,13 +210,54 @@ type DuckDuckGoConfig struct {
MaxResults int `json:"max_results" env:"PICOCLAW_TOOLS_WEB_DUCKDUCKGO_MAX_RESULTS"`
}
type PerplexityConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_WEB_PERPLEXITY_ENABLED"`
APIKey string `json:"api_key" env:"PICOCLAW_TOOLS_WEB_PERPLEXITY_API_KEY"`
MaxResults int `json:"max_results" env:"PICOCLAW_TOOLS_WEB_PERPLEXITY_MAX_RESULTS"`
}
type WebToolsConfig struct {
Brave BraveConfig `json:"brave"`
DuckDuckGo DuckDuckGoConfig `json:"duckduckgo"`
Perplexity PerplexityConfig `json:"perplexity"`
}
type CronToolsConfig struct {
ExecTimeoutMinutes int `json:"exec_timeout_minutes" env:"PICOCLAW_TOOLS_CRON_EXEC_TIMEOUT_MINUTES"` // 0 means no timeout
}
type MCPServerConfig struct {
Enabled bool `json:"enabled"`
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
WorkingDir string `json:"working_dir"`
Protocol string `json:"protocol"`
InitTimeoutSeconds int `json:"init_timeout_seconds"`
CallTimeoutSeconds int `json:"call_timeout_seconds"`
MaxResponseBytes int `json:"max_response_bytes"`
IncludeTools []string `json:"include_tools"`
ExcludeTools []string `json:"exclude_tools"`
}
type MCPToolsConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_MCP_ENABLED"`
Servers map[string]MCPServerConfig `json:"servers"`
}
// LegacyMCPServerConfig supports compatibility with "mcpServers" style config.
type LegacyMCPServerConfig struct {
Type string `json:"type"`
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
Protocol string `json:"protocol"`
}
type ToolsConfig struct {
Web WebToolsConfig `json:"web"`
Web WebToolsConfig `json:"web"`
Cron CronToolsConfig `json:"cron"`
MCP MCPToolsConfig `json:"mcp"`
}
func DefaultConfig() *Config {
@@ -321,6 +366,18 @@ func DefaultConfig() *Config {
Enabled: true,
MaxResults: 5,
},
Perplexity: PerplexityConfig{
Enabled: false,
APIKey: "",
MaxResults: 5,
},
},
Cron: CronToolsConfig{
ExecTimeoutMinutes: 5, // default 5 minutes for LLM operations
},
MCP: MCPToolsConfig{
Enabled: false,
Servers: map[string]MCPServerConfig{},
},
},
Heartbeat: HeartbeatConfig{
@@ -353,9 +410,53 @@ func LoadConfig(path string) (*Config, error) {
return nil, err
}
cfg.applyLegacyMCPServers()
return cfg, nil
}
func (c *Config) applyLegacyMCPServers() {
// If canonical MCP config already exists, keep it as source of truth.
if len(c.Tools.MCP.Servers) > 0 {
return
}
if len(c.MCPServers) == 0 {
return
}
if c.Tools.MCP.Servers == nil {
c.Tools.MCP.Servers = map[string]MCPServerConfig{}
}
for name, legacy := range c.MCPServers {
if strings.TrimSpace(legacy.Command) == "" {
continue
}
enabled := true
if legacy.Type != "" && legacy.Type != "stdio" {
enabled = false
}
envCopy := make(map[string]string, len(legacy.Env))
for key, value := range legacy.Env {
envCopy[key] = value
}
c.Tools.MCP.Servers[name] = MCPServerConfig{
Enabled: enabled,
Command: legacy.Command,
Args: append([]string{}, legacy.Args...),
Env: envCopy,
Protocol: legacy.Protocol,
}
}
if len(c.Tools.MCP.Servers) > 0 {
c.Tools.MCP.Enabled = true
}
}
func SaveConfig(path string, cfg *Config) error {
cfg.mu.RLock()
defer cfg.mu.RUnlock()
+603
View File
@@ -0,0 +1,603 @@
package mcp
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/sipeed/picoclaw/pkg/logger"
)
// Client is the transport-agnostic MCP client contract.
type Client interface {
Start(ctx context.Context) error
ListTools(ctx context.Context) ([]RemoteTool, error)
CallTool(ctx context.Context, toolName string, arguments map[string]any) (CallResult, error)
Close() error
}
// StdioClient speaks MCP over stdio (JSON-RPC framed with Content-Length headers).
type StdioClient struct {
config ServerConfig
mode string
mu sync.Mutex
writeMu sync.Mutex
started bool
closed bool
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
waitCh chan struct{}
pending map[string]chan rpcResponse
nextID uint64
}
type rpcRequest struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id,omitempty"`
Method string `json:"method"`
Params any `json:"params,omitempty"`
}
type rpcResponseEnvelope struct {
JSONRPC string `json:"jsonrpc"`
ID json.RawMessage `json:"id,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *rpcError `json:"error,omitempty"`
Method string `json:"method,omitempty"`
}
type rpcError struct {
Code int `json:"code"`
Message string `json:"message"`
}
type rpcResponse struct {
result json.RawMessage
rpcErr *rpcError
err error
}
type initializeParams struct {
ProtocolVersion string `json:"protocolVersion"`
Capabilities map[string]any `json:"capabilities"`
ClientInfo map[string]interface{} `json:"clientInfo"`
}
func NewStdioClient(config ServerConfig) *StdioClient {
return &StdioClient{
config: config,
mode: normalizeProtocol(config.Protocol),
}
}
func (c *StdioClient) Start(ctx context.Context) error {
c.mu.Lock()
if c.started {
c.mu.Unlock()
return nil
}
if strings.TrimSpace(c.config.Command) == "" {
c.mu.Unlock()
return fmt.Errorf("mcp server %q command is empty", c.config.Name)
}
cmd := exec.Command(c.config.Command, c.config.Args...)
if c.config.WorkingDir != "" {
cmd.Dir = c.config.WorkingDir
}
cmd.Env = buildProcessEnv(c.config.Env)
stdin, err := cmd.StdinPipe()
if err != nil {
c.mu.Unlock()
return fmt.Errorf("create stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
c.mu.Unlock()
return fmt.Errorf("create stdout pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
c.mu.Unlock()
return fmt.Errorf("create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
c.mu.Unlock()
return fmt.Errorf("start process: %w", err)
}
c.started = true
c.closed = false
c.cmd = cmd
c.stdin = stdin
c.stdout = stdout
c.stderr = stderr
c.waitCh = make(chan struct{})
c.pending = make(map[string]chan rpcResponse)
c.mu.Unlock()
go c.readLoop()
go c.waitLoop()
go c.drainStderr()
initCtx, cancel := withTimeoutIfMissing(ctx, c.config.InitTimeout())
defer cancel()
_, err = c.request(initCtx, "initialize", initializeParams{
ProtocolVersion: "2024-11-05",
Capabilities: map[string]any{
"tools": map[string]any{},
},
ClientInfo: map[string]any{
"name": "picoclaw",
"version": "dev",
},
})
if err != nil {
_ = c.Close()
return fmt.Errorf("initialize failed: %w", err)
}
if err := c.notify("notifications/initialized", map[string]any{}); err != nil {
_ = c.Close()
return fmt.Errorf("initialized notification failed: %w", err)
}
return nil
}
func (c *StdioClient) ListTools(ctx context.Context) ([]RemoteTool, error) {
if err := c.Start(ctx); err != nil {
return nil, err
}
type listToolsResponse struct {
Tools []struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
InputSchema map[string]any `json:"inputSchema"`
} `json:"tools"`
NextCursor string `json:"nextCursor,omitempty"`
}
allTools := make([]RemoteTool, 0, 8)
cursor := ""
for page := 0; page < maxToolListPages; page++ {
params := map[string]any{}
if cursor != "" {
params["cursor"] = cursor
}
callCtx, cancel := withTimeoutIfMissing(ctx, c.config.CallTimeout())
raw, err := c.request(callCtx, "tools/list", params)
cancel()
if err != nil {
return nil, err
}
var response listToolsResponse
if err := json.Unmarshal(raw, &response); err != nil {
return nil, fmt.Errorf("decode tools/list response: %w", err)
}
for _, tool := range response.Tools {
allTools = append(allTools, RemoteTool{
Name: tool.Name,
Description: tool.Description,
InputSchema: tool.InputSchema,
})
}
if response.NextCursor == "" {
return allTools, nil
}
cursor = response.NextCursor
}
return nil, fmt.Errorf("tools/list exceeded %d pages", maxToolListPages)
}
func (c *StdioClient) CallTool(ctx context.Context, toolName string, arguments map[string]any) (CallResult, error) {
if err := c.Start(ctx); err != nil {
return CallResult{}, err
}
callCtx, cancel := withTimeoutIfMissing(ctx, c.config.CallTimeout())
defer cancel()
raw, err := c.request(callCtx, "tools/call", map[string]any{
"name": toolName,
"arguments": arguments,
})
if err != nil {
return CallResult{}, err
}
return formatCallPayload(raw, c.config.ResponseLimit())
}
func (c *StdioClient) Close() error {
c.mu.Lock()
if !c.started || c.closed {
c.mu.Unlock()
return nil
}
c.closed = true
cmd := c.cmd
stdin := c.stdin
waitCh := c.waitCh
c.mu.Unlock()
c.failPending(errors.New("mcp client closed"))
if stdin != nil {
_ = stdin.Close()
}
if cmd != nil && cmd.Process != nil {
_ = cmd.Process.Kill()
}
if waitCh != nil {
select {
case <-waitCh:
case <-time.After(2 * time.Second):
}
}
return nil
}
func (c *StdioClient) request(ctx context.Context, method string, params any) (json.RawMessage, error) {
id := strconv.FormatUint(atomic.AddUint64(&c.nextID, 1), 10)
responseCh := make(chan rpcResponse, 1)
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, fmt.Errorf("mcp server %q is closed", c.config.Name)
}
c.pending[id] = responseCh
c.mu.Unlock()
req := rpcRequest{
JSONRPC: "2.0",
ID: id,
Method: method,
Params: params,
}
if err := c.writeMessage(req); err != nil {
c.removePending(id)
return nil, err
}
select {
case <-ctx.Done():
c.removePending(id)
return nil, ctx.Err()
case response := <-responseCh:
if response.err != nil {
return nil, response.err
}
if response.rpcErr != nil {
return nil, fmt.Errorf("mcp error %d: %s", response.rpcErr.Code, response.rpcErr.Message)
}
return response.result, nil
}
}
func (c *StdioClient) notify(method string, params any) error {
req := rpcRequest{
JSONRPC: "2.0",
Method: method,
Params: params,
}
return c.writeMessage(req)
}
func (c *StdioClient) writeMessage(payload any) error {
c.mu.Lock()
if c.closed || c.stdin == nil {
c.mu.Unlock()
return fmt.Errorf("mcp server %q is not writable", c.config.Name)
}
stdin := c.stdin
c.mu.Unlock()
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal json-rpc payload: %w", err)
}
if c.mode == ProtocolJSONLines {
c.writeMu.Lock()
defer c.writeMu.Unlock()
if _, err := stdin.Write(append(data, '\n')); err != nil {
return fmt.Errorf("write jsonl body: %w", err)
}
return nil
}
frameHeader := fmt.Sprintf("Content-Length: %d\r\n\r\n", len(data))
c.writeMu.Lock()
defer c.writeMu.Unlock()
if _, err := io.WriteString(stdin, frameHeader); err != nil {
return fmt.Errorf("write frame header: %w", err)
}
if _, err := stdin.Write(data); err != nil {
return fmt.Errorf("write frame body: %w", err)
}
return nil
}
func (c *StdioClient) readLoop() {
if c.mode == ProtocolJSONLines {
c.readJSONLLoop()
return
}
c.readMCPFrameLoop()
}
func (c *StdioClient) readMCPFrameLoop() {
reader := bufio.NewReader(c.stdout)
for {
payload, err := readFramePayload(reader)
if err != nil {
c.failPending(err)
return
}
var envelope rpcResponseEnvelope
if err := json.Unmarshal(payload, &envelope); err != nil {
continue
}
c.dispatchResponse(envelope)
}
}
func (c *StdioClient) readJSONLLoop() {
scanner := bufio.NewScanner(c.stdout)
scanner.Buffer(make([]byte, 0, defaultScannerBufferBytes), maxFrameBytes)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var envelope rpcResponseEnvelope
if err := json.Unmarshal([]byte(line), &envelope); err != nil {
continue
}
c.dispatchResponse(envelope)
}
if err := scanner.Err(); err != nil {
c.failPending(err)
return
}
c.failPending(io.EOF)
}
func (c *StdioClient) dispatchResponse(envelope rpcResponseEnvelope) {
if len(envelope.ID) == 0 {
return
}
id, ok := parseRPCID(envelope.ID)
if !ok {
return
}
c.mu.Lock()
responseCh := c.pending[id]
if responseCh != nil {
delete(c.pending, id)
}
c.mu.Unlock()
if responseCh == nil {
return
}
response := rpcResponse{
result: envelope.Result,
rpcErr: envelope.Error,
}
select {
case responseCh <- response:
default:
}
}
func (c *StdioClient) waitLoop() {
c.mu.Lock()
cmd := c.cmd
waitCh := c.waitCh
serverName := c.config.Name
c.mu.Unlock()
if cmd == nil {
if waitCh != nil {
close(waitCh)
}
return
}
err := cmd.Wait()
if waitCh != nil {
close(waitCh)
}
if err != nil {
logger.WarnCF("mcp", "MCP process exited with error",
map[string]any{
"server": serverName,
"error": err.Error(),
})
}
}
func (c *StdioClient) drainStderr() {
c.mu.Lock()
stderr := c.stderr
serverName := c.config.Name
c.mu.Unlock()
if stderr == nil {
return
}
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
logger.DebugCF("mcp", "MCP server stderr",
map[string]any{
"server": serverName,
"line": line,
})
}
}
func (c *StdioClient) failPending(err error) {
c.mu.Lock()
pending := c.pending
c.pending = make(map[string]chan rpcResponse)
c.mu.Unlock()
if len(pending) == 0 {
return
}
for _, ch := range pending {
select {
case ch <- rpcResponse{err: err}:
default:
}
}
}
func (c *StdioClient) removePending(id string) {
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
}
func readFramePayload(reader *bufio.Reader) ([]byte, error) {
contentLength := -1
for {
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}
trimmed := strings.TrimRight(line, "\r\n")
if trimmed == "" {
break
}
parts := strings.SplitN(trimmed, ":", 2)
if len(parts) != 2 {
continue
}
headerName := strings.TrimSpace(strings.ToLower(parts[0]))
if headerName != "content-length" {
continue
}
value := strings.TrimSpace(parts[1])
length, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("invalid content-length %q: %w", value, err)
}
contentLength = length
}
if contentLength <= 0 {
return nil, fmt.Errorf("missing content-length")
}
if contentLength > maxFrameBytes {
return nil, fmt.Errorf("frame too large (%d bytes)", contentLength)
}
payload := make([]byte, contentLength)
if _, err := io.ReadFull(reader, payload); err != nil {
return nil, err
}
return payload, nil
}
func parseRPCID(raw json.RawMessage) (string, bool) {
var stringID string
if err := json.Unmarshal(raw, &stringID); err == nil {
return stringID, true
}
var numberID float64
if err := json.Unmarshal(raw, &numberID); err == nil {
return strconv.FormatInt(int64(numberID), 10), true
}
return "", false
}
func withTimeoutIfMissing(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if _, hasDeadline := parent.Deadline(); hasDeadline {
return context.WithCancel(parent)
}
return context.WithTimeout(parent, timeout)
}
func buildProcessEnv(custom map[string]string) []string {
base := os.Environ()
if len(custom) == 0 {
return base
}
keys := make([]string, 0, len(custom))
for key := range custom {
keys = append(keys, key)
}
sort.Strings(keys)
env := make([]string, 0, len(base)+len(keys))
env = append(env, base...)
for _, key := range keys {
env = append(env, key+"="+custom[key])
}
return env
}
func normalizeProtocol(protocol string) string {
switch strings.ToLower(strings.TrimSpace(protocol)) {
case "", ProtocolMCPFrames:
return ProtocolMCPFrames
case ProtocolJSONLines:
return ProtocolJSONLines
default:
return ProtocolMCPFrames
}
}
+61
View File
@@ -0,0 +1,61 @@
package mcp
import (
"encoding/json"
"strings"
)
type callResponse struct {
Content []contentBlock `json:"content"`
StructuredContent any `json:"structuredContent,omitempty"`
IsError bool `json:"isError,omitempty"`
}
type contentBlock struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
}
func formatCallPayload(raw json.RawMessage, responseLimit int) (CallResult, error) {
var payload callResponse
if err := json.Unmarshal(raw, &payload); err != nil {
// Fallback for servers that return non-standard payloads.
return CallResult{
Content: truncateString(strings.TrimSpace(string(raw)), responseLimit),
IsError: false,
}, nil
}
parts := make([]string, 0, len(payload.Content)+1)
for _, block := range payload.Content {
if block.Type == "text" && strings.TrimSpace(block.Text) != "" {
parts = append(parts, block.Text)
}
}
if payload.StructuredContent != nil {
if encoded, err := json.Marshal(payload.StructuredContent); err == nil {
parts = append(parts, string(encoded))
}
}
content := strings.TrimSpace(strings.Join(parts, "\n"))
if content == "" {
content = "{}"
}
return CallResult{
Content: truncateString(content, responseLimit),
IsError: payload.IsError,
}, nil
}
func truncateString(value string, maxBytes int) string {
if maxBytes <= 0 || len(value) <= maxBytes {
return value
}
if maxBytes <= 12 {
return value[:maxBytes]
}
return value[:maxBytes-12] + "\n...[truncated]"
}
+190
View File
@@ -0,0 +1,190 @@
package mcp
import (
"context"
"fmt"
"slices"
"strings"
"sync"
)
type clientFactory func(config ServerConfig) Client
type managedServer struct {
config ServerConfig
client Client
}
// Manager owns MCP servers and maps discovered MCP tools to PicoClaw tools.
type Manager struct {
mu sync.RWMutex
servers map[string]*managedServer
tools map[string]RegisteredTool
discovered bool
newClient clientFactory
}
func NewManager(configs map[string]ServerConfig) *Manager {
servers := make(map[string]*managedServer, len(configs))
for name, cfg := range configs {
copied := cfg
copied.Name = name
servers[name] = &managedServer{config: copied}
}
return &Manager{
servers: servers,
tools: make(map[string]RegisteredTool),
discovered: false,
newClient: func(config ServerConfig) Client {
return NewStdioClient(config)
},
}
}
// DiscoverTools starts configured MCP servers and returns discovered tool metadata.
func (m *Manager) DiscoverTools(ctx context.Context) ([]RegisteredTool, error) {
m.mu.Lock()
if m.discovered {
tools := toolsFromMap(m.tools)
m.mu.Unlock()
return tools, nil
}
discoveryErrors := make([]string, 0)
for serverName, server := range m.servers {
client := m.newClient(server.config)
if err := client.Start(ctx); err != nil {
discoveryErrors = append(discoveryErrors, fmt.Sprintf("%s: %v", serverName, err))
continue
}
remoteTools, err := client.ListTools(ctx)
if err != nil {
_ = client.Close()
discoveryErrors = append(discoveryErrors, fmt.Sprintf("%s: %v", serverName, err))
continue
}
server.client = client
for _, remoteTool := range remoteTools {
if !isToolAllowed(remoteTool.Name, server.config.IncludeTools, server.config.ExcludeTools) {
continue
}
qualifiedName := m.makeUniqueToolName(serverName, remoteTool.Name)
parameters := normalizeSchema(remoteTool.InputSchema)
m.tools[qualifiedName] = RegisteredTool{
QualifiedName: qualifiedName,
ServerName: serverName,
ToolName: remoteTool.Name,
Description: remoteTool.Description,
Parameters: parameters,
}
}
}
m.discovered = true
tools := toolsFromMap(m.tools)
m.mu.Unlock()
if len(tools) == 0 && len(discoveryErrors) > 0 {
return nil, fmt.Errorf("mcp tool discovery failed: %s", strings.Join(discoveryErrors, "; "))
}
return tools, nil
}
func (m *Manager) CallTool(ctx context.Context, qualifiedName string, args map[string]any) (CallResult, error) {
m.mu.RLock()
tool, ok := m.tools[qualifiedName]
if !ok {
m.mu.RUnlock()
return CallResult{}, fmt.Errorf("mcp tool %q not found", qualifiedName)
}
server := m.servers[tool.ServerName]
if server == nil || server.client == nil {
m.mu.RUnlock()
return CallResult{}, fmt.Errorf("mcp server %q is not active", tool.ServerName)
}
client := server.client
toolName := tool.ToolName
m.mu.RUnlock()
if args == nil {
args = map[string]any{}
}
return client.CallTool(ctx, toolName, args)
}
func (m *Manager) Close() error {
m.mu.Lock()
servers := make([]*managedServer, 0, len(m.servers))
for _, server := range m.servers {
servers = append(servers, server)
}
m.mu.Unlock()
var firstErr error
for _, server := range servers {
if server.client == nil {
continue
}
if err := server.client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
func (m *Manager) makeUniqueToolName(serverName, toolName string) string {
base := QualifiedToolName(serverName, toolName)
if _, exists := m.tools[base]; !exists {
return base
}
for index := 2; ; index++ {
candidate := fmt.Sprintf("%s_%d", base, index)
if len(candidate) > qualifiedNameMaxLen {
overflow := len(candidate) - qualifiedNameMaxLen
if overflow < len(base) {
candidate = base[:len(base)-overflow] + fmt.Sprintf("_%d", index)
} else {
candidate = candidate[:qualifiedNameMaxLen]
}
}
if _, exists := m.tools[candidate]; !exists {
return candidate
}
}
}
func normalizeSchema(schema map[string]any) map[string]any {
if len(schema) == 0 {
return map[string]any{
"type": "object",
"properties": map[string]any{},
}
}
return schema
}
func isToolAllowed(name string, include, exclude []string) bool {
if len(include) > 0 && !slices.Contains(include, name) {
return false
}
if slices.Contains(exclude, name) {
return false
}
return true
}
func toolsFromMap(tools map[string]RegisteredTool) []RegisteredTool {
out := make([]RegisteredTool, 0, len(tools))
for _, tool := range tools {
out = append(out, tool)
}
return out
}
+53
View File
@@ -0,0 +1,53 @@
package mcp
import "strings"
const qualifiedNameMaxLen = 64
// QualifiedToolName creates a stable, provider-safe function name.
func QualifiedToolName(serverName, toolName string) string {
prefix := "mcp_" + sanitizeName(serverName) + "__"
tool := sanitizeName(toolName)
maxToolLen := qualifiedNameMaxLen - len(prefix)
if maxToolLen <= 0 {
return prefix[:qualifiedNameMaxLen]
}
if len(tool) > maxToolLen {
tool = tool[:maxToolLen]
}
return prefix + tool
}
func sanitizeName(value string) string {
trimmed := strings.TrimSpace(strings.ToLower(value))
if trimmed == "" {
return "unknown"
}
var b strings.Builder
b.Grow(len(trimmed))
lastUnderscore := false
for i := 0; i < len(trimmed); i++ {
ch := trimmed[i]
isAlphaNum := (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9')
if isAlphaNum {
b.WriteByte(ch)
lastUnderscore = false
continue
}
if !lastUnderscore {
b.WriteByte('_')
lastUnderscore = true
}
}
s := strings.Trim(b.String(), "_")
if s == "" {
s = "unknown"
}
if s[0] >= '0' && s[0] <= '9' {
return "t_" + s
}
return s
}
+77
View File
@@ -0,0 +1,77 @@
package mcp
import "time"
const (
defaultInitTimeoutSeconds = 60
defaultCallTimeoutSeconds = 30
defaultMaxResponseBytes = 64 * 1024
defaultScannerBufferBytes = 64 * 1024
maxFrameBytes = 2 * 1024 * 1024
maxToolListPages = 50
)
const (
ProtocolMCPFrames = "mcp"
ProtocolJSONLines = "jsonl"
)
// ServerConfig defines one MCP server connection.
type ServerConfig struct {
Name string
Command string
Args []string
Env map[string]string
WorkingDir string
Protocol string
InitTimeoutSeconds int
CallTimeoutSeconds int
MaxResponseBytes int
IncludeTools []string
ExcludeTools []string
}
func (c ServerConfig) InitTimeout() time.Duration {
seconds := c.InitTimeoutSeconds
if seconds <= 0 {
seconds = defaultInitTimeoutSeconds
}
return time.Duration(seconds) * time.Second
}
func (c ServerConfig) CallTimeout() time.Duration {
seconds := c.CallTimeoutSeconds
if seconds <= 0 {
seconds = defaultCallTimeoutSeconds
}
return time.Duration(seconds) * time.Second
}
func (c ServerConfig) ResponseLimit() int {
if c.MaxResponseBytes <= 0 {
return defaultMaxResponseBytes
}
return c.MaxResponseBytes
}
// RemoteTool is an MCP tool discovered from a server.
type RemoteTool struct {
Name string
Description string
InputSchema map[string]any
}
// RegisteredTool is a discovered tool with a PicoClaw-facing qualified name.
type RegisteredTool struct {
QualifiedName string
ServerName string
ToolName string
Description string
Parameters map[string]any
}
// CallResult is a normalized MCP tool call result.
type CallResult struct {
Content string
IsError bool
}
+33 -7
View File
@@ -217,12 +217,18 @@ func buildCodexParams(messages []Message, tools []ToolDefinition, model string,
})
}
for _, tc := range msg.ToolCalls {
argsJSON, _ := json.Marshal(tc.Arguments)
name, args, ok := resolveCodexToolCall(tc)
if !ok {
logger.WarnCF("provider.codex", "Skipping invalid tool call in history", map[string]interface{}{
"call_id": tc.ID,
})
continue
}
inputItems = append(inputItems, responses.ResponseInputItemUnionParam{
OfFunctionCall: &responses.ResponseFunctionToolCallParam{
CallID: tc.ID,
Name: tc.Name,
Arguments: string(argsJSON),
Name: name,
Arguments: args,
},
})
}
@@ -260,10 +266,6 @@ func buildCodexParams(messages []Message, tools []ToolDefinition, model string,
params.Instructions = openai.Opt(defaultCodexInstructions)
}
if maxTokens, ok := options["max_tokens"].(int); ok {
params.MaxOutputTokens = openai.Opt(int64(maxTokens))
}
if len(tools) > 0 {
params.Tools = translateToolsForCodex(tools)
}
@@ -271,6 +273,30 @@ func buildCodexParams(messages []Message, tools []ToolDefinition, model string,
return params
}
func resolveCodexToolCall(tc ToolCall) (name string, arguments string, ok bool) {
name = tc.Name
if name == "" && tc.Function != nil {
name = tc.Function.Name
}
if name == "" {
return "", "", false
}
if len(tc.Arguments) > 0 {
argsJSON, err := json.Marshal(tc.Arguments)
if err != nil {
return "", "", false
}
return name, string(argsJSON), true
}
if tc.Function != nil && tc.Function.Arguments != "" {
return name, tc.Function.Arguments, true
}
return name, "{}", true
}
func translateToolsForCodex(tools []ToolDefinition) []responses.ToolUnionParam {
result := make([]responses.ToolUnionParam, 0, len(tools))
for _, t := range tools {
+50
View File
@@ -29,6 +29,9 @@ func TestBuildCodexParams_BasicMessage(t *testing.T) {
if params.Instructions.Or("") != defaultCodexInstructions {
t.Errorf("Instructions = %q, want %q", params.Instructions.Or(""), defaultCodexInstructions)
}
if params.MaxOutputTokens.Valid() {
t.Fatalf("MaxOutputTokens should not be set for Codex backend")
}
}
func TestBuildCodexParams_SystemAsInstructions(t *testing.T) {
@@ -65,6 +68,45 @@ func TestBuildCodexParams_ToolCallConversation(t *testing.T) {
}
}
func TestBuildCodexParams_ToolCallFunctionFallback(t *testing.T) {
messages := []Message{
{Role: "user", Content: "Read a file"},
{
Role: "assistant",
ToolCalls: []ToolCall{
{
ID: "call_1",
Type: "function",
Function: &FunctionCall{
Name: "read_file",
Arguments: `{"path":"README.md"}`,
},
},
},
},
{Role: "tool", Content: "ok", ToolCallID: "call_1"},
}
params := buildCodexParams(messages, nil, "gpt-4o", map[string]interface{}{})
if params.Input.OfInputItemList == nil {
t.Fatal("Input.OfInputItemList should not be nil")
}
if len(params.Input.OfInputItemList) != 3 {
t.Fatalf("len(Input items) = %d, want 3", len(params.Input.OfInputItemList))
}
fc := params.Input.OfInputItemList[1].OfFunctionCall
if fc == nil {
t.Fatal("assistant tool call should be converted to function_call input item")
}
if fc.Name != "read_file" {
t.Errorf("Function call name = %q, want %q", fc.Name, "read_file")
}
if fc.Arguments != `{"path":"README.md"}` {
t.Errorf("Function call arguments = %q, want %q", fc.Arguments, `{"path":"README.md"}`)
}
}
func TestBuildCodexParams_WithTools(t *testing.T) {
tools := []ToolDefinition{
{
@@ -214,6 +256,10 @@ func TestCodexProvider_ChatRoundTrip(t *testing.T) {
http.Error(w, "stream must be true", http.StatusBadRequest)
return
}
if _, ok := reqBody["max_output_tokens"]; ok {
http.Error(w, "max_output_tokens is not supported", http.StatusBadRequest)
return
}
resp := map[string]interface{}{
"id": "resp_test",
@@ -293,6 +339,10 @@ func TestCodexProvider_ChatRoundTrip_TokenSourceFallbackAccountID(t *testing.T)
http.Error(w, "temperature is not supported", http.StatusBadRequest)
return
}
if _, ok := reqBody["max_output_tokens"]; ok {
http.Error(w, "max_output_tokens is not supported", http.StatusBadRequest)
return
}
if reqBody["stream"] != true {
http.Error(w, "stream must be true", http.StatusBadRequest)
return
+5 -2
View File
@@ -28,12 +28,15 @@ type CronTool struct {
}
// NewCronTool creates a new CronTool
func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, restrict bool) *CronTool {
// execTimeout: 0 means no timeout, >0 sets the timeout duration
func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, restrict bool, execTimeout time.Duration) *CronTool {
execTool := NewExecTool(workspace, restrict)
execTool.SetTimeout(execTimeout) // 0 means no timeout
return &CronTool{
cronService: cronService,
executor: executor,
msgBus: msgBus,
execTool: NewExecTool(workspace, restrict),
execTool: execTool,
}
}
+85
View File
@@ -0,0 +1,85 @@
package tools
import (
"context"
"errors"
"fmt"
"github.com/sipeed/picoclaw/pkg/mcp"
)
type MCPTool struct {
manager *mcp.Manager
name string
description string
parameters map[string]any
}
func NewMCPTool(manager *mcp.Manager, tool mcp.RegisteredTool) *MCPTool {
description := tool.Description
if description == "" {
description = fmt.Sprintf("MCP tool %s from server %s", tool.ToolName, tool.ServerName)
}
return &MCPTool{
manager: manager,
name: tool.QualifiedName,
description: description,
parameters: tool.Parameters,
}
}
func (t *MCPTool) Name() string {
return t.name
}
func (t *MCPTool) Description() string {
return t.description
}
func (t *MCPTool) Parameters() map[string]interface{} {
return t.parameters
}
func (t *MCPTool) Execute(ctx context.Context, args map[string]interface{}) *ToolResult {
if t.manager == nil {
return ErrorResult("MCP manager is not configured")
}
result, err := t.manager.CallTool(ctx, t.name, args)
if err != nil {
return ErrorResult(fmt.Sprintf("MCP tool %s failed: %v", t.name, err)).WithError(err)
}
if result.IsError {
err := errors.New(result.Content)
return ErrorResult(result.Content).WithError(err)
}
return SilentResult(result.Content)
}
// RegisterMCPTools discovers tools from MCP servers and registers them into the registry.
func RegisterMCPTools(ctx context.Context, registry *ToolRegistry, manager *mcp.Manager) (int, error) {
if registry == nil || manager == nil {
return 0, nil
}
discoveredTools, err := manager.DiscoverTools(ctx)
if err != nil {
return 0, err
}
return RegisterKnownMCPTools(registry, manager, discoveredTools), nil
}
// RegisterKnownMCPTools registers already-discovered MCP tools.
// This avoids repeated discovery work when multiple registries share one manager.
func RegisterKnownMCPTools(registry *ToolRegistry, manager *mcp.Manager, discoveredTools []mcp.RegisteredTool) int {
if registry == nil || manager == nil || len(discoveredTools) == 0 {
return 0
}
for _, tool := range discoveredTools {
registry.Register(NewMCPTool(manager, tool))
}
return len(discoveredTools)
}
+8 -1
View File
@@ -89,7 +89,14 @@ func (t *ExecTool) Execute(ctx context.Context, args map[string]interface{}) *To
return ErrorResult(guardError)
}
cmdCtx, cancel := context.WithTimeout(ctx, t.timeout)
// timeout == 0 means no timeout
var cmdCtx context.Context
var cancel context.CancelFunc
if t.timeout > 0 {
cmdCtx, cancel = context.WithTimeout(ctx, t.timeout)
} else {
cmdCtx, cancel = context.WithCancel(ctx)
}
defer cancel()
var cmd *exec.Cmd
+75 -2
View File
@@ -176,6 +176,71 @@ func stripTags(content string) string {
return re.ReplaceAllString(content, "")
}
type PerplexitySearchProvider struct {
apiKey string
}
func (p *PerplexitySearchProvider) Search(ctx context.Context, query string, count int) (string, error) {
searchURL := "https://api.perplexity.ai/chat/completions"
payload := map[string]interface{}{
"model": "sonar",
"messages": []map[string]string{
{"role": "system", "content": "You are a search assistant. Provide concise search results with titles, URLs, and brief descriptions in the following format:\n1. Title\n URL\n Description\n\nDo not add extra commentary."},
{"role": "user", "content": fmt.Sprintf("Search for: %s. Provide up to %d relevant results.", query, count)},
},
"max_tokens": 1000,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return "", fmt.Errorf("failed to marshal request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "POST", searchURL, strings.NewReader(string(payloadBytes)))
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+p.apiKey)
req.Header.Set("User-Agent", userAgent)
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("Perplexity API error: %s", string(body))
}
var searchResp struct {
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
} `json:"choices"`
}
if err := json.Unmarshal(body, &searchResp); err != nil {
return "", fmt.Errorf("failed to parse response: %w", err)
}
if len(searchResp.Choices) == 0 {
return fmt.Sprintf("No results for: %s", query), nil
}
return fmt.Sprintf("Results for: %s (via Perplexity)\n%s", query, searchResp.Choices[0].Message.Content), nil
}
type WebSearchTool struct {
provider SearchProvider
maxResults int
@@ -187,14 +252,22 @@ type WebSearchToolOptions struct {
BraveEnabled bool
DuckDuckGoMaxResults int
DuckDuckGoEnabled bool
PerplexityAPIKey string
PerplexityMaxResults int
PerplexityEnabled bool
}
func NewWebSearchTool(opts WebSearchToolOptions) *WebSearchTool {
var provider SearchProvider
maxResults := 5
// Priority: Brave > DuckDuckGo
if opts.BraveEnabled && opts.BraveAPIKey != "" {
// Priority: Perplexity > Brave > DuckDuckGo
if opts.PerplexityEnabled && opts.PerplexityAPIKey != "" {
provider = &PerplexitySearchProvider{apiKey: opts.PerplexityAPIKey}
if opts.PerplexityMaxResults > 0 {
maxResults = opts.PerplexityMaxResults
}
} else if opts.BraveEnabled && opts.BraveAPIKey != "" {
provider = &BraveSearchProvider{apiKey: opts.BraveAPIKey}
if opts.BraveMaxResults > 0 {
maxResults = opts.BraveMaxResults