diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index db476c212..ef2951365 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -461,7 +461,7 @@ func (al *AgentLoop) Run(ctx context.Context) error { if target == nil { cancelDrain() if finalResponse != "" { - al.publishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse) + al.PublishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse) } return } @@ -521,7 +521,7 @@ func (al *AgentLoop) Run(ctx context.Context) error { } if finalResponse != "" { - al.publishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse) + al.PublishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse) } }() } @@ -603,7 +603,7 @@ func (al *AgentLoop) Stop() { al.running.Store(false) } -func (al *AgentLoop) publishResponseIfNeeded(ctx context.Context, channel, chatID, response string) { +func (al *AgentLoop) PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string) { if response == "" { return } diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 77a413133..c1a224013 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -25,6 +25,7 @@ type CronSchedule struct { type CronPayload struct { Kind string `json:"kind"` + Type string `json:"type"` Message string `json:"message"` Command string `json:"command,omitempty"` Deliver bool `json:"deliver"` diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 154ec75f0..60d9d5e5a 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -16,6 +16,9 @@ import ( // JobExecutor is the interface for executing cron jobs through the agent type JobExecutor interface { ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) + // PublishResponseIfNeeded sends response to the outbound bus only when the + // agent did not already deliver content through the message tool in this round. + PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string) } // CronTool provides scheduling capabilities for the agent @@ -111,6 +114,11 @@ func (t *CronTool) Parameters() map[string]any { "type": "string", "description": "Job ID (for remove/enable/disable)", }, + "type": map[string]any{ + "type": "string", + "enum": []string{"message", "directive"}, + "description": "Message generation strategy. 'message' (default): content is sent directly as-is. 'directive': content is treated as instructions for an AI agent to execute before delivery.", + }, "deliver": map[string]any{ "type": "boolean", "description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: false", @@ -197,6 +205,12 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult deliver = d } + // Validate type parameter (server-side whitelist, not just LLM schema hint) + msgType, _ := args["type"].(string) + if msgType != "" && msgType != "message" && msgType != "directive" { + return ErrorResult(fmt.Sprintf("invalid type %q, must be 'message' or 'directive'", msgType)) + } + // GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When // allow_command is disabled, explicit confirmation is required as an override. // Non-command reminders remain open to all channels. @@ -230,9 +244,17 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult return ErrorResult(fmt.Sprintf("Error adding job: %v", err)) } + // Apply optional payload fields and persist in a single UpdateJob call + needsUpdate := false if command != "" { job.Payload.Command = command - // Need to save the updated payload + needsUpdate = true + } + if msgType != "" { + job.Payload.Type = msgType + needsUpdate = true + } + if needsUpdate { t.cronService.UpdateJob(job) } @@ -347,8 +369,13 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return "ok" } - // If deliver=true, send message directly without agent processing - if job.Payload.Deliver { + // Determine message generation strategy + // Type="directive": treat message as instructions for AI agent to execute + // Type="" or "message" (default): static message content + isDirective := job.Payload.Type == "directive" + + // If deliver=true and not directive, send message directly without agent processing + if job.Payload.Deliver && !isDirective { pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) defer pubCancel() t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ @@ -359,13 +386,23 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return "ok" } - // For deliver=false, process through agent (for complex tasks) + // For deliver=false OR directive mode, process through agent sessionKey := fmt.Sprintf("cron-%s", job.ID) - // Call agent with job's message + // Prepare the prompt based on type + prompt := job.Payload.Message + if isDirective { + // For directive type, prefix to clarify this is an instruction + prompt = fmt.Sprintf( + "Please execute the following directive and provide the result:\n\n%s", + job.Payload.Message, + ) + } + + // Call agent with the prepared prompt response, err := t.executor.ProcessDirectWithChannel( ctx, - job.Payload.Message, + prompt, sessionKey, channel, chatID, @@ -374,7 +411,8 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return fmt.Sprintf("Error: %v", err) } - // Response is automatically sent via MessageBus by AgentLoop - _ = response // Will be sent by AgentLoop + if response != "" { + t.executor.PublishResponseIfNeeded(ctx, channel, chatID, response) + } return "ok" } diff --git a/pkg/tools/cron_test.go b/pkg/tools/cron_test.go index cd7d39860..186c6a75e 100644 --- a/pkg/tools/cron_test.go +++ b/pkg/tools/cron_test.go @@ -2,6 +2,7 @@ package tools import ( "context" + "fmt" "path/filepath" "strings" "testing" @@ -12,18 +13,59 @@ import ( "github.com/sipeed/picoclaw/pkg/cron" ) -func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool { +type stubJobExecutor struct { + response string + err error + alreadySent bool // simulate message tool having already sent in this round + lastPrompt string + lastKey string + lastChan string + lastChatID string + publishedResp string + publishedChan string + publishedChatID string +} + +func (s *stubJobExecutor) ProcessDirectWithChannel( + _ context.Context, + content, sessionKey, channel, chatID string, +) (string, error) { + s.lastPrompt = content + s.lastKey = sessionKey + s.lastChan = channel + s.lastChatID = chatID + return s.response, s.err +} + +func (s *stubJobExecutor) PublishResponseIfNeeded( + _ context.Context, + channel, chatID, response string, +) { + if s.alreadySent { + return + } + s.publishedResp = response + s.publishedChan = channel + s.publishedChatID = chatID +} + +func newTestCronToolWithExecutorAndConfig(t *testing.T, executor JobExecutor, cfg *config.Config) *CronTool { t.Helper() storePath := filepath.Join(t.TempDir(), "cron.json") cronService := cron.NewCronService(storePath, nil) msgBus := bus.NewMessageBus() - tool, err := NewCronTool(cronService, nil, msgBus, t.TempDir(), true, 0, cfg) + tool, err := NewCronTool(cronService, executor, msgBus, t.TempDir(), true, 0, cfg) if err != nil { t.Fatalf("NewCronTool() error: %v", err) } return tool } +func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool { + t.Helper() + return newTestCronToolWithExecutorAndConfig(t, nil, cfg) +} + func newTestCronTool(t *testing.T) *CronTool { t.Helper() return newTestCronToolWithConfig(t, config.DefaultConfig()) @@ -237,3 +279,218 @@ func TestCronTool_ExecuteJobPublishesErrorWhenExecDisabled(t *testing.T) { t.Fatalf("expected exec disabled message, got: %s", msg.Content) } } + +func TestCronTool_ExecuteJobPublishesAgentResponse(t *testing.T) { + executor := &stubJobExecutor{response: "generated reply"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-1"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "send me a poem" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastKey != "cron-job-1" { + t.Fatalf("sessionKey = %q, want cron-job-1", executor.lastKey) + } + if executor.lastChan != "telegram" || executor.lastChatID != "chat-1" { + t.Fatalf("executor target = %s/%s, want telegram/chat-1", executor.lastChan, executor.lastChatID) + } + if executor.lastPrompt != "send me a poem" { + t.Fatalf("prompt = %q, want original message", executor.lastPrompt) + } + if executor.publishedResp != "generated reply" { + t.Fatalf("published response = %q, want generated reply", executor.publishedResp) + } + if executor.publishedChan != "telegram" || executor.publishedChatID != "chat-1" { + t.Fatalf("published target = %s/%s, want telegram/chat-1", executor.publishedChan, executor.publishedChatID) + } +} + +func TestCronTool_ExecuteJobSkipsEmptyAgentResponse(t *testing.T) { + executor := &stubJobExecutor{} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-empty"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "say nothing" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.publishedResp != "" { + t.Fatalf("unexpected published response: %q", executor.publishedResp) + } +} + +func TestCronTool_ExecuteJobSkipsWhenMessageToolAlreadySent(t *testing.T) { + executor := &stubJobExecutor{response: "Sent.", alreadySent: true} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-msg-sent"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "send weather" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.publishedResp != "" { + t.Fatalf("expected no published response when message tool already sent, got: %q", executor.publishedResp) + } +} + +func TestCronTool_ExecuteJobDirectiveAddsPromptPrefix(t *testing.T) { + executor := &stubJobExecutor{response: "directive result"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + originalMsg := "check the weather and summarize" + job := &cron.CronJob{ID: "job-dir-1"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = originalMsg + job.Payload.Type = "directive" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + wantPrompt := "Please execute the following directive and provide the result:\n\n" + originalMsg + if executor.lastPrompt != wantPrompt { + t.Fatalf("prompt = %q, want exact %q", executor.lastPrompt, wantPrompt) + } + if executor.publishedResp != "directive result" { + t.Fatalf("published response = %q, want %q", executor.publishedResp, "directive result") + } +} + +func TestCronTool_ExecuteJobDirectiveWithDeliverRoutesToAgent(t *testing.T) { + executor := &stubJobExecutor{response: "agent processed"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-dir-deliver"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "generate daily report" + job.Payload.Type = "directive" + job.Payload.Deliver = true + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastPrompt == "" { + t.Fatal("expected agent to be called for directive+deliver, but ProcessDirectWithChannel was not invoked") + } + if executor.publishedResp != "agent processed" { + t.Fatalf("published response = %q, want %q", executor.publishedResp, "agent processed") + } + + // Verify no direct publish happened on the bus (agent path, not direct path) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + select { + case msg := <-tool.msgBus.OutboundChan(): + t.Fatalf("unexpected direct bus message: %+v", msg) + case <-ctx.Done(): + // expected: no direct bus message + } +} + +func TestCronTool_ExecuteJobDeliverMessageDirectlyToBus(t *testing.T) { + executor := &stubJobExecutor{response: "should not be called"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-deliver"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "hello world" + job.Payload.Deliver = true + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastPrompt != "" { + t.Fatal("expected agent NOT to be invoked for deliver=true message type") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + select { + case msg := <-tool.msgBus.OutboundChan(): + if msg.Content != "hello world" { + t.Fatalf("bus content = %q, want %q", msg.Content, "hello world") + } + case <-ctx.Done(): + t.Fatal("timeout waiting for direct bus message") + } +} + +func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) { + executor := &stubJobExecutor{ + response: "this response must not be published", + err: fmt.Errorf("agent failure"), + } + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-err"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "do something" + + got := tool.ExecuteJob(context.Background(), job) + if !strings.Contains(got, "agent failure") { + t.Fatalf("ExecuteJob() = %q, want error message", got) + } + + if executor.publishedResp != "" { + t.Fatalf("unexpected publish on error path: %q", executor.publishedResp) + } +} + +func TestCronTool_AddJobRejectsInvalidType(t *testing.T) { + tool := newTestCronTool(t) + ctx := WithToolContext(context.Background(), "cli", "direct") + result := tool.Execute(ctx, map[string]any{ + "action": "add", + "message": "test", + "at_seconds": float64(60), + "type": "invalid_type", + }) + + if !result.IsError { + t.Fatal("expected error for invalid type parameter") + } + if !strings.Contains(result.ForLLM, "invalid type") { + t.Errorf("expected 'invalid type' error, got: %s", result.ForLLM) + } +} + +func TestCronTool_AddJobAcceptsValidTypes(t *testing.T) { + for _, msgType := range []string{"", "message", "directive"} { + t.Run("type="+msgType, func(t *testing.T) { + tool := newTestCronTool(t) + ctx := WithToolContext(context.Background(), "cli", "direct") + args := map[string]any{ + "action": "add", + "message": "test", + "at_seconds": float64(60), + } + if msgType != "" { + args["type"] = msgType + } + + result := tool.Execute(ctx, args) + if result.IsError { + t.Fatalf("expected valid type %q to succeed, got: %s", msgType, result.ForLLM) + } + }) + } +}