refactor(cron): remove deliver and type params, unify agent execution path (#2147)

The agent path now publishes to outbound bus directly (since #2100),
making the deliver=true direct-to-bus shortcut and the directive type
prompt wrapping redundant. All cron jobs now uniformly route through
the agent. This is an intentional behavior change: old jobs with
deliver=true will execute through the agent instead of bypassing it.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
daming大铭
2026-03-29 22:52:34 +08:00
committed by GitHub
parent 1ef0553929
commit 1fc5345857
6 changed files with 9 additions and 220 deletions
-4
View File
@@ -25,10 +25,8 @@ type CronSchedule struct {
type CronPayload struct {
Kind string `json:"kind"`
Type string `json:"type"`
Message string `json:"message"`
Command string `json:"command,omitempty"`
Deliver bool `json:"deliver"`
Channel string `json:"channel,omitempty"`
To string `json:"to,omitempty"`
}
@@ -410,7 +408,6 @@ func (cs *CronService) AddJob(
name string,
schedule CronSchedule,
message string,
deliver bool,
channel, to string,
) (*CronJob, error) {
cs.mu.Lock()
@@ -429,7 +426,6 @@ func (cs *CronService) AddJob(
Payload: CronPayload{
Kind: "agent_turn",
Message: message,
Deliver: deliver,
Channel: channel,
To: to,
},
+5 -5
View File
@@ -20,7 +20,7 @@ func TestSaveStore_FilePermissions(t *testing.T) {
cs := NewCronService(storePath, nil)
_, err := cs.AddJob("test", CronSchedule{Kind: "every", EveryMS: int64Ptr(60000)}, "hello", false, "cli", "direct")
_, err := cs.AddJob("test", CronSchedule{Kind: "every", EveryMS: int64Ptr(60000)}, "hello", "cli", "direct")
if err != nil {
t.Fatalf("AddJob failed: %v", err)
}
@@ -52,7 +52,7 @@ func TestCronService_CRUD(t *testing.T) {
// Test AddJob
at := time.Now().Add(time.Hour).UnixMilli()
job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", true, "ch", "to")
job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", "ch", "to")
if err != nil || job.ID == "" {
t.Fatalf("AddJob failed: %v", err)
}
@@ -134,7 +134,7 @@ func TestCronService_ExecutionFlow(t *testing.T) {
// Add a job then runs 100ms from now
target := time.Now().Add(100 * time.Millisecond).UnixMilli()
job, _ := cs.AddJob("FastJob", CronSchedule{Kind: "at", AtMS: &target}, "", false, "", "")
job, _ := cs.AddJob("FastJob", CronSchedule{Kind: "at", AtMS: &target}, "", "", "")
// Check for job execution with a timeout
success := false
@@ -167,7 +167,7 @@ func TestCronService_PersistenceIntegrity(t *testing.T) {
// write a job and persist
cs1 := NewCronService(tmpFile, nil)
at := int64(2000000000000)
cs1.AddJob("PersistMe", CronSchedule{Kind: "at", AtMS: &at}, "payload", true, "ch1", "")
cs1.AddJob("PersistMe", CronSchedule{Kind: "at", AtMS: &at}, "payload", "ch1", "")
// check file exists
if _, err := os.Stat(tmpFile); os.IsNotExist(err) {
@@ -213,7 +213,7 @@ func TestCronService_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for j := range iterations {
at := time.Now().Add(time.Hour).UnixMilli()
cs.AddJob(fmt.Sprintf("Job-%d-%d", id, j), CronSchedule{Kind: "at", AtMS: &at}, "", false, "", "")
cs.AddJob(fmt.Sprintf("Job-%d-%d", id, j), CronSchedule{Kind: "at", AtMS: &at}, "", "", "")
time.Sleep(100 * time.Microsecond)
}
}(i)
+3 -58
View File
@@ -92,7 +92,7 @@ func (t *CronTool) Parameters() map[string]any {
},
"command": map[string]any{
"type": "string",
"description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message. 'deliver' will be forced to false for commands.",
"description": "Optional: Shell command to execute directly (e.g., 'df -h'). If set, the agent will run this command and report output instead of just showing the message.",
},
"command_confirm": map[string]any{
"type": "boolean",
@@ -114,15 +114,6 @@ func (t *CronTool) Parameters() map[string]any {
"type": "string",
"description": "Job ID (for remove/enable/disable)",
},
"type": map[string]any{
"type": "string",
"enum": []string{"message", "directive"},
"description": "Message generation strategy. 'message' (default): content is sent directly as-is. 'directive': content is treated as instructions for an AI agent to execute before delivery.",
},
"deliver": map[string]any{
"type": "boolean",
"description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: false",
},
},
"required": []string{"action"},
}
@@ -199,18 +190,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
return ErrorResult("one of at_seconds, every_seconds, or cron_expr is required")
}
// Read deliver parameter, default to false so scheduled tasks execute through the agent
deliver := false
if d, ok := args["deliver"].(bool); ok {
deliver = d
}
// Validate type parameter (server-side whitelist, not just LLM schema hint)
msgType, _ := args["type"].(string)
if msgType != "" && msgType != "message" && msgType != "directive" {
return ErrorResult(fmt.Sprintf("invalid type %q, must be 'message' or 'directive'", msgType))
}
// GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When
// allow_command is disabled, explicit confirmation is required as an override.
// Non-command reminders remain open to all channels.
@@ -226,7 +205,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
if !t.allowCommand && !commandConfirm {
return ErrorResult("command_confirm=true is required when allow_command is disabled")
}
deliver = false
}
// Truncate message for job name (max 30 chars)
@@ -236,7 +214,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
messagePreview,
schedule,
message,
deliver,
channel,
chatID,
)
@@ -250,10 +227,6 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
job.Payload.Command = command
needsUpdate = true
}
if msgType != "" {
job.Payload.Type = msgType
needsUpdate = true
}
if needsUpdate {
t.cronService.UpdateJob(job)
}
@@ -369,40 +342,12 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
return "ok"
}
// Determine message generation strategy
// Type="directive": treat message as instructions for AI agent to execute
// Type="" or "message" (default): static message content
isDirective := job.Payload.Type == "directive"
// If deliver=true and not directive, send message directly without agent processing
if job.Payload.Deliver && !isDirective {
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: job.Payload.Message,
})
return "ok"
}
// For deliver=false OR directive mode, process through agent
sessionKey := fmt.Sprintf("cron-%s", job.ID)
// Prepare the prompt based on type
prompt := job.Payload.Message
if isDirective {
// For directive type, prefix to clarify this is an instruction
prompt = fmt.Sprintf(
"Please execute the following directive and provide the result:\n\n%s",
job.Payload.Message,
)
}
// Call agent with the prepared prompt
// Call agent with the job message
response, err := t.executor.ProcessDirectWithChannel(
ctx,
prompt,
job.Payload.Message,
sessionKey,
channel,
chatID,
-149
View File
@@ -229,28 +229,6 @@ func TestCronTool_NonCommandJobAllowedFromRemoteChannel(t *testing.T) {
}
}
func TestCronTool_NonCommandJobDefaultsDeliverToFalse(t *testing.T) {
tool := newTestCronTool(t)
ctx := WithToolContext(context.Background(), "telegram", "chat-1")
result := tool.Execute(ctx, map[string]any{
"action": "add",
"message": "send me a poem",
"at_seconds": float64(600),
})
if result.IsError {
t.Fatalf("expected non-command reminder to succeed, got: %s", result.ForLLM)
}
jobs := tool.cronService.ListJobs(false)
if len(jobs) != 1 {
t.Fatalf("expected 1 job, got %d", len(jobs))
}
if jobs[0].Payload.Deliver {
t.Fatal("expected deliver=false by default for non-command jobs")
}
}
func TestCronTool_ExecuteJobPublishesErrorWhenExecDisabled(t *testing.T) {
cfg := config.DefaultConfig()
cfg.Tools.Exec.Enabled = false
@@ -346,93 +324,6 @@ func TestCronTool_ExecuteJobSkipsWhenMessageToolAlreadySent(t *testing.T) {
}
}
func TestCronTool_ExecuteJobDirectiveAddsPromptPrefix(t *testing.T) {
executor := &stubJobExecutor{response: "directive result"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
originalMsg := "check the weather and summarize"
job := &cron.CronJob{ID: "job-dir-1"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = originalMsg
job.Payload.Type = "directive"
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
wantPrompt := "Please execute the following directive and provide the result:\n\n" + originalMsg
if executor.lastPrompt != wantPrompt {
t.Fatalf("prompt = %q, want exact %q", executor.lastPrompt, wantPrompt)
}
if executor.publishedResp != "directive result" {
t.Fatalf("published response = %q, want %q", executor.publishedResp, "directive result")
}
}
func TestCronTool_ExecuteJobDirectiveWithDeliverRoutesToAgent(t *testing.T) {
executor := &stubJobExecutor{response: "agent processed"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-dir-deliver"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "generate daily report"
job.Payload.Type = "directive"
job.Payload.Deliver = true
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.lastPrompt == "" {
t.Fatal("expected agent to be called for directive+deliver, but ProcessDirectWithChannel was not invoked")
}
if executor.publishedResp != "agent processed" {
t.Fatalf("published response = %q, want %q", executor.publishedResp, "agent processed")
}
// Verify no direct publish happened on the bus (agent path, not direct path)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
select {
case msg := <-tool.msgBus.OutboundChan():
t.Fatalf("unexpected direct bus message: %+v", msg)
case <-ctx.Done():
// expected: no direct bus message
}
}
func TestCronTool_ExecuteJobDeliverMessageDirectlyToBus(t *testing.T) {
executor := &stubJobExecutor{response: "should not be called"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-deliver"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "hello world"
job.Payload.Deliver = true
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.lastPrompt != "" {
t.Fatal("expected agent NOT to be invoked for deliver=true message type")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
select {
case msg := <-tool.msgBus.OutboundChan():
if msg.Content != "hello world" {
t.Fatalf("bus content = %q, want %q", msg.Content, "hello world")
}
case <-ctx.Done():
t.Fatal("timeout waiting for direct bus message")
}
}
func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) {
executor := &stubJobExecutor{
response: "this response must not be published",
@@ -454,43 +345,3 @@ func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) {
t.Fatalf("unexpected publish on error path: %q", executor.publishedResp)
}
}
func TestCronTool_AddJobRejectsInvalidType(t *testing.T) {
tool := newTestCronTool(t)
ctx := WithToolContext(context.Background(), "cli", "direct")
result := tool.Execute(ctx, map[string]any{
"action": "add",
"message": "test",
"at_seconds": float64(60),
"type": "invalid_type",
})
if !result.IsError {
t.Fatal("expected error for invalid type parameter")
}
if !strings.Contains(result.ForLLM, "invalid type") {
t.Errorf("expected 'invalid type' error, got: %s", result.ForLLM)
}
}
func TestCronTool_AddJobAcceptsValidTypes(t *testing.T) {
for _, msgType := range []string{"", "message", "directive"} {
t.Run("type="+msgType, func(t *testing.T) {
tool := newTestCronTool(t)
ctx := WithToolContext(context.Background(), "cli", "direct")
args := map[string]any{
"action": "add",
"message": "test",
"at_seconds": float64(60),
}
if msgType != "" {
args["type"] = msgType
}
result := tool.Execute(ctx, args)
if result.IsError {
t.Fatalf("expected valid type %q to succeed, got: %s", msgType, result.ForLLM)
}
})
}
}