mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(media): address review comments on TTL cleanup
- CleanExpired: split into two phases — collect expired entries under lock, then delete files after releasing the lock to minimize contention - CleanExpired: guard against zero MaxAge (no-op if unconfigured) - CleanExpired: log file removal errors instead of silently ignoring - Start: protect with startOnce to prevent multiple goroutines - Stop: rename once -> stopOnce for clarity - cmd_gateway: call mediaStore.Stop() on error path after Start() - Add TestCleanExpiredZeroMaxAge and double-Start test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -124,6 +124,7 @@ func gatewayCmd(debug bool) error {
|
||||
|
||||
channelManager, err := channels.NewManager(cfg, msgBus, mediaStore)
|
||||
if err != nil {
|
||||
mediaStore.Stop()
|
||||
return fmt.Errorf("error creating channel manager: %w", err)
|
||||
}
|
||||
|
||||
|
||||
+44
-26
@@ -59,7 +59,8 @@ type FileMediaStore struct {
|
||||
|
||||
cleanerCfg MediaCleanerConfig
|
||||
stop chan struct{}
|
||||
once sync.Once
|
||||
startOnce sync.Once
|
||||
stopOnce sync.Once
|
||||
nowFunc func() time.Time // for testing
|
||||
}
|
||||
|
||||
@@ -155,20 +156,26 @@ func (s *FileMediaStore) ReleaseAll(scope string) error {
|
||||
}
|
||||
|
||||
// CleanExpired removes all entries older than MaxAge.
|
||||
// Both the file on disk and the in-memory references are deleted atomically
|
||||
// under the same mutex, preventing dangling references.
|
||||
// Phase 1 (under lock): identify expired entries and remove from maps.
|
||||
// Phase 2 (no lock): delete files from disk to minimize lock contention.
|
||||
func (s *FileMediaStore) CleanExpired() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.cleanerCfg.MaxAge <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Phase 1: collect expired entries under lock
|
||||
type expiredEntry struct {
|
||||
ref string
|
||||
path string
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge)
|
||||
removed := 0
|
||||
var expired []expiredEntry
|
||||
|
||||
for ref, entry := range s.refs {
|
||||
if entry.storedAt.Before(cutoff) {
|
||||
if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) {
|
||||
// Log but continue — best effort cleanup
|
||||
}
|
||||
expired = append(expired, expiredEntry{ref: ref, path: entry.path})
|
||||
|
||||
scope := s.refToScope[ref]
|
||||
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
|
||||
@@ -180,45 +187,56 @@ func (s *FileMediaStore) CleanExpired() int {
|
||||
|
||||
delete(s.refs, ref)
|
||||
delete(s.refToScope, ref)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Phase 2: delete files without holding the lock
|
||||
for _, e := range expired {
|
||||
if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) {
|
||||
log.Printf("[media] cleanup: failed to remove %s: %v", e.path, err)
|
||||
}
|
||||
}
|
||||
|
||||
return removed
|
||||
return len(expired)
|
||||
}
|
||||
|
||||
// Start begins the background cleanup goroutine if cleanup is enabled.
|
||||
// Safe to call multiple times; only the first call starts the goroutine.
|
||||
func (s *FileMediaStore) Start() {
|
||||
if !s.cleanerCfg.Enabled || s.stop == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[media] cleanup enabled: interval=%s, max_age=%s",
|
||||
s.cleanerCfg.Interval, s.cleanerCfg.MaxAge)
|
||||
s.startOnce.Do(func() {
|
||||
log.Printf("[media] cleanup enabled: interval=%s, max_age=%s",
|
||||
s.cleanerCfg.Interval, s.cleanerCfg.MaxAge)
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.cleanerCfg.Interval)
|
||||
defer ticker.Stop()
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.cleanerCfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if n := s.CleanExpired(); n > 0 {
|
||||
log.Printf("[media] cleanup: removed %d expired entries", n)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if n := s.CleanExpired(); n > 0 {
|
||||
log.Printf("[media] cleanup: removed %d expired entries", n)
|
||||
}
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// Stop terminates the background cleanup goroutine.
|
||||
// Safe to call multiple times; only the first call closes the channel.
|
||||
func (s *FileMediaStore) Stop() {
|
||||
if s.stop == nil {
|
||||
return
|
||||
}
|
||||
s.once.Do(func() {
|
||||
s.stopOnce.Do(func() {
|
||||
close(s.stop)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -345,6 +345,8 @@ func TestStartStopLifecycle(t *testing.T) {
|
||||
|
||||
// Start and stop should not panic
|
||||
store.Start()
|
||||
// Double start should not spawn a second goroutine
|
||||
store.Start()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
store.Stop()
|
||||
|
||||
@@ -352,6 +354,27 @@ func TestStartStopLifecycle(t *testing.T) {
|
||||
store.Stop()
|
||||
}
|
||||
|
||||
func TestCleanExpiredZeroMaxAge(t *testing.T) {
|
||||
store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{
|
||||
Enabled: true,
|
||||
MaxAge: 0,
|
||||
Interval: time.Hour,
|
||||
})
|
||||
|
||||
dir := t.TempDir()
|
||||
path := createTempFile(t, dir, "file.jpg")
|
||||
ref, _ := store.Store(path, MediaMeta{Source: "test"}, "scope1")
|
||||
|
||||
// Zero MaxAge should be a no-op
|
||||
removed := store.CleanExpired()
|
||||
if removed != 0 {
|
||||
t.Errorf("expected 0 removed with zero MaxAge, got %d", removed)
|
||||
}
|
||||
if _, err := store.Resolve(ref); err != nil {
|
||||
t.Errorf("ref should still resolve: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartDisabledIsNoop(t *testing.T) {
|
||||
store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{
|
||||
Enabled: false,
|
||||
|
||||
Reference in New Issue
Block a user