mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-05-25 16:00:35 +00:00
7af40d49eb
CronTool.ExecuteJob was calling ExecTool.Execute without setting
action='run' in the args map. ExecTool.Execute requires the action
field and returns ErrorResult('action is required') immediately when
it's missing. This caused all cron command jobs to silently fail.
Adds a test covering the command execution happy path.
365 lines
11 KiB
Go
365 lines
11 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/cron"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
// JobExecutor is the interface for executing cron jobs through the agent
|
|
type JobExecutor interface {
|
|
ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
|
|
// PublishResponseIfNeeded sends response to the outbound bus only when the
|
|
// agent did not already deliver content through the message tool in this round.
|
|
PublishResponseIfNeeded(ctx context.Context, channel, chatID, sessionKey, response string)
|
|
}
|
|
|
|
// CronTool provides scheduling capabilities for the agent
|
|
type CronTool struct {
|
|
cronService *cron.CronService
|
|
executor JobExecutor
|
|
msgBus *bus.MessageBus
|
|
execTool *ExecTool
|
|
allowCommand bool
|
|
execEnabled bool
|
|
}
|
|
|
|
// NewCronTool creates a new CronTool
|
|
// execTimeout: 0 means no timeout, >0 sets the timeout duration
|
|
func NewCronTool(
|
|
cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, restrict bool,
|
|
execTimeout time.Duration, config *config.Config,
|
|
) (*CronTool, error) {
|
|
allowCommand := true
|
|
execEnabled := true
|
|
if config != nil {
|
|
allowCommand = config.Tools.Cron.AllowCommand
|
|
execEnabled = config.Tools.Exec.Enabled
|
|
}
|
|
|
|
var execTool *ExecTool
|
|
if execEnabled {
|
|
var err error
|
|
execTool, err = NewExecToolWithConfig(workspace, restrict, config)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to configure exec tool: %w", err)
|
|
}
|
|
}
|
|
|
|
if execTool != nil {
|
|
execTool.SetTimeout(execTimeout)
|
|
}
|
|
return &CronTool{
|
|
cronService: cronService,
|
|
executor: executor,
|
|
msgBus: msgBus,
|
|
execTool: execTool,
|
|
allowCommand: allowCommand,
|
|
execEnabled: execEnabled,
|
|
}, nil
|
|
}
|
|
|
|
// Name returns the tool name
|
|
func (t *CronTool) Name() string {
|
|
return "cron"
|
|
}
|
|
|
|
// Description returns the tool description
|
|
func (t *CronTool) Description() string {
|
|
return "Schedule reminders, tasks, or system commands. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules. Use 'command' to execute shell commands directly."
|
|
}
|
|
|
|
// Parameters returns the tool parameters schema
|
|
func (t *CronTool) Parameters() map[string]any {
|
|
return map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"action": map[string]any{
|
|
"type": "string",
|
|
"enum": []string{"add", "list", "remove", "enable", "disable"},
|
|
"description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.",
|
|
},
|
|
"message": map[string]any{
|
|
"type": "string",
|
|
"description": "The reminder/task message to display when triggered. If 'command' is used, this describes what the command does.",
|
|
},
|
|
"command": map[string]any{
|
|
"type": "string",
|
|
"description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message.",
|
|
},
|
|
"command_confirm": map[string]any{
|
|
"type": "boolean",
|
|
"description": "Optional explicit confirmation flag for scheduling a shell command. Command execution must also be enabled via tools.cron.allow_command.",
|
|
},
|
|
"at_seconds": map[string]any{
|
|
"type": "integer",
|
|
"description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.",
|
|
},
|
|
"every_seconds": map[string]any{
|
|
"type": "integer",
|
|
"description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.",
|
|
},
|
|
"cron_expr": map[string]any{
|
|
"type": "string",
|
|
"description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.",
|
|
},
|
|
"job_id": map[string]any{
|
|
"type": "string",
|
|
"description": "Job ID (for remove/enable/disable)",
|
|
},
|
|
},
|
|
"required": []string{"action"},
|
|
}
|
|
}
|
|
|
|
// Execute runs the tool with the given arguments
|
|
func (t *CronTool) Execute(ctx context.Context, args map[string]any) *ToolResult {
|
|
action, ok := args["action"].(string)
|
|
if !ok {
|
|
return ErrorResult("action is required")
|
|
}
|
|
|
|
switch action {
|
|
case "add":
|
|
return t.addJob(ctx, args)
|
|
case "list":
|
|
return t.listJobs()
|
|
case "remove":
|
|
return t.removeJob(args)
|
|
case "enable":
|
|
return t.enableJob(args, true)
|
|
case "disable":
|
|
return t.enableJob(args, false)
|
|
default:
|
|
return ErrorResult(fmt.Sprintf("unknown action: %s", action))
|
|
}
|
|
}
|
|
|
|
func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult {
|
|
channel := ToolChannel(ctx)
|
|
chatID := ToolChatID(ctx)
|
|
|
|
if channel == "" || chatID == "" {
|
|
return ErrorResult("no session context (channel/chat_id not set). Use this tool in an active conversation.")
|
|
}
|
|
|
|
message, ok := args["message"].(string)
|
|
if !ok || message == "" {
|
|
return ErrorResult("message is required for add")
|
|
}
|
|
|
|
var schedule cron.CronSchedule
|
|
|
|
// Check for at_seconds (one-time), every_seconds (recurring), or cron_expr
|
|
atSeconds, hasAt := args["at_seconds"].(float64)
|
|
everySeconds, hasEvery := args["every_seconds"].(float64)
|
|
cronExpr, hasCron := args["cron_expr"].(string)
|
|
|
|
// Fix: type assertions return true for zero values, need additional validity checks
|
|
// This prevents LLMs that fill unused optional parameters with defaults (0) from triggering wrong type
|
|
hasAt = hasAt && atSeconds > 0
|
|
hasEvery = hasEvery && everySeconds > 0
|
|
hasCron = hasCron && cronExpr != ""
|
|
|
|
// Priority: at_seconds > every_seconds > cron_expr
|
|
if hasAt {
|
|
atMS := time.Now().UnixMilli() + int64(atSeconds)*1000
|
|
schedule = cron.CronSchedule{
|
|
Kind: "at",
|
|
AtMS: &atMS,
|
|
}
|
|
} else if hasEvery {
|
|
everyMS := int64(everySeconds) * 1000
|
|
schedule = cron.CronSchedule{
|
|
Kind: "every",
|
|
EveryMS: &everyMS,
|
|
}
|
|
} else if hasCron {
|
|
schedule = cron.CronSchedule{
|
|
Kind: "cron",
|
|
Expr: cronExpr,
|
|
}
|
|
} else {
|
|
return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required")
|
|
}
|
|
|
|
// GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When
|
|
// allow_command is disabled, explicit confirmation is required as an override.
|
|
// Non-command reminders remain open to all channels.
|
|
command, _ := args["command"].(string)
|
|
commandConfirm, _ := args["command_confirm"].(bool)
|
|
if command != "" {
|
|
if !t.execEnabled {
|
|
return ErrorResult("command execution is disabled")
|
|
}
|
|
if !constants.IsInternalChannel(channel) {
|
|
return ErrorResult("scheduling command execution is restricted to internal channels")
|
|
}
|
|
if !t.allowCommand && !commandConfirm {
|
|
return ErrorResult("command_confirm=true is required when allow_command is disabled")
|
|
}
|
|
}
|
|
|
|
// Truncate message for job name (max 30 chars)
|
|
messagePreview := utils.Truncate(message, 30)
|
|
|
|
job, err := t.cronService.AddJob(
|
|
messagePreview,
|
|
schedule,
|
|
message,
|
|
channel,
|
|
chatID,
|
|
)
|
|
if err != nil {
|
|
return ErrorResult(fmt.Sprintf("Error adding job: %v", err))
|
|
}
|
|
|
|
// Apply optional payload fields and persist in a single UpdateJob call
|
|
needsUpdate := false
|
|
if command != "" {
|
|
job.Payload.Command = command
|
|
needsUpdate = true
|
|
}
|
|
if needsUpdate {
|
|
t.cronService.UpdateJob(job)
|
|
}
|
|
|
|
return SilentResult(fmt.Sprintf("Cron job added: %s (id: %s)", job.Name, job.ID))
|
|
}
|
|
|
|
func (t *CronTool) listJobs() *ToolResult {
|
|
jobs := t.cronService.ListJobs(false)
|
|
|
|
if len(jobs) == 0 {
|
|
return SilentResult("No scheduled jobs")
|
|
}
|
|
|
|
var result strings.Builder
|
|
result.WriteString("Scheduled jobs:\n")
|
|
for _, j := range jobs {
|
|
var scheduleInfo string
|
|
if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil {
|
|
scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000)
|
|
} else if j.Schedule.Kind == "cron" {
|
|
scheduleInfo = j.Schedule.Expr
|
|
} else if j.Schedule.Kind == "at" {
|
|
scheduleInfo = "one-time"
|
|
} else {
|
|
scheduleInfo = "unknown"
|
|
}
|
|
result.WriteString(fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo))
|
|
}
|
|
|
|
return SilentResult(result.String())
|
|
}
|
|
|
|
func (t *CronTool) removeJob(args map[string]any) *ToolResult {
|
|
jobID, ok := args["job_id"].(string)
|
|
if !ok || jobID == "" {
|
|
return ErrorResult("job_id is required for remove")
|
|
}
|
|
|
|
if t.cronService.RemoveJob(jobID) {
|
|
return SilentResult(fmt.Sprintf("Cron job removed: %s", jobID))
|
|
}
|
|
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
|
|
}
|
|
|
|
func (t *CronTool) enableJob(args map[string]any, enable bool) *ToolResult {
|
|
jobID, ok := args["job_id"].(string)
|
|
if !ok || jobID == "" {
|
|
return ErrorResult("job_id is required for enable/disable")
|
|
}
|
|
|
|
job := t.cronService.EnableJob(jobID, enable)
|
|
if job == nil {
|
|
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
|
|
}
|
|
|
|
status := "enabled"
|
|
if !enable {
|
|
status = "disabled"
|
|
}
|
|
return SilentResult(fmt.Sprintf("Cron job '%s' %s", job.Name, status))
|
|
}
|
|
|
|
// ExecuteJob executes a cron job through the agent
|
|
func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
|
|
// Get channel/chatID from job payload
|
|
channel := job.Payload.Channel
|
|
chatID := job.Payload.To
|
|
|
|
// Default values if not set
|
|
if channel == "" {
|
|
channel = "cli"
|
|
}
|
|
if chatID == "" {
|
|
chatID = "direct"
|
|
}
|
|
|
|
// Execute command if present
|
|
if job.Payload.Command != "" {
|
|
if !t.execEnabled || t.execTool == nil {
|
|
output := "Error executing scheduled command: command execution is disabled"
|
|
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer pubCancel()
|
|
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
|
|
Context: bus.NewOutboundContext(channel, chatID, ""),
|
|
Content: output,
|
|
})
|
|
return "ok"
|
|
}
|
|
|
|
args := map[string]any{
|
|
"action": "run",
|
|
"command": job.Payload.Command,
|
|
"__channel": channel,
|
|
"__chat_id": chatID,
|
|
}
|
|
|
|
result := t.execTool.Execute(ctx, args)
|
|
var output string
|
|
if result.IsError {
|
|
output = fmt.Sprintf("Error executing scheduled command: %s", result.ForLLM)
|
|
} else {
|
|
output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM)
|
|
}
|
|
|
|
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer pubCancel()
|
|
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
|
|
Context: bus.NewOutboundContext(channel, chatID, ""),
|
|
Content: output,
|
|
})
|
|
return "ok"
|
|
}
|
|
|
|
sessionKey := fmt.Sprintf("agent:cron-%s-%s", job.ID, uuid.New().String())
|
|
|
|
// Call agent with the job message
|
|
response, err := t.executor.ProcessDirectWithChannel(
|
|
ctx,
|
|
job.Payload.Message,
|
|
sessionKey,
|
|
channel,
|
|
chatID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Sprintf("Error: %v", err)
|
|
}
|
|
|
|
if response != "" {
|
|
t.executor.PublishResponseIfNeeded(ctx, channel, chatID, sessionKey, response)
|
|
}
|
|
return "ok"
|
|
}
|