mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
82856bc57a
Add a new configuration option `exec_timeout_minutes` under the `tools.cron` section to control the maximum execution time for cron jobs. The default timeout is set to 5 minutes, which is appropriate for LLM operations. The configuration can be set in the config file or via the `PICOCLAW_TOOLS_CRON_EXEC_TIMEOUT_MINUTES` environment variable. A value of 0 disables the timeout entirely. This change improves system reliability by preventing cron jobs from running indefinitely in case of unexpected failures or hanging processes.
332 lines
9.6 KiB
Go
332 lines
9.6 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/cron"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
// JobExecutor is the interface for executing cron jobs through the agent
|
|
type JobExecutor interface {
|
|
ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
|
|
}
|
|
|
|
// CronTool provides scheduling capabilities for the agent
|
|
type CronTool struct {
|
|
cronService *cron.CronService
|
|
executor JobExecutor
|
|
msgBus *bus.MessageBus
|
|
execTool *ExecTool
|
|
channel string
|
|
chatID string
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewCronTool creates a new CronTool
|
|
func NewCronTool(cronService *cron.CronService, executor JobExecutor, msgBus *bus.MessageBus, workspace string, execTimeout time.Duration) *CronTool {
|
|
execTool := NewExecTool(workspace, false)
|
|
if execTimeout > 0 {
|
|
execTool.SetTimeout(execTimeout)
|
|
}
|
|
return &CronTool{
|
|
cronService: cronService,
|
|
executor: executor,
|
|
msgBus: msgBus,
|
|
execTool: execTool,
|
|
}
|
|
}
|
|
|
|
// Name returns the tool name
|
|
func (t *CronTool) Name() string {
|
|
return "cron"
|
|
}
|
|
|
|
// Description returns the tool description
|
|
func (t *CronTool) Description() string {
|
|
return "Schedule reminders, tasks, or system commands. IMPORTANT: When user asks to be reminded or scheduled, you MUST call this tool. Use 'at_seconds' for one-time reminders (e.g., 'remind me in 10 minutes' → at_seconds=600). Use 'every_seconds' ONLY for recurring tasks (e.g., 'every 2 hours' → every_seconds=7200). Use 'cron_expr' for complex recurring schedules. Use 'command' to execute shell commands directly."
|
|
}
|
|
|
|
// Parameters returns the tool parameters schema
|
|
func (t *CronTool) Parameters() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "object",
|
|
"properties": map[string]interface{}{
|
|
"action": map[string]interface{}{
|
|
"type": "string",
|
|
"enum": []string{"add", "list", "remove", "enable", "disable"},
|
|
"description": "Action to perform. Use 'add' when user wants to schedule a reminder or task.",
|
|
},
|
|
"message": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "The reminder/task message to display when triggered. If 'command' is used, this describes what the command does.",
|
|
},
|
|
"command": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message. 'deliver' will be forced to false for commands.",
|
|
},
|
|
"at_seconds": map[string]interface{}{
|
|
"type": "integer",
|
|
"description": "One-time reminder: seconds from now when to trigger (e.g., 600 for 10 minutes later). Use this for one-time reminders like 'remind me in 10 minutes'.",
|
|
},
|
|
"every_seconds": map[string]interface{}{
|
|
"type": "integer",
|
|
"description": "Recurring interval in seconds (e.g., 3600 for every hour). Use this ONLY for recurring tasks like 'every 2 hours' or 'daily reminder'.",
|
|
},
|
|
"cron_expr": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Cron expression for complex recurring schedules (e.g., '0 9 * * *' for daily at 9am). Use this for complex recurring schedules.",
|
|
},
|
|
"job_id": map[string]interface{}{
|
|
"type": "string",
|
|
"description": "Job ID (for remove/enable/disable)",
|
|
},
|
|
"deliver": map[string]interface{}{
|
|
"type": "boolean",
|
|
"description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: true",
|
|
},
|
|
},
|
|
"required": []string{"action"},
|
|
}
|
|
}
|
|
|
|
// SetContext sets the current session context for job creation
|
|
func (t *CronTool) SetContext(channel, chatID string) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
t.channel = channel
|
|
t.chatID = chatID
|
|
}
|
|
|
|
// Execute runs the tool with the given arguments
|
|
func (t *CronTool) Execute(ctx context.Context, args map[string]interface{}) *ToolResult {
|
|
action, ok := args["action"].(string)
|
|
if !ok {
|
|
return ErrorResult("action is required")
|
|
}
|
|
|
|
switch action {
|
|
case "add":
|
|
return t.addJob(args)
|
|
case "list":
|
|
return t.listJobs()
|
|
case "remove":
|
|
return t.removeJob(args)
|
|
case "enable":
|
|
return t.enableJob(args, true)
|
|
case "disable":
|
|
return t.enableJob(args, false)
|
|
default:
|
|
return ErrorResult(fmt.Sprintf("unknown action: %s", action))
|
|
}
|
|
}
|
|
|
|
func (t *CronTool) addJob(args map[string]interface{}) *ToolResult {
|
|
t.mu.RLock()
|
|
channel := t.channel
|
|
chatID := t.chatID
|
|
t.mu.RUnlock()
|
|
|
|
if channel == "" || chatID == "" {
|
|
return ErrorResult("no session context (channel/chat_id not set). Use this tool in an active conversation.")
|
|
}
|
|
|
|
message, ok := args["message"].(string)
|
|
if !ok || message == "" {
|
|
return ErrorResult("message is required for add")
|
|
}
|
|
|
|
var schedule cron.CronSchedule
|
|
|
|
// Check for at_seconds (one-time), every_seconds (recurring), or cron_expr
|
|
atSeconds, hasAt := args["at_seconds"].(float64)
|
|
everySeconds, hasEvery := args["every_seconds"].(float64)
|
|
cronExpr, hasCron := args["cron_expr"].(string)
|
|
|
|
// Priority: at_seconds > every_seconds > cron_expr
|
|
if hasAt {
|
|
atMS := time.Now().UnixMilli() + int64(atSeconds)*1000
|
|
schedule = cron.CronSchedule{
|
|
Kind: "at",
|
|
AtMS: &atMS,
|
|
}
|
|
} else if hasEvery {
|
|
everyMS := int64(everySeconds) * 1000
|
|
schedule = cron.CronSchedule{
|
|
Kind: "every",
|
|
EveryMS: &everyMS,
|
|
}
|
|
} else if hasCron {
|
|
schedule = cron.CronSchedule{
|
|
Kind: "cron",
|
|
Expr: cronExpr,
|
|
}
|
|
} else {
|
|
return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required")
|
|
}
|
|
|
|
// Read deliver parameter, default to true
|
|
deliver := true
|
|
if d, ok := args["deliver"].(bool); ok {
|
|
deliver = d
|
|
}
|
|
|
|
command, _ := args["command"].(string)
|
|
if command != "" {
|
|
// Commands must be processed by agent/exec tool, so deliver must be false (or handled specifically)
|
|
// Actually, let's keep deliver=false to let the system know it's not a simple chat message
|
|
// But for our new logic in ExecuteJob, we can handle it regardless of deliver flag if Payload.Command is set.
|
|
// However, logically, it's not "delivered" to chat directly as is.
|
|
deliver = false
|
|
}
|
|
|
|
// Truncate message for job name (max 30 chars)
|
|
messagePreview := utils.Truncate(message, 30)
|
|
|
|
job, err := t.cronService.AddJob(
|
|
messagePreview,
|
|
schedule,
|
|
message,
|
|
deliver,
|
|
channel,
|
|
chatID,
|
|
)
|
|
if err != nil {
|
|
return ErrorResult(fmt.Sprintf("Error adding job: %v", err))
|
|
}
|
|
|
|
if command != "" {
|
|
job.Payload.Command = command
|
|
// Need to save the updated payload
|
|
t.cronService.UpdateJob(job)
|
|
}
|
|
|
|
return SilentResult(fmt.Sprintf("Cron job added: %s (id: %s)", job.Name, job.ID))
|
|
}
|
|
|
|
func (t *CronTool) listJobs() *ToolResult {
|
|
jobs := t.cronService.ListJobs(false)
|
|
|
|
if len(jobs) == 0 {
|
|
return SilentResult("No scheduled jobs")
|
|
}
|
|
|
|
result := "Scheduled jobs:\n"
|
|
for _, j := range jobs {
|
|
var scheduleInfo string
|
|
if j.Schedule.Kind == "every" && j.Schedule.EveryMS != nil {
|
|
scheduleInfo = fmt.Sprintf("every %ds", *j.Schedule.EveryMS/1000)
|
|
} else if j.Schedule.Kind == "cron" {
|
|
scheduleInfo = j.Schedule.Expr
|
|
} else if j.Schedule.Kind == "at" {
|
|
scheduleInfo = "one-time"
|
|
} else {
|
|
scheduleInfo = "unknown"
|
|
}
|
|
result += fmt.Sprintf("- %s (id: %s, %s)\n", j.Name, j.ID, scheduleInfo)
|
|
}
|
|
|
|
return SilentResult(result)
|
|
}
|
|
|
|
func (t *CronTool) removeJob(args map[string]interface{}) *ToolResult {
|
|
jobID, ok := args["job_id"].(string)
|
|
if !ok || jobID == "" {
|
|
return ErrorResult("job_id is required for remove")
|
|
}
|
|
|
|
if t.cronService.RemoveJob(jobID) {
|
|
return SilentResult(fmt.Sprintf("Cron job removed: %s", jobID))
|
|
}
|
|
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
|
|
}
|
|
|
|
func (t *CronTool) enableJob(args map[string]interface{}, enable bool) *ToolResult {
|
|
jobID, ok := args["job_id"].(string)
|
|
if !ok || jobID == "" {
|
|
return ErrorResult("job_id is required for enable/disable")
|
|
}
|
|
|
|
job := t.cronService.EnableJob(jobID, enable)
|
|
if job == nil {
|
|
return ErrorResult(fmt.Sprintf("Job %s not found", jobID))
|
|
}
|
|
|
|
status := "enabled"
|
|
if !enable {
|
|
status = "disabled"
|
|
}
|
|
return SilentResult(fmt.Sprintf("Cron job '%s' %s", job.Name, status))
|
|
}
|
|
|
|
// ExecuteJob executes a cron job through the agent
|
|
func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
|
|
// Get channel/chatID from job payload
|
|
channel := job.Payload.Channel
|
|
chatID := job.Payload.To
|
|
|
|
// Default values if not set
|
|
if channel == "" {
|
|
channel = "cli"
|
|
}
|
|
if chatID == "" {
|
|
chatID = "direct"
|
|
}
|
|
|
|
// Execute command if present
|
|
if job.Payload.Command != "" {
|
|
args := map[string]interface{}{
|
|
"command": job.Payload.Command,
|
|
}
|
|
|
|
result := t.execTool.Execute(ctx, args)
|
|
var output string
|
|
if result.IsError {
|
|
output = fmt.Sprintf("Error executing scheduled command: %s", result.ForLLM)
|
|
} else {
|
|
output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM)
|
|
}
|
|
|
|
t.msgBus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: output,
|
|
})
|
|
return "ok"
|
|
}
|
|
|
|
// If deliver=true, send message directly without agent processing
|
|
if job.Payload.Deliver {
|
|
t.msgBus.PublishOutbound(bus.OutboundMessage{
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
Content: job.Payload.Message,
|
|
})
|
|
return "ok"
|
|
}
|
|
|
|
// For deliver=false, process through agent (for complex tasks)
|
|
sessionKey := fmt.Sprintf("cron-%s", job.ID)
|
|
|
|
// Call agent with job's message
|
|
response, err := t.executor.ProcessDirectWithChannel(
|
|
ctx,
|
|
job.Payload.Message,
|
|
sessionKey,
|
|
channel,
|
|
chatID,
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Sprintf("Error: %v", err)
|
|
}
|
|
|
|
// Response is automatically sent via MessageBus by AgentLoop
|
|
_ = response // Will be sent by AgentLoop
|
|
return "ok"
|
|
}
|