mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
feat(media): integrate TTL cleanup into FileMediaStore
Add background TTL-based cleanup (L2 safety net) directly into FileMediaStore so file deletion and in-memory ref removal happen atomically under the same mutex, preventing dangling references. - Add storedAt timestamp and refToScope reverse map to mediaEntry - Add CleanExpired() for atomic TTL-based expiration - Add Start()/Stop() for background goroutine lifecycle - Add MediaCleanupConfig (enabled, max_age, interval) to config - Wire up in cmd_gateway.go with config-driven defaults - Add 8 new tests including concurrent cleanup safety Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -114,8 +114,13 @@ func gatewayCmd(debug bool) error {
|
||||
return tools.SilentResult(response)
|
||||
})
|
||||
|
||||
// Create media store for file lifecycle management
|
||||
mediaStore := media.NewFileMediaStore()
|
||||
// Create media store for file lifecycle management with TTL cleanup
|
||||
mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
|
||||
Enabled: cfg.Tools.MediaCleanup.Enabled,
|
||||
MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute,
|
||||
Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute,
|
||||
})
|
||||
mediaStore.Start()
|
||||
|
||||
channelManager, err := channels.NewManager(cfg, msgBus, mediaStore)
|
||||
if err != nil {
|
||||
@@ -195,6 +200,7 @@ func gatewayCmd(debug bool) error {
|
||||
deviceService.Stop()
|
||||
heartbeatService.Stop()
|
||||
cronService.Stop()
|
||||
mediaStore.Stop()
|
||||
agentLoop.Stop()
|
||||
fmt.Println("✓ Gateway stopped")
|
||||
|
||||
|
||||
+11
-4
@@ -519,11 +519,18 @@ type ExecConfig struct {
|
||||
CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"`
|
||||
}
|
||||
|
||||
type MediaCleanupConfig struct {
|
||||
Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"`
|
||||
MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"`
|
||||
Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"`
|
||||
}
|
||||
|
||||
type ToolsConfig struct {
|
||||
Web WebToolsConfig `json:"web"`
|
||||
Cron CronToolsConfig `json:"cron"`
|
||||
Exec ExecConfig `json:"exec"`
|
||||
Skills SkillsToolsConfig `json:"skills"`
|
||||
Web WebToolsConfig `json:"web"`
|
||||
Cron CronToolsConfig `json:"cron"`
|
||||
Exec ExecConfig `json:"exec"`
|
||||
Skills SkillsToolsConfig `json:"skills"`
|
||||
MediaCleanup MediaCleanupConfig `json:"media_cleanup"`
|
||||
}
|
||||
|
||||
type SkillsToolsConfig struct {
|
||||
|
||||
@@ -291,6 +291,11 @@ func DefaultConfig() *Config {
|
||||
Port: 18790,
|
||||
},
|
||||
Tools: ToolsConfig{
|
||||
MediaCleanup: MediaCleanupConfig{
|
||||
Enabled: true,
|
||||
MaxAge: 30,
|
||||
Interval: 5,
|
||||
},
|
||||
Web: WebToolsConfig{
|
||||
Proxy: "",
|
||||
Brave: BraveConfig{
|
||||
|
||||
+105
-4
@@ -2,8 +2,10 @@ package media
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
@@ -35,8 +37,16 @@ type MediaStore interface {
|
||||
|
||||
// mediaEntry holds the path and metadata for a stored media file.
|
||||
type mediaEntry struct {
|
||||
path string
|
||||
meta MediaMeta
|
||||
path string
|
||||
meta MediaMeta
|
||||
storedAt time.Time
|
||||
}
|
||||
|
||||
// MediaCleanerConfig configures the background TTL cleanup.
|
||||
type MediaCleanerConfig struct {
|
||||
Enabled bool
|
||||
MaxAge time.Duration
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
// FileMediaStore is a pure in-memory implementation of MediaStore.
|
||||
@@ -45,13 +55,33 @@ type FileMediaStore struct {
|
||||
mu sync.RWMutex
|
||||
refs map[string]mediaEntry
|
||||
scopeToRefs map[string]map[string]struct{}
|
||||
refToScope map[string]string
|
||||
|
||||
cleanerCfg MediaCleanerConfig
|
||||
stop chan struct{}
|
||||
once sync.Once
|
||||
nowFunc func() time.Time // for testing
|
||||
}
|
||||
|
||||
// NewFileMediaStore creates a new FileMediaStore.
|
||||
// NewFileMediaStore creates a new FileMediaStore without background cleanup.
|
||||
func NewFileMediaStore() *FileMediaStore {
|
||||
return &FileMediaStore{
|
||||
refs: make(map[string]mediaEntry),
|
||||
scopeToRefs: make(map[string]map[string]struct{}),
|
||||
refToScope: make(map[string]string),
|
||||
nowFunc: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup.
|
||||
func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore {
|
||||
return &FileMediaStore{
|
||||
refs: make(map[string]mediaEntry),
|
||||
scopeToRefs: make(map[string]map[string]struct{}),
|
||||
refToScope: make(map[string]string),
|
||||
cleanerCfg: cfg,
|
||||
stop: make(chan struct{}),
|
||||
nowFunc: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,11 +96,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.refs[ref] = mediaEntry{path: localPath, meta: meta}
|
||||
s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()}
|
||||
if s.scopeToRefs[scope] == nil {
|
||||
s.scopeToRefs[scope] = make(map[string]struct{})
|
||||
}
|
||||
s.scopeToRefs[scope][ref] = struct{}{}
|
||||
s.refToScope[ref] = scope
|
||||
|
||||
return ref, nil
|
||||
}
|
||||
@@ -115,9 +146,79 @@ func (s *FileMediaStore) ReleaseAll(scope string) error {
|
||||
// Log but continue — best effort cleanup
|
||||
}
|
||||
delete(s.refs, ref)
|
||||
delete(s.refToScope, ref)
|
||||
}
|
||||
}
|
||||
|
||||
delete(s.scopeToRefs, scope)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanExpired removes all entries older than MaxAge.
|
||||
// Both the file on disk and the in-memory references are deleted atomically
|
||||
// under the same mutex, preventing dangling references.
|
||||
func (s *FileMediaStore) CleanExpired() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge)
|
||||
removed := 0
|
||||
|
||||
for ref, entry := range s.refs {
|
||||
if entry.storedAt.Before(cutoff) {
|
||||
if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) {
|
||||
// Log but continue — best effort cleanup
|
||||
}
|
||||
|
||||
scope := s.refToScope[ref]
|
||||
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
|
||||
delete(scopeRefs, ref)
|
||||
if len(scopeRefs) == 0 {
|
||||
delete(s.scopeToRefs, scope)
|
||||
}
|
||||
}
|
||||
|
||||
delete(s.refs, ref)
|
||||
delete(s.refToScope, ref)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
|
||||
return removed
|
||||
}
|
||||
|
||||
// Start begins the background cleanup goroutine if cleanup is enabled.
|
||||
func (s *FileMediaStore) Start() {
|
||||
if !s.cleanerCfg.Enabled || s.stop == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[media] cleanup enabled: interval=%s, max_age=%s",
|
||||
s.cleanerCfg.Interval, s.cleanerCfg.MaxAge)
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.cleanerCfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if n := s.CleanExpired(); n > 0 {
|
||||
log.Printf("[media] cleanup: removed %d expired entries", n)
|
||||
}
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop terminates the background cleanup goroutine.
|
||||
func (s *FileMediaStore) Stop() {
|
||||
if s.stop == nil {
|
||||
return
|
||||
}
|
||||
s.once.Do(func() {
|
||||
close(s.stop)
|
||||
})
|
||||
}
|
||||
|
||||
+234
-1
@@ -1,11 +1,13 @@
|
||||
package media
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func createTempFile(t *testing.T, dir, name string) string {
|
||||
@@ -140,7 +142,8 @@ func TestStoreNonexistentFile(t *testing.T) {
|
||||
t.Error("Store should fail for nonexistent file")
|
||||
}
|
||||
// Error message should include the underlying os error, not just "file does not exist"
|
||||
if !strings.Contains(err.Error(), "no such file or directory") {
|
||||
if !strings.Contains(err.Error(), "no such file or directory") &&
|
||||
!strings.Contains(err.Error(), "cannot find") {
|
||||
t.Errorf("Error should contain OS error detail, got: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -221,3 +224,233 @@ func TestConcurrentSafety(t *testing.T) {
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// --- TTL cleanup tests ---
|
||||
|
||||
func newTestStoreWithCleanup(maxAge time.Duration) *FileMediaStore {
|
||||
s := NewFileMediaStoreWithCleanup(MediaCleanerConfig{
|
||||
Enabled: true,
|
||||
MaxAge: maxAge,
|
||||
Interval: time.Hour, // won't tick in tests
|
||||
})
|
||||
return s
|
||||
}
|
||||
|
||||
func TestCleanExpiredRemovesOldEntries(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
now := time.Now()
|
||||
store := newTestStoreWithCleanup(10 * time.Minute)
|
||||
store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) }
|
||||
|
||||
path := createTempFile(t, dir, "old.jpg")
|
||||
ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1")
|
||||
if err != nil {
|
||||
t.Fatalf("Store failed: %v", err)
|
||||
}
|
||||
|
||||
// Advance clock to present
|
||||
store.nowFunc = func() time.Time { return now }
|
||||
removed := store.CleanExpired()
|
||||
|
||||
if removed != 1 {
|
||||
t.Errorf("expected 1 removed, got %d", removed)
|
||||
}
|
||||
if _, err := store.Resolve(ref); err == nil {
|
||||
t.Error("expired ref should be unresolvable")
|
||||
}
|
||||
if _, err := os.Stat(path); !os.IsNotExist(err) {
|
||||
t.Error("expired file should be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanExpiredKeepsNonExpired(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
now := time.Now()
|
||||
store := newTestStoreWithCleanup(10 * time.Minute)
|
||||
store.nowFunc = func() time.Time { return now }
|
||||
|
||||
path := createTempFile(t, dir, "fresh.jpg")
|
||||
ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1")
|
||||
if err != nil {
|
||||
t.Fatalf("Store failed: %v", err)
|
||||
}
|
||||
|
||||
removed := store.CleanExpired()
|
||||
if removed != 0 {
|
||||
t.Errorf("expected 0 removed, got %d", removed)
|
||||
}
|
||||
|
||||
if _, err := store.Resolve(ref); err != nil {
|
||||
t.Errorf("fresh ref should still resolve: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
t.Error("fresh file should still exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanExpiredMixedAges(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
now := time.Now()
|
||||
store := newTestStoreWithCleanup(10 * time.Minute)
|
||||
|
||||
// Store old entry
|
||||
store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) }
|
||||
oldPath := createTempFile(t, dir, "old.jpg")
|
||||
oldRef, _ := store.Store(oldPath, MediaMeta{Source: "test"}, "scope1")
|
||||
|
||||
// Store fresh entry
|
||||
store.nowFunc = func() time.Time { return now }
|
||||
freshPath := createTempFile(t, dir, "fresh.jpg")
|
||||
freshRef, _ := store.Store(freshPath, MediaMeta{Source: "test"}, "scope1")
|
||||
|
||||
removed := store.CleanExpired()
|
||||
if removed != 1 {
|
||||
t.Errorf("expected 1 removed, got %d", removed)
|
||||
}
|
||||
|
||||
if _, err := store.Resolve(oldRef); err == nil {
|
||||
t.Error("old ref should be gone")
|
||||
}
|
||||
if _, err := store.Resolve(freshRef); err != nil {
|
||||
t.Errorf("fresh ref should still resolve: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanExpiredCleansEmptyScopes(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
now := time.Now()
|
||||
store := newTestStoreWithCleanup(10 * time.Minute)
|
||||
|
||||
// Store old entry as the only one in scope
|
||||
store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) }
|
||||
path := createTempFile(t, dir, "only.jpg")
|
||||
store.Store(path, MediaMeta{Source: "test"}, "lonely_scope")
|
||||
|
||||
store.nowFunc = func() time.Time { return now }
|
||||
store.CleanExpired()
|
||||
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
if _, ok := store.scopeToRefs["lonely_scope"]; ok {
|
||||
t.Error("empty scope should be cleaned up")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartStopLifecycle(t *testing.T) {
|
||||
store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{
|
||||
Enabled: true,
|
||||
MaxAge: time.Minute,
|
||||
Interval: 50 * time.Millisecond,
|
||||
})
|
||||
|
||||
// Start and stop should not panic
|
||||
store.Start()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
store.Stop()
|
||||
|
||||
// Double stop should not panic
|
||||
store.Stop()
|
||||
}
|
||||
|
||||
func TestStartDisabledIsNoop(t *testing.T) {
|
||||
store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{
|
||||
Enabled: false,
|
||||
MaxAge: time.Minute,
|
||||
Interval: time.Minute,
|
||||
})
|
||||
// Should not start any goroutine or panic
|
||||
store.Start()
|
||||
store.Stop()
|
||||
}
|
||||
|
||||
func TestConcurrentCleanupSafety(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store := newTestStoreWithCleanup(50 * time.Millisecond)
|
||||
store.nowFunc = time.Now
|
||||
|
||||
const workers = 10
|
||||
const ops = 20
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(workers * 4)
|
||||
|
||||
// Store workers
|
||||
for w := 0; w < workers; w++ {
|
||||
go func(wIdx int) {
|
||||
defer wg.Done()
|
||||
scope := fmt.Sprintf("scope-%d", wIdx)
|
||||
for i := 0; i < ops; i++ {
|
||||
p := createTempFile(t, dir, fmt.Sprintf("w%d-f%d.tmp", wIdx, i))
|
||||
store.Store(p, MediaMeta{Source: "test"}, scope)
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
|
||||
// Resolve workers
|
||||
for w := 0; w < workers; w++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < ops; i++ {
|
||||
store.Resolve("media://nonexistent")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// ReleaseAll workers
|
||||
for w := 0; w < workers; w++ {
|
||||
go func(wIdx int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < ops; i++ {
|
||||
store.ReleaseAll(fmt.Sprintf("scope-%d", wIdx))
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
|
||||
// CleanExpired workers
|
||||
for w := 0; w < workers; w++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < ops; i++ {
|
||||
store.CleanExpired()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestRefToScopeConsistency(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store := NewFileMediaStore()
|
||||
|
||||
// Store entries in two scopes
|
||||
ref1, _ := store.Store(createTempFile(t, dir, "a.jpg"), MediaMeta{Source: "test"}, "s1")
|
||||
ref2, _ := store.Store(createTempFile(t, dir, "b.jpg"), MediaMeta{Source: "test"}, "s1")
|
||||
ref3, _ := store.Store(createTempFile(t, dir, "c.jpg"), MediaMeta{Source: "test"}, "s2")
|
||||
|
||||
store.mu.RLock()
|
||||
checkRef := func(ref, expectedScope string) {
|
||||
t.Helper()
|
||||
if scope, ok := store.refToScope[ref]; !ok || scope != expectedScope {
|
||||
t.Errorf("refToScope[%s] = %q, want %q", ref, scope, expectedScope)
|
||||
}
|
||||
}
|
||||
checkRef(ref1, "s1")
|
||||
checkRef(ref2, "s1")
|
||||
checkRef(ref3, "s2")
|
||||
store.mu.RUnlock()
|
||||
|
||||
// Release s1 and verify refToScope is cleaned
|
||||
store.ReleaseAll("s1")
|
||||
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
if _, ok := store.refToScope[ref1]; ok {
|
||||
t.Error("refToScope should not contain ref1 after ReleaseAll")
|
||||
}
|
||||
if _, ok := store.refToScope[ref2]; ok {
|
||||
t.Error("refToScope should not contain ref2 after ReleaseAll")
|
||||
}
|
||||
if _, ok := store.refToScope[ref3]; !ok {
|
||||
t.Error("refToScope should still contain ref3")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user