Files
picoclaw/pkg/tools/cron.go
T
yinwm 881999aceb refactor(shell): interpret zero timeout as unlimited execution
Replace unconditional WithTimeout usage with conditional context creation
based on timeout configuration. Zero values now bypass timeout enforcement,
using WithCancel for graceful cancellation while preserving existing timeout
behavior for positive values. Simplifies CronTool initialization by removing
unnecessary conditional timeout assignment.
2026-02-17 21:10:20 +08:00

331 lines
9.7 KiB
Go

package tools
import (
"context"
"fmt"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/cron"
"github.com/sipeed/picoclaw/pkg/utils"
)
// JobExecutor is the interface for executing cron jobs through the agent
type JobExecutor interface {
ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
}
// CronTool provides scheduling capabilities for the agent
type CronTool struct {
cronService *cron.CronService
executor JobExecutor
msgBus *bus.MessageBus
execTool *ExecTool
channel string
chatID string
mu sync.RWMutex
}
// NewCronTool creates a new CronTool
// execTimeout: 0 means no timeout, >0 sets the timeout duration
func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, restrict bool, execTimeout time.Duration) *CronTool {
execTool := NewExecTool(workspace, restrict)
execTool.SetTimeout(execTimeout) // 0 means no timeout
return &CronTool{
cronService: cronService,
executor: executor,
msgBus: msgBus,
execTool: execTool,
}
}
// Name returns the tool name
func (t *CronTool) Name() string {
return "cron"
}
// Description returns the tool description
func (t *CronTool) Description() string {
return "Schedule reminders, tasks, or system commands. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules. Use 'command' to execute shell commands directly."
}
// Parameters returns the tool parameters schema
func (t *CronTool) Parameters() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"action": map[string]interface{}{
"type": "string",
"enum": []string{"add", "list", "remove", "enable", "disable"},
"description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.",
},
"message": map[string]interface{}{
"type": "string",
"description": "The reminder/task message to display when triggered. If 'command' is used, this describes what the command does.",
},
"command": map[string]interface{}{
"type": "string",
"description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message. 'deliver' will be forced to false for commands.",
},
"at_seconds": map[string]interface{}{
"type": "integer",
"description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.",
},
"every_seconds": map[string]interface{}{
"type": "integer",
"description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.",
},
"cron_expr": map[string]interface{}{
"type": "string",
"description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.",
},
"job_id": map[string]interface{}{
"type": "string",
"description": "Job ID (for remove/enable/disable)",
},
"deliver": map[string]interface{}{
"type": "boolean",
"description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: true",
},
},
"required": []string{"action"},
}
}
// SetContext sets the current session context for job creation
func (t *CronTool) SetContext(channel, chatID string) {
t.mu.Lock()
defer t.mu.Unlock()
t.channel = channel
t.chatID = chatID
}
// Execute runs the tool with the given arguments
func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) *ToolResult {
action, ok := args["action"].(string)
if !ok {
return ErrorResult("action is required")
}
switch action {
case "add":
return t.addJob(args)
case "list":
return t.listJobs()
case "remove":
return t.removeJob(args)
case "enable":
return t.enableJob(args, true)
case "disable":
return t.enableJob(args, false)
default:
return ErrorResult(fmt.Sprintf("unknown action: %s", action))
}
}
func (t *CronTool) addJob(args map[string]interface{}) *ToolResult {
t.mu.RLock()
channel := t.channel
chatID := t.chatID
t.mu.RUnlock()
if channel == "" || chatID == "" {
return ErrorResult("no session context (channel/chat_id not set). Use this tool in an active conversation.")
}
message, ok := args["message"].(string)
if !ok || message == "" {
return ErrorResult("message is required for add")
}
var schedule cron.CronSchedule
// Check for at_seconds (one-time), every_seconds (recurring), or cron_expr
atSeconds, hasAt := args["at_seconds"].(float64)
everySeconds, hasEvery := args["every_seconds"].(float64)
cronExpr, hasCron := args["cron_expr"].(string)
// Priority: at_seconds > every_seconds > cron_expr
if hasAt {
atMS := time.Now().UnixMilli() + int64(atSeconds)*1000
schedule = cron.CronSchedule{
Kind: "at",
AtMS: &atMS,
}
} else if hasEvery {
everyMS := int64(everySeconds) * 1000
schedule = cron.CronSchedule{
Kind: "every",
EveryMS: &everyMS,
}
} else if hasCron {
schedule = cron.CronSchedule{
Kind: "cron",
Expr: cronExpr,
}
} else {
return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required")
}
// Read deliver parameter, default to true
deliver := true
if d, ok := args["deliver"].(bool); ok {
deliver = d
}
command, _ := args["command"].(string)
if command != "" {
// Commands must be processed by agent/exec tool, so deliver must be false (or handled specifically)
// Actually, let's keep deliver=false to let the system know it's not a simple chat message
// But for our new logic in ExecuteJob, we can handle it regardless of deliver flag if Payload.Command is set.
// However, logically, it's not "delivered" to chat directly as is.
deliver = false
}
// Truncate message for job name (max 30 chars)
messagePreview := utils.Truncate(message, 30)
job, err := t.cronService.AddJob(
messagePreview,
schedule,
message,
deliver,
channel,
chatID,
)
if err != nil {
return ErrorResult(fmt.Sprintf("Error adding job: %v", err))
}
if command != "" {
job.Payload.Command = command
// Need to save the updated payload
t.cronService.UpdateJob(job)
}
return SilentResult(fmt.Sprintf("Cron job added: %s (id: %s)", job.Name, job.ID))
}
func (t *CronTool) listJobs() *ToolResult {
jobs := t.cronService.ListJobs(false)
if len(jobs) == 0 {
return SilentResult("No scheduled jobs")
}
result := "Scheduled jobs:\n"
for _, j := range jobs {
var scheduleInfo string
if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil {
scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000)
} else if j.Schedule.Kind == "cron" {
scheduleInfo = j.Schedule.Expr
} else if j.Schedule.Kind == "at" {
scheduleInfo = "one-time"
} else {
scheduleInfo = "unknown"
}
result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo)
}
return SilentResult(result)
}
func (t *CronTool) removeJob(args map[string]interface{}) *ToolResult {
jobID, ok := args["job_id"].(string)
if !ok || jobID == "" {
return ErrorResult("job_id is required for remove")
}
if t.cronService.RemoveJob(jobID) {
return SilentResult(fmt.Sprintf("Cron job removed: %s", jobID))
}
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
}
func (t *CronTool) enableJob(args map[string]interface{}, enable bool) *ToolResult {
jobID, ok := args["job_id"].(string)
if !ok || jobID == "" {
return ErrorResult("job_id is required for enable/disable")
}
job := t.cronService.EnableJob(jobID, enable)
if job == nil {
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
}
status := "enabled"
if !enable {
status = "disabled"
}
return SilentResult(fmt.Sprintf("Cron job '%s' %s", job.Name, status))
}
// ExecuteJob executes a cron job through the agent
func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
// Get channel/chatID from job payload
channel := job.Payload.Channel
chatID := job.Payload.To
// Default values if not set
if channel == "" {
channel = "cli"
}
if chatID == "" {
chatID = "direct"
}
// Execute command if present
if job.Payload.Command != "" {
args := map[string]interface{}{
"command": job.Payload.Command,
}
result := t.execTool.Execute(ctx, args)
var output string
if result.IsError {
output = fmt.Sprintf("Error executing scheduled command: %s", result.ForLLM)
} else {
output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM)
}
t.msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: output,
})
return "ok"
}
// If deliver=true, send message directly without agent processing
if job.Payload.Deliver {
t.msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: job.Payload.Message,
})
return "ok"
}
// For deliver=false, process through agent (for complex tasks)
sessionKey := fmt.Sprintf("cron-%s", job.ID)
// Call agent with job's message
response, err := t.executor.ProcessDirectWithChannel(
ctx,
job.Payload.Message,
sessionKey,
channel,
chatID,
)
if err != nil {
return fmt.Sprintf("Error: %v", err)
}
// Response is automatically sent via MessageBus by AgentLoop
_ = response // Will be sent by AgentLoop
return "ok"
}