From e414b82ac3b160ad5efdca83cae9337767cbe8d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B2=88=E9=9D=92=E5=B7=9D?= <46062972+ShenQingchuan@users.noreply.github.com> Date: Sun, 29 Mar 2026 13:47:28 +0800 Subject: [PATCH] fix(cron): publish agent response to outbound bus for cron-triggered jobs (#2100) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(cron): publish agent response to outbound bus for cron-triggered jobs When a cron job triggers agent execution via ProcessDirectWithChannel, the agent response was silently discarded — the code assumed AgentLoop would auto-publish it, but SendResponse is false on this path. Delegate to PublishResponseIfNeeded (exported from AgentLoop) so the response reaches the originating channel (e.g. Telegram) only when the message tool did not already deliver content in the same round. Also adds a "directive" message type to CronPayload, allowing cron jobs to instruct the agent to execute a task rather than echo static text. * fix(cron): add type validation and directive test coverage Address reviewer blocking feedback: 1. Server-side whitelist for `type` parameter — the `enum` in Parameters() is only an LLM schema hint; any string was persisted. Now `addJob` rejects values other than "message" and "directive". 2. Comprehensive test coverage for the directive code path: - directive adds prompt prefix to ProcessDirectWithChannel - deliver=true + directive routes through agent (not direct publish) - directive prompt content, sessionKey, channel, chatID are correct - invalid type is rejected; valid types ("", "message", "directive") pass - deliver=true message type goes directly to bus (regression) - agent error path does not trigger publish (regression) Also merge the two UpdateJob calls in addJob into one to avoid redundant disk I/O (non-blocking suggestion from review). * fix(cron): remove omitempty from CronPayload.Type for consistent JSON Empty string and "message" are semantically equivalent defaults; always serializing the field avoids asymmetric JSON output. * test(cron): remove redundant test, strengthen error path coverage - Remove ExecuteJobDirectivePassesCorrectContent: its assertions on sessionKey/channel/chatID duplicate ExecuteJobPublishesAgentResponse; its prompt check duplicates DirectiveAddsPromptPrefix. - Strengthen DirectiveAddsPromptPrefix with exact prompt match and publish response assertion. - Fix ReturnsErrorWithoutPublish: set non-empty stub response so the test verifies the error branch early-return, not the response=="" guard. * fix(ci): satisfy golines and gosmopolitan in cron code --- pkg/agent/loop.go | 6 +- pkg/cron/service.go | 1 + pkg/tools/cron.go | 54 +++++++-- pkg/tools/cron_test.go | 261 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 309 insertions(+), 13 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index db476c212..ef2951365 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -461,7 +461,7 @@ func (al *AgentLoop) Run(ctx context.Context) error { if target == nil { cancelDrain() if finalResponse != "" { - al.publishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse) + al.PublishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse) } return } @@ -521,7 +521,7 @@ func (al *AgentLoop) Run(ctx context.Context) error { } if finalResponse != "" { - al.publishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse) + al.PublishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse) } }() } @@ -603,7 +603,7 @@ func (al *AgentLoop) Stop() { al.running.Store(false) } -func (al *AgentLoop) publishResponseIfNeeded(ctx context.Context, channel, chatID, response string) { +func (al *AgentLoop) PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string) { if response == "" { return } diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 77a413133..c1a224013 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -25,6 +25,7 @@ type CronSchedule struct { type CronPayload struct { Kind string `json:"kind"` + Type string `json:"type"` Message string `json:"message"` Command string `json:"command,omitempty"` Deliver bool `json:"deliver"` diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 154ec75f0..60d9d5e5a 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -16,6 +16,9 @@ import ( // JobExecutor is the interface for executing cron jobs through the agent type JobExecutor interface { ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error) + // PublishResponseIfNeeded sends response to the outbound bus only when the + // agent did not already deliver content through the message tool in this round. + PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string) } // CronTool provides scheduling capabilities for the agent @@ -111,6 +114,11 @@ func (t *CronTool) Parameters() map[string]any { "type": "string", "description": "Job ID (for remove/enable/disable)", }, + "type": map[string]any{ + "type": "string", + "enum": []string{"message", "directive"}, + "description": "Message generation strategy. 'message' (default): content is sent directly as-is. 'directive': content is treated as instructions for an AI agent to execute before delivery.", + }, "deliver": map[string]any{ "type": "boolean", "description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: false", @@ -197,6 +205,12 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult deliver = d } + // Validate type parameter (server-side whitelist, not just LLM schema hint) + msgType, _ := args["type"].(string) + if msgType != "" && msgType != "message" && msgType != "directive" { + return ErrorResult(fmt.Sprintf("invalid type %q, must be 'message' or 'directive'", msgType)) + } + // GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When // allow_command is disabled, explicit confirmation is required as an override. // Non-command reminders remain open to all channels. @@ -230,9 +244,17 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult return ErrorResult(fmt.Sprintf("Error adding job: %v", err)) } + // Apply optional payload fields and persist in a single UpdateJob call + needsUpdate := false if command != "" { job.Payload.Command = command - // Need to save the updated payload + needsUpdate = true + } + if msgType != "" { + job.Payload.Type = msgType + needsUpdate = true + } + if needsUpdate { t.cronService.UpdateJob(job) } @@ -347,8 +369,13 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return "ok" } - // If deliver=true, send message directly without agent processing - if job.Payload.Deliver { + // Determine message generation strategy + // Type="directive": treat message as instructions for AI agent to execute + // Type="" or "message" (default): static message content + isDirective := job.Payload.Type == "directive" + + // If deliver=true and not directive, send message directly without agent processing + if job.Payload.Deliver && !isDirective { pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) defer pubCancel() t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ @@ -359,13 +386,23 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return "ok" } - // For deliver=false, process through agent (for complex tasks) + // For deliver=false OR directive mode, process through agent sessionKey := fmt.Sprintf("cron-%s", job.ID) - // Call agent with job's message + // Prepare the prompt based on type + prompt := job.Payload.Message + if isDirective { + // For directive type, prefix to clarify this is an instruction + prompt = fmt.Sprintf( + "Please execute the following directive and provide the result:\n\n%s", + job.Payload.Message, + ) + } + + // Call agent with the prepared prompt response, err := t.executor.ProcessDirectWithChannel( ctx, - job.Payload.Message, + prompt, sessionKey, channel, chatID, @@ -374,7 +411,8 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return fmt.Sprintf("Error: %v", err) } - // Response is automatically sent via MessageBus by AgentLoop - _ = response // Will be sent by AgentLoop + if response != "" { + t.executor.PublishResponseIfNeeded(ctx, channel, chatID, response) + } return "ok" } diff --git a/pkg/tools/cron_test.go b/pkg/tools/cron_test.go index cd7d39860..186c6a75e 100644 --- a/pkg/tools/cron_test.go +++ b/pkg/tools/cron_test.go @@ -2,6 +2,7 @@ package tools import ( "context" + "fmt" "path/filepath" "strings" "testing" @@ -12,18 +13,59 @@ import ( "github.com/sipeed/picoclaw/pkg/cron" ) -func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool { +type stubJobExecutor struct { + response string + err error + alreadySent bool // simulate message tool having already sent in this round + lastPrompt string + lastKey string + lastChan string + lastChatID string + publishedResp string + publishedChan string + publishedChatID string +} + +func (s *stubJobExecutor) ProcessDirectWithChannel( + _ context.Context, + content, sessionKey, channel, chatID string, +) (string, error) { + s.lastPrompt = content + s.lastKey = sessionKey + s.lastChan = channel + s.lastChatID = chatID + return s.response, s.err +} + +func (s *stubJobExecutor) PublishResponseIfNeeded( + _ context.Context, + channel, chatID, response string, +) { + if s.alreadySent { + return + } + s.publishedResp = response + s.publishedChan = channel + s.publishedChatID = chatID +} + +func newTestCronToolWithExecutorAndConfig(t *testing.T, executor JobExecutor, cfg *config.Config) *CronTool { t.Helper() storePath := filepath.Join(t.TempDir(), "cron.json") cronService := cron.NewCronService(storePath, nil) msgBus := bus.NewMessageBus() - tool, err := NewCronTool(cronService, nil, msgBus, t.TempDir(), true, 0, cfg) + tool, err := NewCronTool(cronService, executor, msgBus, t.TempDir(), true, 0, cfg) if err != nil { t.Fatalf("NewCronTool() error: %v", err) } return tool } +func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool { + t.Helper() + return newTestCronToolWithExecutorAndConfig(t, nil, cfg) +} + func newTestCronTool(t *testing.T) *CronTool { t.Helper() return newTestCronToolWithConfig(t, config.DefaultConfig()) @@ -237,3 +279,218 @@ func TestCronTool_ExecuteJobPublishesErrorWhenExecDisabled(t *testing.T) { t.Fatalf("expected exec disabled message, got: %s", msg.Content) } } + +func TestCronTool_ExecuteJobPublishesAgentResponse(t *testing.T) { + executor := &stubJobExecutor{response: "generated reply"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-1"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "send me a poem" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastKey != "cron-job-1" { + t.Fatalf("sessionKey = %q, want cron-job-1", executor.lastKey) + } + if executor.lastChan != "telegram" || executor.lastChatID != "chat-1" { + t.Fatalf("executor target = %s/%s, want telegram/chat-1", executor.lastChan, executor.lastChatID) + } + if executor.lastPrompt != "send me a poem" { + t.Fatalf("prompt = %q, want original message", executor.lastPrompt) + } + if executor.publishedResp != "generated reply" { + t.Fatalf("published response = %q, want generated reply", executor.publishedResp) + } + if executor.publishedChan != "telegram" || executor.publishedChatID != "chat-1" { + t.Fatalf("published target = %s/%s, want telegram/chat-1", executor.publishedChan, executor.publishedChatID) + } +} + +func TestCronTool_ExecuteJobSkipsEmptyAgentResponse(t *testing.T) { + executor := &stubJobExecutor{} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-empty"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "say nothing" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.publishedResp != "" { + t.Fatalf("unexpected published response: %q", executor.publishedResp) + } +} + +func TestCronTool_ExecuteJobSkipsWhenMessageToolAlreadySent(t *testing.T) { + executor := &stubJobExecutor{response: "Sent.", alreadySent: true} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-msg-sent"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "send weather" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.publishedResp != "" { + t.Fatalf("expected no published response when message tool already sent, got: %q", executor.publishedResp) + } +} + +func TestCronTool_ExecuteJobDirectiveAddsPromptPrefix(t *testing.T) { + executor := &stubJobExecutor{response: "directive result"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + originalMsg := "check the weather and summarize" + job := &cron.CronJob{ID: "job-dir-1"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = originalMsg + job.Payload.Type = "directive" + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + wantPrompt := "Please execute the following directive and provide the result:\n\n" + originalMsg + if executor.lastPrompt != wantPrompt { + t.Fatalf("prompt = %q, want exact %q", executor.lastPrompt, wantPrompt) + } + if executor.publishedResp != "directive result" { + t.Fatalf("published response = %q, want %q", executor.publishedResp, "directive result") + } +} + +func TestCronTool_ExecuteJobDirectiveWithDeliverRoutesToAgent(t *testing.T) { + executor := &stubJobExecutor{response: "agent processed"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-dir-deliver"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "generate daily report" + job.Payload.Type = "directive" + job.Payload.Deliver = true + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastPrompt == "" { + t.Fatal("expected agent to be called for directive+deliver, but ProcessDirectWithChannel was not invoked") + } + if executor.publishedResp != "agent processed" { + t.Fatalf("published response = %q, want %q", executor.publishedResp, "agent processed") + } + + // Verify no direct publish happened on the bus (agent path, not direct path) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + select { + case msg := <-tool.msgBus.OutboundChan(): + t.Fatalf("unexpected direct bus message: %+v", msg) + case <-ctx.Done(): + // expected: no direct bus message + } +} + +func TestCronTool_ExecuteJobDeliverMessageDirectlyToBus(t *testing.T) { + executor := &stubJobExecutor{response: "should not be called"} + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-deliver"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "hello world" + job.Payload.Deliver = true + + if got := tool.ExecuteJob(context.Background(), job); got != "ok" { + t.Fatalf("ExecuteJob() = %q, want ok", got) + } + + if executor.lastPrompt != "" { + t.Fatal("expected agent NOT to be invoked for deliver=true message type") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + select { + case msg := <-tool.msgBus.OutboundChan(): + if msg.Content != "hello world" { + t.Fatalf("bus content = %q, want %q", msg.Content, "hello world") + } + case <-ctx.Done(): + t.Fatal("timeout waiting for direct bus message") + } +} + +func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) { + executor := &stubJobExecutor{ + response: "this response must not be published", + err: fmt.Errorf("agent failure"), + } + tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig()) + + job := &cron.CronJob{ID: "job-err"} + job.Payload.Channel = "telegram" + job.Payload.To = "chat-1" + job.Payload.Message = "do something" + + got := tool.ExecuteJob(context.Background(), job) + if !strings.Contains(got, "agent failure") { + t.Fatalf("ExecuteJob() = %q, want error message", got) + } + + if executor.publishedResp != "" { + t.Fatalf("unexpected publish on error path: %q", executor.publishedResp) + } +} + +func TestCronTool_AddJobRejectsInvalidType(t *testing.T) { + tool := newTestCronTool(t) + ctx := WithToolContext(context.Background(), "cli", "direct") + result := tool.Execute(ctx, map[string]any{ + "action": "add", + "message": "test", + "at_seconds": float64(60), + "type": "invalid_type", + }) + + if !result.IsError { + t.Fatal("expected error for invalid type parameter") + } + if !strings.Contains(result.ForLLM, "invalid type") { + t.Errorf("expected 'invalid type' error, got: %s", result.ForLLM) + } +} + +func TestCronTool_AddJobAcceptsValidTypes(t *testing.T) { + for _, msgType := range []string{"", "message", "directive"} { + t.Run("type="+msgType, func(t *testing.T) { + tool := newTestCronTool(t) + ctx := WithToolContext(context.Background(), "cli", "direct") + args := map[string]any{ + "action": "add", + "message": "test", + "at_seconds": float64(60), + } + if msgType != "" { + args["type"] = msgType + } + + result := tool.Execute(ctx, args) + if result.IsError { + t.Fatalf("expected valid type %q to succeed, got: %s", msgType, result.ForLLM) + } + }) + } +}