mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
5b608ae678
Address Copilot review feedback: - Use guardCommand() instead of Execute() to avoid running dangerous commands (format, mkfs, diskpart) on the host if regex regresses - Assert error message contains "blocked" for blocked commands - Replace "go fmt ./..." with "echo go fmt ./..." to avoid accidental file modification Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
476 lines
14 KiB
Go
476 lines
14 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
)
|
|
|
|
// TestShellTool_Success verifies successful command execution
|
|
func TestShellTool_Success(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "echo 'hello world'",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Success should not be an error
|
|
if result.IsError {
|
|
t.Errorf("Expected success, got IsError=true: %s", result.ForLLM)
|
|
}
|
|
|
|
// ForUser should contain command output
|
|
if !strings.Contains(result.ForUser, "hello world") {
|
|
t.Errorf("Expected ForUser to contain 'hello world', got: %s", result.ForUser)
|
|
}
|
|
|
|
// ForLLM should contain full output
|
|
if !strings.Contains(result.ForLLM, "hello world") {
|
|
t.Errorf("Expected ForLLM to contain 'hello world', got: %s", result.ForLLM)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_Failure verifies failed command execution
|
|
func TestShellTool_Failure(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "ls /nonexistent_directory_12345",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Failure should be marked as error
|
|
if !result.IsError {
|
|
t.Errorf("Expected error for failed command, got IsError=false")
|
|
}
|
|
|
|
// ForUser should contain error information
|
|
if result.ForUser == "" {
|
|
t.Errorf("Expected ForUser to contain error info, got empty string")
|
|
}
|
|
|
|
// ForLLM should contain exit code or error
|
|
if !strings.Contains(result.ForLLM, "Exit code") && result.ForUser == "" {
|
|
t.Errorf("Expected ForLLM to contain exit code or error, got: %s", result.ForLLM)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_Timeout verifies command timeout handling
|
|
func TestShellTool_Timeout(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
tool.SetTimeout(100 * time.Millisecond)
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "sleep 10",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Timeout should be marked as error
|
|
if !result.IsError {
|
|
t.Errorf("Expected error for timeout, got IsError=false")
|
|
}
|
|
|
|
// Should mention timeout
|
|
if !strings.Contains(result.ForLLM, "timed out") && !strings.Contains(result.ForUser, "timed out") {
|
|
t.Errorf("Expected timeout message, got ForLLM: %s, ForUser: %s", result.ForLLM, result.ForUser)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_WorkingDir verifies custom working directory
|
|
func TestShellTool_WorkingDir(t *testing.T) {
|
|
// Create temp directory
|
|
tmpDir := t.TempDir()
|
|
testFile := filepath.Join(tmpDir, "test.txt")
|
|
os.WriteFile(testFile, []byte("test content"), 0o644)
|
|
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "cat test.txt",
|
|
"working_dir": tmpDir,
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
if result.IsError {
|
|
t.Errorf("Expected success in custom working dir, got error: %s", result.ForLLM)
|
|
}
|
|
|
|
if !strings.Contains(result.ForUser, "test content") {
|
|
t.Errorf("Expected output from custom dir, got: %s", result.ForUser)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_DangerousCommand verifies safety guard blocks dangerous commands
|
|
func TestShellTool_DangerousCommand(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "rm -rf /",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Dangerous command should be blocked
|
|
if !result.IsError {
|
|
t.Errorf("Expected dangerous command to be blocked (IsError=true)")
|
|
}
|
|
|
|
if !strings.Contains(result.ForLLM, "blocked") && !strings.Contains(result.ForUser, "blocked") {
|
|
t.Errorf("Expected 'blocked' message, got ForLLM: %s, ForUser: %s", result.ForLLM, result.ForUser)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_MissingCommand verifies error handling for missing command
|
|
func TestShellTool_MissingCommand(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Should return error result
|
|
if !result.IsError {
|
|
t.Errorf("Expected error when command is missing")
|
|
}
|
|
}
|
|
|
|
// TestShellTool_StderrCapture verifies stderr is captured and included
|
|
func TestShellTool_StderrCapture(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "sh -c 'echo stdout; echo stderr >&2'",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Both stdout and stderr should be in output
|
|
if !strings.Contains(result.ForLLM, "stdout") {
|
|
t.Errorf("Expected stdout in output, got: %s", result.ForLLM)
|
|
}
|
|
if !strings.Contains(result.ForLLM, "stderr") {
|
|
t.Errorf("Expected stderr in output, got: %s", result.ForLLM)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_OutputTruncation verifies long output is truncated
|
|
func TestShellTool_OutputTruncation(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// Generate long output (>10000 chars)
|
|
args := map[string]any{
|
|
"command": "python3 -c \"print('x' * 20000)\" || echo " + strings.Repeat("x", 20000),
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Should have truncation message or be truncated
|
|
if len(result.ForLLM) > 15000 {
|
|
t.Errorf("Expected output to be truncated, got length: %d", len(result.ForLLM))
|
|
}
|
|
}
|
|
|
|
// TestShellTool_WorkingDir_OutsideWorkspace verifies that working_dir cannot escape the workspace directly
|
|
func TestShellTool_WorkingDir_OutsideWorkspace(t *testing.T) {
|
|
root := t.TempDir()
|
|
workspace := filepath.Join(root, "workspace")
|
|
outsideDir := filepath.Join(root, "outside")
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
t.Fatalf("failed to create workspace: %v", err)
|
|
}
|
|
if err := os.MkdirAll(outsideDir, 0o755); err != nil {
|
|
t.Fatalf("failed to create outside dir: %v", err)
|
|
}
|
|
|
|
tool, err := NewExecTool(workspace, true)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
result := tool.Execute(context.Background(), map[string]any{
|
|
"command": "pwd",
|
|
"working_dir": outsideDir,
|
|
})
|
|
|
|
if !result.IsError {
|
|
t.Fatalf("expected working_dir outside workspace to be blocked, got output: %s", result.ForLLM)
|
|
}
|
|
if !strings.Contains(result.ForLLM, "blocked") {
|
|
t.Errorf("expected 'blocked' in error, got: %s", result.ForLLM)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_WorkingDir_SymlinkEscape verifies that a symlink inside the workspace
|
|
// pointing outside cannot be used as working_dir to escape the sandbox.
|
|
func TestShellTool_WorkingDir_SymlinkEscape(t *testing.T) {
|
|
root := t.TempDir()
|
|
workspace := filepath.Join(root, "workspace")
|
|
secretDir := filepath.Join(root, "secret")
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
t.Fatalf("failed to create workspace: %v", err)
|
|
}
|
|
if err := os.MkdirAll(secretDir, 0o755); err != nil {
|
|
t.Fatalf("failed to create secret dir: %v", err)
|
|
}
|
|
os.WriteFile(filepath.Join(secretDir, "secret.txt"), []byte("top secret"), 0o644)
|
|
|
|
// symlink lives inside the workspace but resolves to secretDir outside it
|
|
link := filepath.Join(workspace, "escape")
|
|
if err := os.Symlink(secretDir, link); err != nil {
|
|
t.Skipf("symlinks not supported in this environment: %v", err)
|
|
}
|
|
|
|
tool, err := NewExecTool(workspace, true)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
result := tool.Execute(context.Background(), map[string]any{
|
|
"command": "cat secret.txt",
|
|
"working_dir": link,
|
|
})
|
|
|
|
if !result.IsError {
|
|
t.Fatalf("expected symlink working_dir escape to be blocked, got output: %s", result.ForLLM)
|
|
}
|
|
if !strings.Contains(result.ForLLM, "blocked") {
|
|
t.Errorf("expected 'blocked' in error, got: %s", result.ForLLM)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_RestrictToWorkspace verifies workspace restriction
|
|
func TestShellTool_RestrictToWorkspace(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
tool, err := NewExecTool(tmpDir, false)
|
|
if err != nil {
|
|
t.Errorf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
tool.SetRestrictToWorkspace(true)
|
|
|
|
ctx := context.Background()
|
|
args := map[string]any{
|
|
"command": "cat ../../etc/passwd",
|
|
}
|
|
|
|
result := tool.Execute(ctx, args)
|
|
|
|
// Path traversal should be blocked
|
|
if !result.IsError {
|
|
t.Errorf("Expected path traversal to be blocked with restrictToWorkspace=true")
|
|
}
|
|
|
|
if !strings.Contains(result.ForLLM, "blocked") && !strings.Contains(result.ForUser, "blocked") {
|
|
t.Errorf(
|
|
"Expected 'blocked' message for path traversal, got ForLLM: %s, ForUser: %s",
|
|
result.ForLLM,
|
|
result.ForUser,
|
|
)
|
|
}
|
|
}
|
|
|
|
// TestShellTool_DevNullAllowed verifies that /dev/null redirections are not blocked (issue #964).
|
|
func TestShellTool_DevNullAllowed(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
tool, err := NewExecTool(tmpDir, true)
|
|
if err != nil {
|
|
t.Fatalf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
commands := []string{
|
|
"echo hello 2>/dev/null",
|
|
"echo hello >/dev/null",
|
|
"echo hello > /dev/null",
|
|
"echo hello 2> /dev/null",
|
|
"echo hello >/dev/null 2>&1",
|
|
"find " + tmpDir + " -name '*.go' 2>/dev/null",
|
|
}
|
|
|
|
for _, cmd := range commands {
|
|
result := tool.Execute(context.Background(), map[string]any{"command": cmd})
|
|
if result.IsError && strings.Contains(result.ForLLM, "blocked") {
|
|
t.Errorf("command should not be blocked: %s\n error: %s", cmd, result.ForLLM)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestShellTool_BlockDevices verifies that writes to block devices are blocked (issue #965).
|
|
func TestShellTool_BlockDevices(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Fatalf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
blocked := []string{
|
|
"echo x > /dev/sda",
|
|
"echo x > /dev/hda",
|
|
"echo x > /dev/vda",
|
|
"echo x > /dev/xvda",
|
|
"echo x > /dev/nvme0n1",
|
|
"echo x > /dev/mmcblk0",
|
|
"echo x > /dev/loop0",
|
|
"echo x > /dev/dm-0",
|
|
"echo x > /dev/md0",
|
|
"echo x > /dev/sr0",
|
|
"echo x > /dev/nbd0",
|
|
}
|
|
|
|
for _, cmd := range blocked {
|
|
result := tool.Execute(context.Background(), map[string]any{"command": cmd})
|
|
if !result.IsError {
|
|
t.Errorf("expected block device write to be blocked: %s", cmd)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestShellTool_DenyPattern_DiskWiping verifies the deny pattern for disk wiping
|
|
// commands (format, mkfs, diskpart) blocks them when preceded by shell separators
|
|
// but does NOT block legitimate uses like --format flags.
|
|
func TestShellTool_DenyPattern_DiskWiping(t *testing.T) {
|
|
tool, err := NewExecTool("", false)
|
|
if err != nil {
|
|
t.Fatalf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
// These should be BLOCKED (disk wiping commands)
|
|
blockedCmds := []struct {
|
|
name string
|
|
cmd string
|
|
}{
|
|
{"format with space", "format c:"},
|
|
{"mkfs standalone", "mkfs /dev/sda"},
|
|
{"semicolon format", "echo hello; format c:"},
|
|
{"pipe format", "echo hello | format c:"},
|
|
{"and format", "echo hello && format c:"},
|
|
{"diskpart standalone", "diskpart /s script.txt"},
|
|
}
|
|
|
|
for _, tt := range blockedCmds {
|
|
t.Run("blocked_"+tt.name, func(t *testing.T) {
|
|
msg := tool.guardCommand(tt.cmd, "")
|
|
if !strings.Contains(msg, "blocked") {
|
|
t.Errorf("Expected %q to be blocked by safety guard, got: %q", tt.cmd, msg)
|
|
}
|
|
})
|
|
}
|
|
|
|
// These should be ALLOWED (not disk wiping)
|
|
allowed := []struct {
|
|
name string
|
|
cmd string
|
|
}{
|
|
{"--format flag", "echo test --format json"},
|
|
{"go fmt", "echo go fmt ./..."},
|
|
}
|
|
|
|
for _, tt := range allowed {
|
|
t.Run("allowed_"+tt.name, func(t *testing.T) {
|
|
msg := tool.guardCommand(tt.cmd, "")
|
|
if msg != "" {
|
|
t.Errorf("Expected %q to be allowed, but it was blocked: %s", tt.cmd, msg)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestShellTool_SafePathsInWorkspaceRestriction verifies that safe kernel pseudo-devices
|
|
// are allowed even when workspace restriction is active.
|
|
func TestShellTool_SafePathsInWorkspaceRestriction(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
tool, err := NewExecTool(tmpDir, true)
|
|
if err != nil {
|
|
t.Fatalf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
// These reference paths outside workspace but should be allowed via safePaths.
|
|
commands := []string{
|
|
"cat /dev/urandom | head -c 16 | od",
|
|
"echo test > /dev/null",
|
|
"dd if=/dev/zero bs=1 count=1",
|
|
}
|
|
|
|
for _, cmd := range commands {
|
|
result := tool.Execute(context.Background(), map[string]any{"command": cmd})
|
|
if result.IsError && strings.Contains(result.ForLLM, "path outside working dir") {
|
|
t.Errorf("safe path should not be blocked by workspace check: %s\n error: %s", cmd, result.ForLLM)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestShellTool_CustomAllowPatterns verifies that custom allow patterns exempt
|
|
// commands from deny pattern checks.
|
|
func TestShellTool_CustomAllowPatterns(t *testing.T) {
|
|
cfg := &config.Config{
|
|
Tools: config.ToolsConfig{
|
|
Exec: config.ExecConfig{
|
|
EnableDenyPatterns: true,
|
|
CustomAllowPatterns: []string{`\bgit\s+push\s+origin\b`},
|
|
},
|
|
},
|
|
}
|
|
|
|
tool, err := NewExecToolWithConfig("", false, cfg)
|
|
if err != nil {
|
|
t.Fatalf("unable to configure exec tool: %s", err)
|
|
}
|
|
|
|
// "git push origin main" should be allowed by custom allow pattern.
|
|
result := tool.Execute(context.Background(), map[string]any{
|
|
"command": "git push origin main",
|
|
})
|
|
if result.IsError && strings.Contains(result.ForLLM, "blocked") {
|
|
t.Errorf("custom allow pattern should exempt 'git push origin main', got: %s", result.ForLLM)
|
|
}
|
|
|
|
// "git push upstream main" should still be blocked (does not match allow pattern).
|
|
result = tool.Execute(context.Background(), map[string]any{
|
|
"command": "git push upstream main",
|
|
})
|
|
if !result.IsError {
|
|
t.Errorf("'git push upstream main' should still be blocked by deny pattern")
|
|
}
|
|
}
|