mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
1d8ef7dcfb
Add GetJob and improved UpdateJob to CronService with proper cloning, schedule diffing, and next-run recomputation. Expose get/update actions in the cron tool so agents can inspect and partially update jobs without losing payloads or needing remove+add cycles. Includes access control for remote channels and command safety gates.
368 lines
9.4 KiB
Go
368 lines
9.4 KiB
Go
package cron
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestSaveStore_FilePermissions(t *testing.T) {
|
|
if runtime.GOOS == "windows" {
|
|
t.Skip("file permission bits are not enforced on Windows")
|
|
}
|
|
|
|
tmpDir := t.TempDir()
|
|
storePath := filepath.Join(tmpDir, "cron", "jobs.json")
|
|
|
|
cs := NewCronService(storePath, nil)
|
|
|
|
_, err := cs.AddJob("test", CronSchedule{Kind: "every", EveryMS: int64Ptr(60000)}, "hello", "cli", "direct")
|
|
if err != nil {
|
|
t.Fatalf("AddJob failed: %v", err)
|
|
}
|
|
|
|
info, err := os.Stat(storePath)
|
|
if err != nil {
|
|
t.Fatalf("Stat failed: %v", err)
|
|
}
|
|
|
|
perm := info.Mode().Perm()
|
|
if perm != 0o600 {
|
|
t.Errorf("cron store has permission %04o, want 0600", perm)
|
|
}
|
|
}
|
|
|
|
func int64Ptr(v int64) *int64 {
|
|
return &v
|
|
}
|
|
|
|
func setupService(handler JobHandler) (*CronService, string) {
|
|
tmpFile := fmt.Sprintf("test_cron_%d.json", time.Now().UnixNano())
|
|
cs := NewCronService(tmpFile, handler)
|
|
return cs, tmpFile
|
|
}
|
|
|
|
func TestCronService_CRUD(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
// Test AddJob
|
|
at := time.Now().Add(time.Hour).UnixMilli()
|
|
job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", "ch", "to")
|
|
if err != nil || job.ID == "" {
|
|
t.Fatalf("AddJob failed: %v", err)
|
|
}
|
|
|
|
// Test ListJobs
|
|
if len(cs.ListJobs(true)) != 1 {
|
|
t.Error("ListJobs should return 1 job")
|
|
}
|
|
|
|
// Test UpdateJob
|
|
job.Name = "UpdatedName"
|
|
err = cs.UpdateJob(job)
|
|
if err != nil || cs.store.Jobs[0].Name != "UpdatedName" {
|
|
t.Error("UpdateJob failed")
|
|
}
|
|
|
|
// Test EnableJob
|
|
cs.EnableJob(job.ID, false)
|
|
if cs.store.Jobs[0].Enabled != false || cs.store.Jobs[0].State.NextRunAtMS != nil {
|
|
t.Error("EnableJob(false) failed to clear state")
|
|
}
|
|
|
|
// Test RemoveJob
|
|
removed := cs.RemoveJob(job.ID)
|
|
if !removed || len(cs.store.Jobs) != 0 {
|
|
t.Error("RemoveJob failed")
|
|
}
|
|
}
|
|
|
|
func TestCronService_GetJobReturnsCopy(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
everyMS := int64(60_000)
|
|
job, err := cs.AddJob("Task1", CronSchedule{Kind: "every", EveryMS: &everyMS}, "msg", "ch", "to")
|
|
if err != nil {
|
|
t.Fatalf("AddJob failed: %v", err)
|
|
}
|
|
if job.State.NextRunAtMS == nil {
|
|
t.Fatal("expected initial next run")
|
|
}
|
|
nextRun := *job.State.NextRunAtMS
|
|
|
|
got, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("GetJob should find job")
|
|
}
|
|
got.Name = "mutated"
|
|
got.Payload.Message = "changed"
|
|
if got.Schedule.EveryMS != nil {
|
|
*got.Schedule.EveryMS = 120_000
|
|
}
|
|
if got.State.NextRunAtMS != nil {
|
|
*got.State.NextRunAtMS = time.Now().Add(3 * time.Hour).UnixMilli()
|
|
}
|
|
|
|
again, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("GetJob should still find job")
|
|
}
|
|
if again.Name != "Task1" || again.Payload.Message != "msg" {
|
|
t.Fatalf("GetJob should return a copy, got %+v", again)
|
|
}
|
|
if again.Schedule.EveryMS == nil || *again.Schedule.EveryMS != everyMS {
|
|
t.Fatalf("GetJob should not alias schedule pointers, got %+v", again.Schedule)
|
|
}
|
|
if again.State.NextRunAtMS == nil || *again.State.NextRunAtMS != nextRun {
|
|
t.Fatalf("GetJob should not alias state pointers, got %+v", again.State)
|
|
}
|
|
}
|
|
|
|
func TestCronService_UpdateJobRecomputesNextRunOnScheduleOrEnabledChange(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
at := time.Now().Add(time.Hour).UnixMilli()
|
|
job, err := cs.AddJob("Task1", CronSchedule{Kind: "at", AtMS: &at}, "msg", "ch", "to")
|
|
if err != nil {
|
|
t.Fatalf("AddJob failed: %v", err)
|
|
}
|
|
if job.State.NextRunAtMS == nil {
|
|
t.Fatal("expected initial next run")
|
|
}
|
|
initialNextRun := *job.State.NextRunAtMS
|
|
|
|
everyMS := int64(30_000)
|
|
job.Schedule = CronSchedule{Kind: "every", EveryMS: &everyMS}
|
|
if err := cs.UpdateJob(job); err != nil {
|
|
t.Fatalf("UpdateJob schedule failed: %v", err)
|
|
}
|
|
updated, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("updated job not found")
|
|
}
|
|
if updated.State.NextRunAtMS == nil {
|
|
t.Fatal("expected recomputed next run after schedule change")
|
|
}
|
|
if *updated.State.NextRunAtMS == initialNextRun {
|
|
t.Fatalf("next run should be recomputed, still %d", initialNextRun)
|
|
}
|
|
|
|
if disabled := cs.EnableJob(job.ID, false); disabled == nil {
|
|
t.Fatal("EnableJob(false) returned nil")
|
|
}
|
|
disabled, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("disabled job not found")
|
|
}
|
|
disabled.Enabled = true
|
|
if err := cs.UpdateJob(disabled); err != nil {
|
|
t.Fatalf("UpdateJob enabled failed: %v", err)
|
|
}
|
|
reenabled, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("reenabled job not found")
|
|
}
|
|
if !reenabled.Enabled || reenabled.State.NextRunAtMS == nil {
|
|
t.Fatalf("expected enabled job with next run, got %+v", reenabled)
|
|
}
|
|
}
|
|
|
|
func TestCronService_UpdateJobPreservesRunStateOnPayloadOnlyChange(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
everyMS := int64(60_000)
|
|
job, err := cs.AddJob("Task1", CronSchedule{Kind: "every", EveryMS: &everyMS}, "msg", "ch", "to")
|
|
if err != nil {
|
|
t.Fatalf("AddJob failed: %v", err)
|
|
}
|
|
lastRun := time.Now().Add(-time.Minute).UnixMilli()
|
|
job.State.LastRunAtMS = &lastRun
|
|
job.State.LastStatus = "ok"
|
|
job.State.LastError = "previous"
|
|
if job.State.NextRunAtMS == nil {
|
|
t.Fatal("expected next run before update")
|
|
}
|
|
nextRun := *job.State.NextRunAtMS
|
|
|
|
job.Payload.Message = "updated msg"
|
|
if err := cs.UpdateJob(job); err != nil {
|
|
t.Fatalf("UpdateJob failed: %v", err)
|
|
}
|
|
|
|
updated, ok := cs.GetJob(job.ID)
|
|
if !ok {
|
|
t.Fatal("updated job not found")
|
|
}
|
|
if updated.State.LastRunAtMS == nil || *updated.State.LastRunAtMS != lastRun {
|
|
t.Fatalf("last run changed: %+v", updated.State)
|
|
}
|
|
if updated.State.LastStatus != "ok" || updated.State.LastError != "previous" {
|
|
t.Fatalf("last status changed: %+v", updated.State)
|
|
}
|
|
if updated.State.NextRunAtMS == nil || *updated.State.NextRunAtMS != nextRun {
|
|
t.Fatalf("next run should be preserved: before=%d after=%+v", nextRun, updated.State.NextRunAtMS)
|
|
}
|
|
}
|
|
|
|
// 2. Test Cron Expression Calculation Logic
|
|
func TestCronService_ComputeNextRun(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC).UnixMilli()
|
|
|
|
tests := []struct {
|
|
name string
|
|
schedule CronSchedule
|
|
wantNil bool
|
|
}{
|
|
{"Valid Cron", CronSchedule{Kind: "cron", Expr: "0 * * * *"}, false},
|
|
{"Invalid Cron", CronSchedule{Kind: "cron", Expr: "invalid"}, true},
|
|
{"Every MS", CronSchedule{Kind: "every", EveryMS: int64Ptr(5000)}, false},
|
|
{"At Future", CronSchedule{Kind: "at", AtMS: int64Ptr(now + 1000)}, false},
|
|
{"At Past", CronSchedule{Kind: "at", AtMS: int64Ptr(now - 1000)}, true},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got := cs.computeNextRun(&tt.schedule, now)
|
|
if (got == nil) != tt.wantNil {
|
|
t.Errorf("%s: got %v, wantNil %v", tt.name, got, tt.wantNil)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// 3. Test Execution Flow
|
|
func TestCronService_ExecutionFlow(t *testing.T) {
|
|
var mu sync.Mutex
|
|
executedJobs := make(map[string]bool)
|
|
|
|
handler := func(job *CronJob) (string, error) {
|
|
mu.Lock()
|
|
executedJobs[job.ID] = true
|
|
mu.Unlock()
|
|
return "ok", nil
|
|
}
|
|
|
|
cs, path := setupService(handler)
|
|
defer os.Remove(path)
|
|
|
|
// Start the service
|
|
if err := cs.Start(); err != nil {
|
|
t.Fatalf("Start failed: %v", err)
|
|
}
|
|
defer cs.Stop()
|
|
|
|
// Add a job then runs 100ms from now
|
|
target := time.Now().Add(100 * time.Millisecond).UnixMilli()
|
|
job, _ := cs.AddJob("FastJob", CronSchedule{Kind: "at", AtMS: &target}, "", "", "")
|
|
|
|
// Check for job execution with a timeout
|
|
success := false
|
|
for range 20 {
|
|
mu.Lock()
|
|
if executedJobs[job.ID] {
|
|
success = true
|
|
mu.Unlock()
|
|
break
|
|
}
|
|
mu.Unlock()
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
if !success {
|
|
t.Error("Job was not executed in time")
|
|
}
|
|
|
|
// check that the job is removed after execution (DeleteAfterRun = true)
|
|
status := cs.Status()
|
|
if status["jobs"].(int) != 0 {
|
|
t.Errorf("Job should be deleted after run, got count: %v", status["jobs"])
|
|
}
|
|
}
|
|
|
|
func TestCronService_PersistenceIntegrity(t *testing.T) {
|
|
tmpFile := "persist_test.json"
|
|
defer os.Remove(tmpFile)
|
|
|
|
// write a job and persist
|
|
cs1 := NewCronService(tmpFile, nil)
|
|
at := int64(2000000000000)
|
|
cs1.AddJob("PersistMe", CronSchedule{Kind: "at", AtMS: &at}, "payload", "ch1", "")
|
|
|
|
// check file exists
|
|
if _, err := os.Stat(tmpFile); os.IsNotExist(err) {
|
|
t.Fatal("Store file was not created")
|
|
}
|
|
|
|
// reload and check data integrity
|
|
cs2 := NewCronService(tmpFile, nil)
|
|
if err := cs2.Load(); err != nil {
|
|
t.Fatalf("Failed to load store: %v", err)
|
|
}
|
|
|
|
jobs := cs2.ListJobs(true)
|
|
if len(jobs) != 1 || jobs[0].Name != "PersistMe" {
|
|
t.Errorf("Data corruption after reload. Got: %+v", jobs)
|
|
}
|
|
|
|
// test loading invalid JSON
|
|
os.WriteFile(tmpFile, []byte("{invalid json}"), 0o644)
|
|
cs3 := NewCronService(tmpFile, nil)
|
|
err := cs3.loadStore()
|
|
if err == nil {
|
|
t.Error("Should return error when loading invalid JSON")
|
|
}
|
|
}
|
|
|
|
func TestCronService_ConcurrentAccess(t *testing.T) {
|
|
cs, path := setupService(nil)
|
|
defer os.Remove(path)
|
|
|
|
cs.Start()
|
|
defer cs.Stop()
|
|
|
|
var wg sync.WaitGroup
|
|
workers := 10
|
|
iterations := 50
|
|
|
|
wg.Add(workers * 2)
|
|
|
|
// add jobs concurrently
|
|
for i := range workers {
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
for j := range iterations {
|
|
at := time.Now().Add(time.Hour).UnixMilli()
|
|
cs.AddJob(fmt.Sprintf("Job-%d-%d", id, j), CronSchedule{Kind: "at", AtMS: &at}, "", "", "")
|
|
time.Sleep(100 * time.Microsecond)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// read and update jobs concurrently
|
|
for range workers {
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := range iterations {
|
|
jobs := cs.ListJobs(true)
|
|
if len(jobs) > 0 {
|
|
cs.EnableJob(jobs[0].ID, j%2 == 0)
|
|
}
|
|
time.Sleep(100 * time.Microsecond)
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|