Files
picoclaw/pkg/cron/service.go
T
沈青川 e414b82ac3 fix(cron): publish agent response to outbound bus for cron-triggered jobs (#2100)
* fix(cron): publish agent response to outbound bus for cron-triggered jobs

When a cron job triggers agent execution via ProcessDirectWithChannel,
the agent response was silently discarded — the code assumed AgentLoop
would auto-publish it, but SendResponse is false on this path.

Delegate to PublishResponseIfNeeded (exported from AgentLoop) so the
response reaches the originating channel (e.g. Telegram) only when the
message tool did not already deliver content in the same round.

Also adds a "directive" message type to CronPayload, allowing cron jobs
to instruct the agent to execute a task rather than echo static text.

* fix(cron): add type validation and directive test coverage

Address reviewer blocking feedback:

1. Server-side whitelist for `type` parameter — the `enum` in
   Parameters() is only an LLM schema hint; any string was persisted.
   Now `addJob` rejects values other than "message" and "directive".

2. Comprehensive test coverage for the directive code path:
   - directive adds prompt prefix to ProcessDirectWithChannel
   - deliver=true + directive routes through agent (not direct publish)
   - directive prompt content, sessionKey, channel, chatID are correct
   - invalid type is rejected; valid types ("", "message", "directive") pass
   - deliver=true message type goes directly to bus (regression)
   - agent error path does not trigger publish (regression)

Also merge the two UpdateJob calls in addJob into one to avoid
redundant disk I/O (non-blocking suggestion from review).

* fix(cron): remove omitempty from CronPayload.Type for consistent JSON

Empty string and "message" are semantically equivalent defaults;
always serializing the field avoids asymmetric JSON output.

* test(cron): remove redundant test, strengthen error path coverage

- Remove ExecuteJobDirectivePassesCorrectContent: its assertions on
  sessionKey/channel/chatID duplicate ExecuteJobPublishesAgentResponse;
  its prompt check duplicates DirectiveAddsPromptPrefix.
- Strengthen DirectiveAddsPromptPrefix with exact prompt match and
  publish response assertion.
- Fix ReturnsErrorWithoutPublish: set non-empty stub response so the
  test verifies the error branch early-return, not the response==""
  guard.

* fix(ci): satisfy golines and gosmopolitan in cron code
2026-03-29 13:47:28 +08:00

574 lines
12 KiB
Go

package cron
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/adhocore/gronx"
"github.com/sipeed/picoclaw/pkg/fileutil"
)
// CronSchedule describes when a job fires. Kind selects which of the other
// fields is consulted (see computeNextRun):
//   - "at":    fire once at AtMS (Unix milliseconds).
//   - "every": fire EveryMS milliseconds after the schedule is (re)computed,
//     i.e. after each run completes or when the service starts.
//   - "cron":  fire on the cron expression Expr (evaluated with gronx).
//
// Any other Kind never fires.
type CronSchedule struct {
	Kind    string `json:"kind"`
	AtMS    *int64 `json:"atMs,omitempty"`
	EveryMS *int64 `json:"everyMs,omitempty"`
	Expr    string `json:"expr,omitempty"`
	// NOTE(review): TZ is persisted but not read anywhere in this file; cron
	// expressions are currently evaluated in the process's local time zone.
	TZ      string `json:"tz,omitempty"`
}
// CronPayload describes what a job does when it fires. This file only fills
// it in (AddJob) and persists it; interpreting it is left to the JobHandler.
type CronPayload struct {
	// Kind is set to "agent_turn" by AddJob.
	Kind string `json:"kind"`
	// Type is "" or "message" (treated as equivalent), or "directive", which
	// asks the agent to carry out Message as a task rather than echo it.
	// It is not validated in this file. It has no omitempty, so it is always
	// serialized.
	Type    string `json:"type"`
	Message string `json:"message"`
	// NOTE(review): Command is neither set nor read in this file.
	Command string `json:"command,omitempty"`
	// Deliver, Channel and To are recorded verbatim by AddJob. Presumably
	// they tell the handler whether and where to send the result — confirm
	// against the JobHandler implementation.
	Deliver bool   `json:"deliver"`
	Channel string `json:"channel,omitempty"`
	To      string `json:"to,omitempty"`
}
// CronJobState is the runtime bookkeeping persisted alongside each job.
type CronJobState struct {
	// NextRunAtMS is the next due time in Unix milliseconds. A nil value
	// means the job is not scheduled; getNextWakeMS and checkJobs skip it.
	// checkJobs clears it while a due job is executing so that the job is
	// not picked up twice.
	NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"`
	// LastRunAtMS is when the most recent execution started (Unix ms).
	LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"`
	// LastStatus is "ok" or "error" after a run. LastError holds the
	// handler's error text for "error" and is cleared on success.
	LastStatus string `json:"lastStatus,omitempty"`
	LastError  string `json:"lastError,omitempty"`
}
// CronJob is one scheduled job as stored in the CronStore file.
type CronJob struct {
	// ID is assigned by AddJob via generateID.
	ID       string       `json:"id"`
	Name     string       `json:"name"`
	Enabled  bool         `json:"enabled"`
	Schedule CronSchedule `json:"schedule"`
	Payload  CronPayload  `json:"payload"`
	State    CronJobState `json:"state"`
	// CreatedAtMS and UpdatedAtMS are Unix milliseconds.
	CreatedAtMS int64 `json:"createdAtMs"`
	UpdatedAtMS int64 `json:"updatedAtMs"`
	// DeleteAfterRun is set by AddJob for "at" schedules. After an "at" job
	// runs, executeJobByID removes it when this is true and otherwise just
	// disables it.
	DeleteAfterRun bool `json:"deleteAfterRun"`
}
// CronStore is the JSON document persisted at CronService.storePath.
// loadStore starts from Version 1 with an empty job list before reading the
// file.
type CronStore struct {
	Version int       `json:"version"`
	Jobs    []CronJob `json:"jobs"`
}
// JobHandler runs a due job. executeJobByID passes it a copy of the stored
// job, ignores the returned string, and records a non-nil error in the job's
// LastStatus/LastError.
type JobHandler func(job *CronJob) (string, error)
// CronService schedules CronJobs, persists them to a JSON file and runs due
// jobs through a JobHandler from a single background goroutine (runLoop).
type CronService struct {
	// storePath is the JSON file the store is loaded from and saved to.
	storePath string
	// store is the in-memory job list; methods access it with mu held.
	store *CronStore
	// onJob is invoked for every job that comes due; SetOnJob replaces it
	// under mu.
	onJob JobHandler
	// mu guards store, running and stopChan (and onJob writes).
	mu      sync.RWMutex
	running bool
	// stopChan is closed by Stop to terminate runLoop.
	stopChan chan struct{}
	// wakeChan receives notify() signals asking runLoop to recompute its
	// next wake-up time.
	wakeChan chan struct{}
	// NOTE(review): gronx is initialized but not referenced in this file;
	// computeNextRun uses the package-level gronx.NextTickAfter instead.
	gronx *gronx.Gronx
}
// NewCronService creates a service that persists its jobs at storePath and
// passes each due job to onJob. The store is loaded immediately, so jobs can
// be listed or added before Start is called.
//
// The constructor has no error result, so a load failure is logged rather
// than returned. Start reloads the store and does report the error.
func NewCronService(storePath string, onJob JobHandler) *CronService {
	cs := &CronService{
		storePath: storePath,
		onJob:     onJob,
		gronx:     gronx.New(),
		// Capacity 1 is deliberate. notify() performs a non-blocking send.
		// With an unbuffered channel, a wake-up sent while runLoop is busy
		// (computing its delay or running jobs) would be dropped. The loop
		// could then sleep past a newly added job's due time, for up to its
		// one-hour idle timeout. One buffered slot keeps the pending signal
		// until runLoop reaches its select.
		wakeChan: make(chan struct{}, 1),
	}
	if err := cs.loadStore(); err != nil {
		log.Printf("[cron] failed to load store: %v", err)
	}
	return cs
}
// Start (re)loads the store from disk, recomputes the next run time of every
// enabled job, persists the result and launches the scheduling goroutine.
// Calling Start on a service that is already running is a no-op.
//
// It returns an error if the store cannot be loaded or saved. In that case
// the service is left not running.
func (cs *CronService) Start() error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	if cs.running {
		return nil
	}
	if err := cs.loadStore(); err != nil {
		return fmt.Errorf("failed to load store: %w", err)
	}
	cs.recomputeNextRuns()
	if err := cs.saveStoreUnsafe(); err != nil {
		return fmt.Errorf("failed to save store: %w", err)
	}

	cs.stopChan = make(chan struct{})
	if cs.wakeChan == nil {
		// Buffered (capacity 1) so that a notify() issued while runLoop is
		// busy is kept for the loop instead of being discarded by notify's
		// non-blocking send.
		cs.wakeChan = make(chan struct{}, 1)
	}
	cs.running = true
	go cs.runLoop(cs.stopChan)
	return nil
}
// Stop halts the scheduling goroutine by closing its stop channel. Stopping a
// service that is not running does nothing. Jobs already executing are not
// interrupted.
func (cs *CronService) Stop() {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	if cs.running {
		cs.running = false
		if ch := cs.stopChan; ch != nil {
			close(ch)
			cs.stopChan = nil
		}
	}
}
// runLoop is the scheduler goroutine started by Start. On every pass it
// sleeps until the earliest enabled job is due and then runs checkJobs.
// If no job is scheduled, it sleeps for an hour. A notify() signal makes it
// recompute the deadline early. It returns when stopChan is closed.
func (cs *CronService) runLoop(stopChan chan struct{}) {
	// Create the timer already stopped and drained; it is armed via Reset
	// at the top of each iteration.
	timer := time.NewTimer(time.Hour)
	if !timer.Stop() {
		<-timer.C
	}
	defer timer.Stop()

	for {
		// The job set may have changed since the last pass, so the deadline
		// is re-derived every time.
		cs.mu.RLock()
		wakeAt := cs.getNextWakeMS()
		cs.mu.RUnlock()

		delay := time.Hour // nothing scheduled: idle until notified
		if wakeAt != nil {
			delay = 0 // already overdue unless proven otherwise
			if remaining := *wakeAt - time.Now().UnixMilli(); remaining > 0 {
				delay = time.Duration(remaining) * time.Millisecond
			}
		}
		timer.Reset(delay)

		select {
		case <-stopChan:
			return
		case <-timer.C:
			cs.checkJobs()
		case <-cs.wakeChan:
			// Disarm the timer and drain any pending tick so that the next
			// Reset starts from a clean state.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
		}
	}
}
// checkJobs runs every enabled job whose NextRunAtMS has passed.
//
// While cs.mu is held, each due job is claimed by clearing its NextRunAtMS,
// and the store is persisted. This ensures a later pass cannot start the same
// job again before executeJobByID reschedules it. The jobs then run one after
// another, in store order, with the lock released. It does nothing if the
// service is not running.
func (cs *CronService) checkJobs() {
	cs.mu.Lock()
	if !cs.running {
		cs.mu.Unlock()
		return
	}

	now := time.Now().UnixMilli()
	var dueJobIDs []string
	for i := range cs.store.Jobs {
		job := &cs.store.Jobs[i]
		if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now {
			dueJobIDs = append(dueJobIDs, job.ID)
			// Claim the job before unlocking to avoid duplicate execution.
			job.State.NextRunAtMS = nil
		}
	}

	// Only touch the disk when a job was actually claimed. Otherwise an idle
	// timer expiry (e.g. runLoop's one-hour idle timeout) would rewrite an
	// unchanged store.
	if len(dueJobIDs) > 0 {
		if err := cs.saveStoreUnsafe(); err != nil {
			log.Printf("[cron] failed to save store: %v", err)
		}
	}
	cs.mu.Unlock()

	// Execute jobs outside the lock.
	for _, jobID := range dueJobIDs {
		cs.executeJobByID(jobID)
	}
}
// executeJobByID runs the job with the given ID through the registered
// JobHandler, then records the outcome and schedules the next run.
//
// The job and the handler are snapshotted under a read lock, and the handler
// runs with cs.mu released. Afterwards the job is looked up again, because it
// may have been removed in the meantime; if it is gone, the result is
// dropped. After a successful or failed run:
//   - "at" jobs are deleted (DeleteAfterRun) or disabled;
//   - all other kinds get a freshly computed NextRunAtMS.
func (cs *CronService) executeJobByID(jobID string) {
	startTime := time.Now().UnixMilli()

	// onJob is read under the same lock SetOnJob writes it with. Reading it
	// after RUnlock would race with SetOnJob.
	cs.mu.RLock()
	var callbackJob *CronJob
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].ID == jobID {
			jobCopy := cs.store.Jobs[i]
			callbackJob = &jobCopy
			break
		}
	}
	handler := cs.onJob
	cs.mu.RUnlock()

	if callbackJob == nil {
		log.Printf("[cron] job %s not found, skipping", jobID)
		return
	}

	log.Printf("[cron] ▶ executing job '%s' (id: %s, schedule: %s, channel: %s)",
		callbackJob.Name, jobID, callbackJob.Schedule.Kind, callbackJob.Payload.Channel)

	var runErr error
	if handler != nil {
		_, runErr = handler(callbackJob)
	}
	execDuration := time.Now().UnixMilli() - startTime

	// Re-acquire the lock to record state on the live entry.
	cs.mu.Lock()
	defer cs.mu.Unlock()

	var job *CronJob
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].ID == jobID {
			job = &cs.store.Jobs[i]
			break
		}
	}
	if job == nil {
		log.Printf("[cron] job %s disappeared before state update", jobID)
		return
	}

	// Capture the name up front. removeJobUnsafe below replaces the slice,
	// so job must not be used after the job is deleted.
	jobName := job.Name

	job.State.LastRunAtMS = &startTime
	job.UpdatedAtMS = time.Now().UnixMilli()
	if runErr != nil {
		job.State.LastStatus = "error"
		job.State.LastError = runErr.Error()
		log.Printf("[cron] ✗ job '%s' failed after %dms: %v", jobName, execDuration, runErr)
	} else {
		job.State.LastStatus = "ok"
		job.State.LastError = ""
	}

	var nextRunStr string
	deleted := false
	if job.Schedule.Kind == "at" {
		if job.DeleteAfterRun {
			cs.removeJobUnsafe(jobID)
			deleted = true
			nextRunStr = "(deleted)"
		} else {
			job.Enabled = false
			job.State.NextRunAtMS = nil
			nextRunStr = "(disabled)"
		}
	} else {
		nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli())
		job.State.NextRunAtMS = nextRun
		if nextRun != nil {
			nextRunStr = time.UnixMilli(*nextRun).Format("2006-01-02 15:04:05")
		} else {
			nextRunStr = "(none)"
		}
	}

	if runErr == nil {
		log.Printf("[cron] ✓ job '%s' completed in %dms, next run: %s", jobName, execDuration, nextRunStr)
	}

	// removeJobUnsafe already persisted the store (and logged any failure),
	// so a second write would only repeat the same disk I/O.
	if deleted {
		return
	}
	if err := cs.saveStoreUnsafe(); err != nil {
		log.Printf("[cron] failed to save store: %v", err)
	}
}
// computeNextRun returns the next time (Unix milliseconds) at which a job
// with the given schedule should fire, or nil if it should not fire again:
//   - "at":    AtMS, if it is still strictly after nowMS;
//   - "every": nowMS + EveryMS, for a positive interval;
//   - "cron":  the next tick of Expr after nowMS, computed by gronx
//     (nil, with a log line, if the expression is rejected).
//
// Unknown kinds are logged and yield nil. The returned pointer is always
// freshly allocated, so storing it in job state never aliases the schedule's
// (or the caller's) AtMS value.
//
// NOTE(review): schedule.TZ is not consulted. Cron expressions are evaluated
// against time.UnixMilli(nowMS), i.e. in the process's local time zone.
func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 {
	switch schedule.Kind {
	case "at":
		if schedule.AtMS == nil || *schedule.AtMS <= nowMS {
			return nil
		}
		at := *schedule.AtMS
		return &at
	case "every":
		if schedule.EveryMS == nil || *schedule.EveryMS <= 0 {
			return nil
		}
		next := nowMS + *schedule.EveryMS
		return &next
	case "cron":
		if schedule.Expr == "" {
			return nil
		}
		now := time.UnixMilli(nowMS)
		nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false)
		if err != nil {
			log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err)
			return nil
		}
		nextMS := nextTime.UnixMilli()
		return &nextMS
	default:
		log.Printf("[cron] unknown schedule kind '%s'", schedule.Kind)
		return nil
	}
}
// notify asks runLoop to re-evaluate its next wake-up time straight away,
// e.g. after jobs are added, updated, enabled/disabled or removed.
//
// The send never blocks. The signal is delivered if runLoop is waiting in
// its select, queued if wakeChan has a free buffer slot, and dropped
// otherwise.
// NOTE(review): with an unbuffered wakeChan, a signal sent while runLoop is
// not parked in its select is lost, and the loop keeps sleeping on its old
// deadline — confirm wakeChan is created with a buffer.
func (cs *CronService) notify() {
	select {
	case cs.wakeChan <- struct{}{}:
		// Handed to (or queued for) runLoop.
	default:
		// No receiver ready and no free buffer slot: drop the signal.
	}
}
// recomputeNextRuns refreshes NextRunAtMS for every enabled job relative to
// the current time. Disabled jobs are left untouched. The caller must hold
// cs.mu.
func (cs *CronService) recomputeNextRuns() {
	nowMS := time.Now().UnixMilli()
	for i := range cs.store.Jobs {
		if !cs.store.Jobs[i].Enabled {
			continue
		}
		entry := &cs.store.Jobs[i]
		entry.State.NextRunAtMS = cs.computeNextRun(&entry.Schedule, nowMS)
	}
}
// getNextWakeMS returns the earliest NextRunAtMS among enabled jobs, or nil
// when no enabled job is scheduled. On ties, the first such job in store
// order wins. The returned pointer is the job's own NextRunAtMS pointer. The
// caller must hold cs.mu (read or write).
func (cs *CronService) getNextWakeMS() *int64 {
	var earliest *int64
	for i := range cs.store.Jobs {
		candidate := cs.store.Jobs[i].State.NextRunAtMS
		if !cs.store.Jobs[i].Enabled || candidate == nil {
			continue
		}
		if earliest == nil || *candidate < *earliest {
			earliest = candidate
		}
	}
	return earliest
}
// Load re-reads the job store from disk, replacing the in-memory jobs.
func (cs *CronService) Load() error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	if err := cs.loadStore(); err != nil {
		return err
	}
	return nil
}
// SetOnJob replaces the handler invoked for due jobs.
func (cs *CronService) SetOnJob(handler JobHandler) {
	cs.mu.Lock()
	cs.onJob = handler
	cs.mu.Unlock()
}
// loadStore replaces cs.store with the contents of the file at cs.storePath.
//
// The store is first reset to an empty version-1 store, so a missing or
// zero-length file simply yields no jobs. A read or decode error is returned
// unchanged; after a decode error, cs.store may be partially filled. A
// persisted `"jobs": null` is normalized to an empty slice, so the loaded
// store looks the same as a freshly created one. Callers in this file hold
// cs.mu, except NewCronService, which runs before the service is shared.
func (cs *CronService) loadStore() error {
	cs.store = &CronStore{
		Version: 1,
		Jobs:    []CronJob{},
	}

	data, err := os.ReadFile(cs.storePath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	if len(data) == 0 {
		// An empty file (e.g. created by hand) holds no jobs. Treat it like
		// a missing file instead of failing with "unexpected end of JSON input".
		return nil
	}
	if err := json.Unmarshal(data, cs.store); err != nil {
		return err
	}
	if cs.store.Jobs == nil {
		cs.store.Jobs = []CronJob{}
	}
	return nil
}
// saveStoreUnsafe writes cs.store to cs.storePath as indented JSON with mode
// 0600. The caller must hold cs.mu.
func (cs *CronService) saveStoreUnsafe() error {
	encoded, err := json.MarshalIndent(cs.store, "", " ")
	if err == nil {
		// Atomic replace with an explicit sync, so a crash mid-write cannot
		// leave a truncated store behind (flash storage is the motivating
		// case).
		err = fileutil.WriteFileAtomic(cs.storePath, encoded, 0o600)
	}
	return err
}
// AddJob creates an enabled "agent_turn" job with the given schedule and
// payload, persists it and wakes runLoop so the new deadline is considered.
// One-time ("at") jobs are marked DeleteAfterRun.
//
// It returns a copy of the stored job. If the store cannot be saved, the job
// is not kept in memory either, and the error is returned.
func (cs *CronService) AddJob(
	name string,
	schedule CronSchedule,
	message string,
	deliver bool,
	channel, to string,
) (*CronJob, error) {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	now := time.Now().UnixMilli()
	job := CronJob{
		ID:       generateID(),
		Name:     name,
		Enabled:  true,
		Schedule: schedule,
		Payload: CronPayload{
			Kind:    "agent_turn",
			Message: message,
			Deliver: deliver,
			Channel: channel,
			To:      to,
		},
		State: CronJobState{
			NextRunAtMS: cs.computeNextRun(&schedule, now),
		},
		CreatedAtMS: now,
		UpdatedAtMS: now,
		// One-time tasks (at) should be deleted after execution.
		DeleteAfterRun: schedule.Kind == "at",
	}

	cs.store.Jobs = append(cs.store.Jobs, job)
	if err := cs.saveStoreUnsafe(); err != nil {
		// Undo the append so memory matches disk. Otherwise the job would
		// still fire (and be persisted by the next successful save) even
		// though the caller was told the add failed.
		last := len(cs.store.Jobs) - 1
		cs.store.Jobs[last] = CronJob{}
		cs.store.Jobs = cs.store.Jobs[:last]
		return nil, err
	}
	cs.notify()
	return &job, nil
}
// UpdateJob replaces the stored job that has job.ID with a copy of *job,
// stamps UpdatedAtMS, wakes runLoop and persists the store. The caller's
// value is not modified. NextRunAtMS is taken as-is from *job. It returns an
// error if no job has that ID or if saving fails.
func (cs *CronService) UpdateJob(job *CronJob) error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	idx := -1
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].ID == job.ID {
			idx = i
			break
		}
	}
	if idx < 0 {
		return fmt.Errorf("job not found")
	}

	replacement := *job
	replacement.UpdatedAtMS = time.Now().UnixMilli()
	cs.store.Jobs[idx] = replacement
	cs.notify()
	return cs.saveStoreUnsafe()
}
// RemoveJob deletes the job with the given ID and reports whether one was
// found.
func (cs *CronService) RemoveJob(jobID string) bool {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	removed := cs.removeJobUnsafe(jobID)
	return removed
}
// removeJobUnsafe deletes the job with the given ID and reports whether one
// was removed. On removal it persists the store (logging, not returning, a
// save failure) and wakes runLoop; otherwise it changes nothing. The caller
// must hold cs.mu.
//
// A fresh slice is built instead of filtering in place, so that a *CronJob
// taken from the old slice (executeJobByID may hold one while calling this)
// still points at intact data. The new slice is non-nil, so removing the
// last job persists `"jobs": []` rather than `"jobs": null`.
func (cs *CronService) removeJobUnsafe(jobID string) bool {
	kept := make([]CronJob, 0, len(cs.store.Jobs))
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].ID != jobID {
			kept = append(kept, cs.store.Jobs[i])
		}
	}
	if len(kept) == len(cs.store.Jobs) {
		return false
	}

	cs.store.Jobs = kept
	if err := cs.saveStoreUnsafe(); err != nil {
		log.Printf("[cron] failed to save store after remove: %v", err)
	}
	cs.notify()
	return true
}
// EnableJob enables or disables the job with the given ID. Enabling it
// recomputes NextRunAtMS from now; disabling it clears NextRunAtMS. The store
// is persisted (a save failure is logged) and runLoop is woken.
//
// It returns a snapshot of the updated job, or nil if no job has that ID.
// The snapshot is a copy. A pointer into the store would be read after the
// lock is released (a data race) and could go stale when the job slice is
// reallocated.
func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	for i := range cs.store.Jobs {
		job := &cs.store.Jobs[i]
		if job.ID != jobID {
			continue
		}

		now := time.Now().UnixMilli()
		job.Enabled = enabled
		job.UpdatedAtMS = now
		if enabled {
			job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now)
		} else {
			job.State.NextRunAtMS = nil
		}
		if err := cs.saveStoreUnsafe(); err != nil {
			log.Printf("[cron] failed to save store after enable: %v", err)
		}
		cs.notify()

		snapshot := *job
		return &snapshot
	}
	return nil
}
// ListJobs returns the stored jobs, or only the enabled ones when
// includeDisabled is false. The result is always a fresh slice, so callers
// never share (or race on) the store's backing array. Note that it is a
// shallow copy: pointer fields such as State.NextRunAtMS still refer to the
// same int64 values.
func (cs *CronService) ListJobs(includeDisabled bool) []CronJob {
	cs.mu.RLock()
	defer cs.mu.RUnlock()

	if includeDisabled {
		// s[:0:0] has zero capacity, so append must allocate a new array.
		// It also preserves nil-ness: a nil store slice yields nil.
		return append(cs.store.Jobs[:0:0], cs.store.Jobs...)
	}

	var enabled []CronJob
	for i := range cs.store.Jobs {
		if cs.store.Jobs[i].Enabled {
			enabled = append(enabled, cs.store.Jobs[i])
		}
	}
	return enabled
}
// Status reports a summary of the service:
//   - "enabled":      whether the scheduler is running (bool);
//   - "jobs":         the total number of stored jobs, enabled or not (int);
//   - "nextWakeAtMS": the earliest NextRunAtMS among enabled jobs (*int64,
//     nil if none).
//
// The *int64 is a private copy, so callers cannot alter a job's schedule
// through it.
func (cs *CronService) Status() map[string]any {
	cs.mu.RLock()
	defer cs.mu.RUnlock()

	var nextWake *int64
	if p := cs.getNextWakeMS(); p != nil {
		v := *p
		nextWake = &v
	}
	return map[string]any{
		"enabled":      cs.running,
		"jobs":         len(cs.store.Jobs),
		"nextWakeAtMS": nextWake,
	}
}
// generateID returns a random 16-character lowercase hex job ID built from
// 8 bytes of crypto/rand. If the system random source fails, it falls back
// to the current Unix time in nanoseconds, formatted in decimal.
func generateID() string {
	var raw [8]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return fmt.Sprintf("%d", time.Now().UnixNano())
	}
	return hex.EncodeToString(raw[:])
}