From 4ada4063d7862cf4a1fc4637afce2da7ff722d2c Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Tue, 24 Feb 2026 21:24:32 +0900 Subject: [PATCH] feat(media): integrate TTL cleanup into FileMediaStore Add background TTL-based cleanup (L2 safety net) directly into FileMediaStore so file deletion and in-memory ref removal happen atomically under the same mutex, preventing dangling references. - Add storedAt timestamp and refToScope reverse map to mediaEntry - Add CleanExpired() for atomic TTL-based expiration - Add Start()/Stop() for background goroutine lifecycle - Add MediaCleanupConfig (enabled, max_age, interval) to config - Wire up in cmd_gateway.go with config-driven defaults - Add 8 new tests including concurrent cleanup safety Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/internal/gateway/helpers.go | 10 +- pkg/config/config.go | 15 +- pkg/config/defaults.go | 5 + pkg/media/store.go | 109 ++++++++++- pkg/media/store_test.go | 235 ++++++++++++++++++++++- 5 files changed, 363 insertions(+), 11 deletions(-) diff --git a/cmd/picoclaw/internal/gateway/helpers.go b/cmd/picoclaw/internal/gateway/helpers.go index dd93087f4..8255fb6e8 100644 --- a/cmd/picoclaw/internal/gateway/helpers.go +++ b/cmd/picoclaw/internal/gateway/helpers.go @@ -114,8 +114,13 @@ func gatewayCmd(debug bool) error { return tools.SilentResult(response) }) - // Create media store for file lifecycle management - mediaStore := media.NewFileMediaStore() + // Create media store for file lifecycle management with TTL cleanup + mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ + Enabled: cfg.Tools.MediaCleanup.Enabled, + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, + }) + mediaStore.Start() channelManager, err := channels.NewManager(cfg, msgBus, mediaStore) if err != nil { @@ -195,6 +200,7 @@ func gatewayCmd(debug bool) error { deviceService.Stop() heartbeatService.Stop() cronService.Stop() + mediaStore.Stop() agentLoop.Stop() fmt.Println("✓ Gateway stopped") diff --git a/pkg/config/config.go b/pkg/config/config.go index 56453cd33..64cdf6eac 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -519,11 +519,18 @@ type ExecConfig struct { CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"` } +type MediaCleanupConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"` + MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"` + Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"` +} + type ToolsConfig struct { - Web WebToolsConfig `json:"web"` - Cron CronToolsConfig `json:"cron"` - Exec ExecConfig `json:"exec"` - Skills SkillsToolsConfig `json:"skills"` + Web WebToolsConfig `json:"web"` + Cron CronToolsConfig `json:"cron"` + Exec ExecConfig `json:"exec"` + Skills SkillsToolsConfig `json:"skills"` + MediaCleanup MediaCleanupConfig `json:"media_cleanup"` } type SkillsToolsConfig struct { diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 8445510e2..d19ce1d38 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -291,6 +291,11 @@ func DefaultConfig() *Config { Port: 18790, }, Tools: ToolsConfig{ + MediaCleanup: MediaCleanupConfig{ + Enabled: true, + MaxAge: 30, + Interval: 5, + }, Web: WebToolsConfig{ Proxy: "", Brave: BraveConfig{ diff --git a/pkg/media/store.go b/pkg/media/store.go index 2df4420e9..6851f9512 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -2,8 +2,10 @@ package media import ( "fmt" + "log" "os" "sync" + "time" "github.com/google/uuid" ) @@ -35,8 +37,16 @@ type MediaStore interface { // mediaEntry holds the path and metadata for a stored media file. type mediaEntry struct { - path string - meta MediaMeta + path string + meta MediaMeta + storedAt time.Time +} + +// MediaCleanerConfig configures the background TTL cleanup. +type MediaCleanerConfig struct { + Enabled bool + MaxAge time.Duration + Interval time.Duration } // FileMediaStore is a pure in-memory implementation of MediaStore. @@ -45,13 +55,33 @@ type FileMediaStore struct { mu sync.RWMutex refs map[string]mediaEntry scopeToRefs map[string]map[string]struct{} + refToScope map[string]string + + cleanerCfg MediaCleanerConfig + stop chan struct{} + once sync.Once + nowFunc func() time.Time // for testing } -// NewFileMediaStore creates a new FileMediaStore. +// NewFileMediaStore creates a new FileMediaStore without background cleanup. func NewFileMediaStore() *FileMediaStore { return &FileMediaStore{ refs: make(map[string]mediaEntry), scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + nowFunc: time.Now, + } +} + +// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup. +func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore { + return &FileMediaStore{ + refs: make(map[string]mediaEntry), + scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + cleanerCfg: cfg, + stop: make(chan struct{}), + nowFunc: time.Now, } } @@ -66,11 +96,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) ( s.mu.Lock() defer s.mu.Unlock() - s.refs[ref] = mediaEntry{path: localPath, meta: meta} + s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()} if s.scopeToRefs[scope] == nil { s.scopeToRefs[scope] = make(map[string]struct{}) } s.scopeToRefs[scope][ref] = struct{}{} + s.refToScope[ref] = scope return ref, nil } @@ -115,9 +146,79 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { // Log but continue — best effort cleanup } delete(s.refs, ref) + delete(s.refToScope, ref) } } delete(s.scopeToRefs, scope) return nil } + +// CleanExpired removes all entries older than MaxAge. +// Both the file on disk and the in-memory references are deleted atomically +// under the same mutex, preventing dangling references. +func (s *FileMediaStore) CleanExpired() int { + s.mu.Lock() + defer s.mu.Unlock() + + cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge) + removed := 0 + + for ref, entry := range s.refs { + if entry.storedAt.Before(cutoff) { + if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) { + // Log but continue — best effort cleanup + } + + scope := s.refToScope[ref] + if scopeRefs, ok := s.scopeToRefs[scope]; ok { + delete(scopeRefs, ref) + if len(scopeRefs) == 0 { + delete(s.scopeToRefs, scope) + } + } + + delete(s.refs, ref) + delete(s.refToScope, ref) + removed++ + } + } + + return removed +} + +// Start begins the background cleanup goroutine if cleanup is enabled. +func (s *FileMediaStore) Start() { + if !s.cleanerCfg.Enabled || s.stop == nil { + return + } + + log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", + s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + + go func() { + ticker := time.NewTicker(s.cleanerCfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if n := s.CleanExpired(); n > 0 { + log.Printf("[media] cleanup: removed %d expired entries", n) + } + case <-s.stop: + return + } + } + }() +} + +// Stop terminates the background cleanup goroutine. +func (s *FileMediaStore) Stop() { + if s.stop == nil { + return + } + s.once.Do(func() { + close(s.stop) + }) +} diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index 95bd1eb7a..e934bf9c0 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -1,11 +1,13 @@ package media import ( + "fmt" "os" "path/filepath" "strings" "sync" "testing" + "time" ) func createTempFile(t *testing.T, dir, name string) string { @@ -140,7 +142,8 @@ func TestStoreNonexistentFile(t *testing.T) { t.Error("Store should fail for nonexistent file") } // Error message should include the underlying os error, not just "file does not exist" - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), "no such file or directory") && + !strings.Contains(err.Error(), "cannot find") { t.Errorf("Error should contain OS error detail, got: %v", err) } } @@ -221,3 +224,233 @@ func TestConcurrentSafety(t *testing.T) { wg.Wait() } + +// --- TTL cleanup tests --- + +func newTestStoreWithCleanup(maxAge time.Duration) *FileMediaStore { + s := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: maxAge, + Interval: time.Hour, // won't tick in tests + }) + return s +} + +func TestCleanExpiredRemovesOldEntries(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + + path := createTempFile(t, dir, "old.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Advance clock to present + store.nowFunc = func() time.Time { return now } + removed := store.CleanExpired() + + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + if _, err := store.Resolve(ref); err == nil { + t.Error("expired ref should be unresolvable") + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Error("expired file should be deleted") + } +} + +func TestCleanExpiredKeepsNonExpired(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now } + + path := createTempFile(t, dir, "fresh.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + removed := store.CleanExpired() + if removed != 0 { + t.Errorf("expected 0 removed, got %d", removed) + } + + if _, err := store.Resolve(ref); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Error("fresh file should still exist") + } +} + +func TestCleanExpiredMixedAges(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + oldPath := createTempFile(t, dir, "old.jpg") + oldRef, _ := store.Store(oldPath, MediaMeta{Source: "test"}, "scope1") + + // Store fresh entry + store.nowFunc = func() time.Time { return now } + freshPath := createTempFile(t, dir, "fresh.jpg") + freshRef, _ := store.Store(freshPath, MediaMeta{Source: "test"}, "scope1") + + removed := store.CleanExpired() + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + + if _, err := store.Resolve(oldRef); err == nil { + t.Error("old ref should be gone") + } + if _, err := store.Resolve(freshRef); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } +} + +func TestCleanExpiredCleansEmptyScopes(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry as the only one in scope + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + path := createTempFile(t, dir, "only.jpg") + store.Store(path, MediaMeta{Source: "test"}, "lonely_scope") + + store.nowFunc = func() time.Time { return now } + store.CleanExpired() + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.scopeToRefs["lonely_scope"]; ok { + t.Error("empty scope should be cleaned up") + } +} + +func TestStartStopLifecycle(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: time.Minute, + Interval: 50 * time.Millisecond, + }) + + // Start and stop should not panic + store.Start() + time.Sleep(100 * time.Millisecond) + store.Stop() + + // Double stop should not panic + store.Stop() +} + +func TestStartDisabledIsNoop(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: false, + MaxAge: time.Minute, + Interval: time.Minute, + }) + // Should not start any goroutine or panic + store.Start() + store.Stop() +} + +func TestConcurrentCleanupSafety(t *testing.T) { + dir := t.TempDir() + store := newTestStoreWithCleanup(50 * time.Millisecond) + store.nowFunc = time.Now + + const workers = 10 + const ops = 20 + var wg sync.WaitGroup + wg.Add(workers * 4) + + // Store workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + scope := fmt.Sprintf("scope-%d", wIdx) + for i := 0; i < ops; i++ { + p := createTempFile(t, dir, fmt.Sprintf("w%d-f%d.tmp", wIdx, i)) + store.Store(p, MediaMeta{Source: "test"}, scope) + } + }(w) + } + + // Resolve workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.Resolve("media://nonexistent") + } + }() + } + + // ReleaseAll workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + for i := 0; i < ops; i++ { + store.ReleaseAll(fmt.Sprintf("scope-%d", wIdx)) + } + }(w) + } + + // CleanExpired workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.CleanExpired() + } + }() + } + + wg.Wait() +} + +func TestRefToScopeConsistency(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + // Store entries in two scopes + ref1, _ := store.Store(createTempFile(t, dir, "a.jpg"), MediaMeta{Source: "test"}, "s1") + ref2, _ := store.Store(createTempFile(t, dir, "b.jpg"), MediaMeta{Source: "test"}, "s1") + ref3, _ := store.Store(createTempFile(t, dir, "c.jpg"), MediaMeta{Source: "test"}, "s2") + + store.mu.RLock() + checkRef := func(ref, expectedScope string) { + t.Helper() + if scope, ok := store.refToScope[ref]; !ok || scope != expectedScope { + t.Errorf("refToScope[%s] = %q, want %q", ref, scope, expectedScope) + } + } + checkRef(ref1, "s1") + checkRef(ref2, "s1") + checkRef(ref3, "s2") + store.mu.RUnlock() + + // Release s1 and verify refToScope is cleaned + store.ReleaseAll("s1") + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.refToScope[ref1]; ok { + t.Error("refToScope should not contain ref1 after ReleaseAll") + } + if _, ok := store.refToScope[ref2]; ok { + t.Error("refToScope should not contain ref2 after ReleaseAll") + } + if _, ok := store.refToScope[ref3]; !ok { + t.Error("refToScope should still contain ref3") + } +}