diff --git a/cmd/picoclaw/internal/cron/add.go b/cmd/picoclaw/internal/cron/add.go index 947557d5a..f9d73089d 100644 --- a/cmd/picoclaw/internal/cron/add.go +++ b/cmd/picoclaw/internal/cron/add.go @@ -14,7 +14,6 @@ func newAddCommand(storePath func() string) *cobra.Command { message string every int64 cronExp string - deliver bool channel string to string ) @@ -37,7 +36,7 @@ func newAddCommand(storePath func() string) *cobra.Command { } cs := cron.NewCronService(storePath(), nil) - job, err := cs.AddJob(name, schedule, message, deliver, channel, to) + job, err := cs.AddJob(name, schedule, message, channel, to) if err != nil { return fmt.Errorf("error adding job: %w", err) } @@ -52,7 +51,6 @@ func newAddCommand(storePath func() string) *cobra.Command { cmd.Flags().StringVarP(&message, "message", "m", "", "Message for agent") cmd.Flags().Int64VarP(&every, "every", "e", 0, "Run every N seconds") cmd.Flags().StringVarP(&cronExp, "cron", "c", "", "Cron expression (e.g. '0 9 * * *')") - cmd.Flags().BoolVarP(&deliver, "deliver", "d", false, "Deliver response to channel") cmd.Flags().StringVar(&to, "to", "", "Recipient for delivery") cmd.Flags().StringVar(&channel, "channel", "", "Channel for delivery") diff --git a/cmd/picoclaw/internal/cron/add_test.go b/cmd/picoclaw/internal/cron/add_test.go index 09701fab5..53875dc51 100644 --- a/cmd/picoclaw/internal/cron/add_test.go +++ b/cmd/picoclaw/internal/cron/add_test.go @@ -21,7 +21,6 @@ func TestNewAddSubcommand(t *testing.T) { assert.NotNil(t, cmd.Flags().Lookup("every")) assert.NotNil(t, cmd.Flags().Lookup("cron")) - assert.NotNil(t, cmd.Flags().Lookup("deliver")) assert.NotNil(t, cmd.Flags().Lookup("to")) assert.NotNil(t, cmd.Flags().Lookup("channel")) diff --git a/pkg/cron/service.go b/pkg/cron/service.go index c1a224013..6a8728943 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -25,10 +25,8 @@ type CronSchedule struct { type CronPayload struct { Kind string `json:"kind"` - Type string `json:"type"` Message string `json:"message"` Command string `json:"command,omitempty"` - Deliver bool `json:"deliver"` Channel string `json:"channel,omitempty"` To string `json:"to,omitempty"` } @@ -410,7 +408,6 @@ func (cs *CronService) AddJob( name string, schedule CronSchedule, message string, - deliver bool, channel, to string, ) (*CronJob, error) { cs.mu.Lock() @@ -429,7 +426,6 @@ func (cs *CronService) AddJob( Payload: CronPayload{ Kind: "agent_turn", Message: message, - Deliver: deliver, Channel: channel, To: to, }, diff --git a/pkg/cron/service_test.go b/pkg/cron/service_test.go index c55e62174..6dff3b387 100644 --- a/pkg/cron/service_test.go +++ b/pkg/cron/service_test.go @@ -20,7 +20,7 @@ func TestSaveStore_FilePermissions(t *testing.T) { cs := NewCronService(storePath, nil) - _, err := cs.AddJob("test", CronSchedule{Kind: "every", EveryMS: int64Ptr(60000)}, "hello", false, "cli", "direct") + _, err := cs.AddJob("test", CronSchedule{Kind: "every", EveryMS: int64Ptr(60000)}, "hello", "cli", "direct") if err != nil { t.Fatalf("AddJob failed: %v", err) } @@ -52,7 +52,7 @@ func TestCronService_CRUD(t *testing.T) { // Test AddJob at := time.Now().Add(time.Hour).UnixMilli() - job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", true, "ch", "to") + job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", "ch", "to") if err != nil || job.ID == "" { t.Fatalf("AddJob failed: %v", err) } @@ -134,7 +134,7 @@ func TestCronService_ExecutionFlow(t *testing.T) { // Add a job then runs 100ms from now target := time.Now().Add(100 * time.Millisecond).UnixMilli() - job, _ := cs.AddJob("FastJob", CronSchedule{Kind: "at", AtMS: &target}, "", false, "", "") + job, _ := cs.AddJob("FastJob", CronSchedule{Kind: "at", AtMS: &target}, "", "", "") // Check for job execution with a timeout success := false @@ -167,7 +167,7 @@ func TestCronService_PersistenceIntegrity(t *testing.T) { // write a job and persist cs1 := NewCronService(tmpFile, nil) at := int64(2000000000000) - cs1.AddJob("PersistMe", CronSchedule{Kind: "at", AtMS: &at}, "payload", true, "ch1", "") + cs1.AddJob("PersistMe", CronSchedule{Kind: "at", AtMS: &at}, "payload", "ch1", "") // check file exists if _, err := os.Stat(tmpFile); os.IsNotExist(err) { @@ -213,7 +213,7 @@ func TestCronService_ConcurrentAccess(t *testing.T) { defer wg.Done() for j := range iterations { at := time.Now().Add(time.Hour).UnixMilli() - cs.AddJob(fmt.Sprintf("Job-%d-%d", id, j), CronSchedule{Kind: "at", AtMS: &at}, "", false, "", "") + cs.AddJob(fmt.Sprintf("Job-%d-%d", id, j), CronSchedule{Kind: "at", AtMS: &at}, "", "", "") time.Sleep(100 * time.Microsecond) } }(i) diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 60d9d5e5a..c6ac3a129 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -92,7 +92,7 @@ func (t *CronTool) Parameters() map[string]any { }, "command": map[string]any{ "type": "string", - "description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message. 'deliver' will be forced to false for commands.", + "description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message.", }, "command_confirm": map[string]any{ "type": "boolean", @@ -114,15 +114,6 @@ func (t *CronTool) Parameters() map[string]any { "type": "string", "description": "Job ID (for remove/enable/disable)", }, - "type": map[string]any{ - "type": "string", - "enum": []string{"message", "directive"}, - "description": "Message generation strategy. 'message' (default): content is sent directly as-is. 'directive': content is treated as instructions for an AI agent to execute before delivery.", - }, - "deliver": map[string]any{ - "type": "boolean", - "description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: false", - }, }, "required": []string{"action"}, } @@ -199,18 +190,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required") } - // Read deliver parameter, default to false so scheduled tasks execute through the agent - deliver := false - if d, ok := args["deliver"].(bool); ok { - deliver = d - } - - // Validate type parameter (server-side whitelist, not just LLM schema hint) - msgType, _ := args["type"].(string) - if msgType != "" && msgType != "message" && msgType != "directive" { - return ErrorResult(fmt.Sprintf("invalid type %q, must be 'message' or 'directive'", msgType)) - } - // GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When // allow_command is disabled, explicit confirmation is required as an override. // Non-command reminders remain open to all channels. @@ -226,7 +205,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult if !t.allowCommand && !commandConfirm { return ErrorResult("command_confirm=true is required when allow_command is disabled") } - deliver = false } // Truncate message for job name (max 30 chars) @@ -236,7 +214,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult messagePreview, schedule, message, - deliver, channel, chatID, ) @@ -250,10 +227,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult job.Payload.Command = command needsUpdate = true } - if msgType != "" { - job.Payload.Type = msgType - needsUpdate = true - } if needsUpdate { t.cronService.UpdateJob(job) } @@ -369,40 +342,12 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return "ok" } - // Determine message generation strategy - // Type="directive": treat message as instructions for AI agent to execute - // Type="" or "message" (default): static message content - isDirective := job.Payload.Type == "directive" - - // If deliver=true and not directive, send message directly without agent processing - if job.Payload.Deliver && !isDirective { - pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer pubCancel() - t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ - Channel: channel, - ChatID: chatID, - Content: job.Payload.Message, - }) - return "ok" - } - - // For deliver=false OR directive mode, process through agent sessionKey := fmt.Sprintf("cron-%s", job.ID) - // Prepare the prompt based on type - prompt := job.Payload.Message - if isDirective { - // For directive type, prefix to clarify this is an instruction - prompt = fmt.Sprintf( - "Please execute the following directive and provide the result:\n\n%s", - job.Payload.Message, - ) - } - - // Call agent with the prepared prompt + // Call agent with the job message response, err := t.executor.ProcessDirectWithChannel( ctx, - prompt, + job.Payload.Message, sessionKey, channel, chatID, diff --git a/pkg/tools/cron_test.go b/pkg/tools/cron_test.go index 186c6a75e..c699908cd 100644 --- a/pkg/tools/cron_test.go +++ b/pkg/tools/cron_test.go @@ -229,28 +229,6 @@ func TestCronTool_NonCommandJobAllowedFromRemoteChannel(t *testing.T) { } } -func TestCronTool_NonCommandJobDefaultsDeliverToFalse(t *testing.T) { - tool := newTestCronTool(t) - ctx := WithToolContext(context.Background(), "telegram", "chat-1") - result := tool.Execute(ctx, map[string]any{ - "action": "add", - "message": "send me a poem", - "at_seconds": float64(600), - }) - - if result.IsError { - t.Fatalf("expected non-command reminder to succeed, got: %s", result.ForLLM) - } - - jobs := tool.cronService.ListJobs(false) - if len(jobs) != 1 { - t.Fatalf("expected 1 job, got %d", len(jobs)) - } - if jobs[0].Payload.Deliver { - t.Fatal("expected deliver=false by default for non-command jobs") - } -} - func TestCronTool_ExecuteJobPublishesErrorWhenExecDisabled(t *testing.T) { cfg := config.DefaultConfig() cfg.Tools.Exec.Enabled = false @@ -346,93 +324,6 @@ func TestCronTool_ExecuteJobSkipsWhenMessageToolAlreadySent(t *testing.T) { } } -func TestCronTool_ExecuteJobDirectiveAddsPromptPrefix(t *testing.T) { - executor := &stubJobExecutor{response: "directive result"} - tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) - - originalMsg := "check the weather and summarize" - job := &cron.CronJob{ID: "job-dir-1"} - job.Payload.Channel = "telegram" - job.Payload.To = "chat-1" - job.Payload.Message = originalMsg - job.Payload.Type = "directive" - - if got := tool.ExecuteJob(context.Background(), job); got != "ok" { - t.Fatalf("ExecuteJob() = %q, want ok", got) - } - - wantPrompt := "Please execute the following directive and provide the result:\n\n" + originalMsg - if executor.lastPrompt != wantPrompt { - t.Fatalf("prompt = %q, want exact %q", executor.lastPrompt, wantPrompt) - } - if executor.publishedResp != "directive result" { - t.Fatalf("published response = %q, want %q", executor.publishedResp, "directive result") - } -} - -func TestCronTool_ExecuteJobDirectiveWithDeliverRoutesToAgent(t *testing.T) { - executor := &stubJobExecutor{response: "agent processed"} - tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) - - job := &cron.CronJob{ID: "job-dir-deliver"} - job.Payload.Channel = "telegram" - job.Payload.To = "chat-1" - job.Payload.Message = "generate daily report" - job.Payload.Type = "directive" - job.Payload.Deliver = true - - if got := tool.ExecuteJob(context.Background(), job); got != "ok" { - t.Fatalf("ExecuteJob() = %q, want ok", got) - } - - if executor.lastPrompt == "" { - t.Fatal("expected agent to be called for directive+deliver, but ProcessDirectWithChannel was not invoked") - } - if executor.publishedResp != "agent processed" { - t.Fatalf("published response = %q, want %q", executor.publishedResp, "agent processed") - } - - // Verify no direct publish happened on the bus (agent path, not direct path) - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - select { - case msg := <-tool.msgBus.OutboundChan(): - t.Fatalf("unexpected direct bus message: %+v", msg) - case <-ctx.Done(): - // expected: no direct bus message - } -} - -func TestCronTool_ExecuteJobDeliverMessageDirectlyToBus(t *testing.T) { - executor := &stubJobExecutor{response: "should not be called"} - tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) - - job := &cron.CronJob{ID: "job-deliver"} - job.Payload.Channel = "telegram" - job.Payload.To = "chat-1" - job.Payload.Message = "hello world" - job.Payload.Deliver = true - - if got := tool.ExecuteJob(context.Background(), job); got != "ok" { - t.Fatalf("ExecuteJob() = %q, want ok", got) - } - - if executor.lastPrompt != "" { - t.Fatal("expected agent NOT to be invoked for deliver=true message type") - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - select { - case msg := <-tool.msgBus.OutboundChan(): - if msg.Content != "hello world" { - t.Fatalf("bus content = %q, want %q", msg.Content, "hello world") - } - case <-ctx.Done(): - t.Fatal("timeout waiting for direct bus message") - } -} - func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) { executor := &stubJobExecutor{ response: "this response must not be published", @@ -454,43 +345,3 @@ func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) { t.Fatalf("unexpected publish on error path: %q", executor.publishedResp) } } - -func TestCronTool_AddJobRejectsInvalidType(t *testing.T) { - tool := newTestCronTool(t) - ctx := WithToolContext(context.Background(), "cli", "direct") - result := tool.Execute(ctx, map[string]any{ - "action": "add", - "message": "test", - "at_seconds": float64(60), - "type": "invalid_type", - }) - - if !result.IsError { - t.Fatal("expected error for invalid type parameter") - } - if !strings.Contains(result.ForLLM, "invalid type") { - t.Errorf("expected 'invalid type' error, got: %s", result.ForLLM) - } -} - -func TestCronTool_AddJobAcceptsValidTypes(t *testing.T) { - for _, msgType := range []string{"", "message", "directive"} { - t.Run("type="+msgType, func(t *testing.T) { - tool := newTestCronTool(t) - ctx := WithToolContext(context.Background(), "cli", "direct") - args := map[string]any{ - "action": "add", - "message": "test", - "at_seconds": float64(60), - } - if msgType != "" { - args["type"] = msgType - } - - result := tool.Execute(ctx, args) - if result.IsError { - t.Fatalf("expected valid type %q to succeed, got: %s", msgType, result.ForLLM) - } - }) - } -}