mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-05-25 16:00:35 +00:00
75270c4777
* fix(media): track cleanup ownership per path

  Add explicit cleanup policy handling to MediaStore and count refs by path before deleting the underlying file. This prevents cleanup from removing shared files until the final ref is gone.

  Refs #1886

* fix(tools): keep send_file refs forget-only

  Mark send_file media registrations as forget-only so cleanup drops the ref without deleting the original workspace file.

  Refs #1886

* fix(channels): declare managed media cleanup policy

  Explicitly mark downloaded and managed channel media as delete-on-cleanup so media ownership is visible at each registration site.

  Refs #1886
357 lines
9.7 KiB
Go
357 lines
9.7 KiB
Go
package media
|
|
|
|
import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/sipeed/picoclaw/pkg/logger"
)
|
|
|
|
// CleanupPolicy selects what the MediaStore does with the file on disk when a
// ref pointing at it is released or expires.
type CleanupPolicy string
|
|
|
|
const (
|
|
// CleanupPolicyDeleteOnCleanup means the file is store-managed and may be
|
|
// deleted once the final ref for that path is gone.
|
|
CleanupPolicyDeleteOnCleanup CleanupPolicy = "delete_on_cleanup"
|
|
// CleanupPolicyForgetOnly means the store should only drop ref mappings and
|
|
// must never delete the underlying file.
|
|
CleanupPolicyForgetOnly CleanupPolicy = "forget_only"
|
|
)
|
|
|
|
// MediaMeta holds metadata about a stored media file.
|
|
type MediaMeta struct {
|
|
Filename string
|
|
ContentType string
|
|
Source string // "telegram", "discord", "tool:image-gen", etc.
|
|
CleanupPolicy CleanupPolicy // defaults to CleanupPolicyDeleteOnCleanup
|
|
}
|
|
|
|
// MediaStore manages the lifecycle of media files associated with processing scopes.
|
|
type MediaStore interface {
|
|
// Store registers an existing local file under the given scope.
|
|
// Returns a ref identifier (e.g. "media://<id>").
|
|
// Store does not move or copy the file; it only records the mapping.
|
|
// If meta.CleanupPolicy is empty, CleanupPolicyDeleteOnCleanup is assumed.
|
|
Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
|
|
|
|
// Resolve returns the local file path for a given ref.
|
|
Resolve(ref string) (localPath string, err error)
|
|
|
|
// ResolveWithMeta returns the local file path and metadata for a given ref.
|
|
ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error)
|
|
|
|
// ReleaseAll deletes all files registered under the given scope
|
|
// and removes the mapping entries. File-not-exist errors are ignored.
|
|
ReleaseAll(scope string) error
|
|
}
|
|
|
|
// mediaEntry holds the path and metadata for a stored media file.
|
|
type mediaEntry struct {
|
|
path string
|
|
meta MediaMeta
|
|
storedAt time.Time
|
|
}
|
|
|
|
// pathRefState is the per-path bookkeeping used to decide when a file may be
// removed from disk.
type pathRefState struct {
	// refCount is the number of live refs that point at the path.
	refCount int
	// deleteEligible reports whether the file may be deleted once refCount
	// drops to zero. Store clears it as soon as any ref for the path is
	// registered with CleanupPolicyForgetOnly.
	deleteEligible bool
}
|
|
|
|
// MediaCleanerConfig holds the settings for the background TTL cleanup.
type MediaCleanerConfig struct {
	// Enabled turns the background cleanup on; Start does nothing when false.
	Enabled bool
	// MaxAge is how long an entry may live before CleanExpired removes it.
	// Values <= 0 disable expiry.
	MaxAge time.Duration
	// Interval is the period between two background cleanup runs.
	// Start refuses to run with a value <= 0.
	Interval time.Duration
}
|
|
|
|
// FileMediaStore is a pure in-memory implementation of MediaStore.
|
|
// Files are expected to already exist on disk (e.g. in /tmp/picoclaw_media/).
|
|
type FileMediaStore struct {
|
|
mu sync.RWMutex
|
|
refs map[string]mediaEntry
|
|
scopeToRefs map[string]map[string]struct{}
|
|
refToScope map[string]string
|
|
refToPath map[string]string
|
|
pathStates map[string]pathRefState
|
|
|
|
cleanerCfg MediaCleanerConfig
|
|
stop chan struct{}
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
nowFunc func() time.Time // for testing
|
|
}
|
|
|
|
// NewFileMediaStore creates a new FileMediaStore without background cleanup.
|
|
func NewFileMediaStore() *FileMediaStore {
|
|
return &FileMediaStore{
|
|
refs: make(map[string]mediaEntry),
|
|
scopeToRefs: make(map[string]map[string]struct{}),
|
|
refToScope: make(map[string]string),
|
|
refToPath: make(map[string]string),
|
|
pathStates: make(map[string]pathRefState),
|
|
nowFunc: time.Now,
|
|
}
|
|
}
|
|
|
|
// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup.
|
|
func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore {
|
|
return &FileMediaStore{
|
|
refs: make(map[string]mediaEntry),
|
|
scopeToRefs: make(map[string]map[string]struct{}),
|
|
refToScope: make(map[string]string),
|
|
refToPath: make(map[string]string),
|
|
pathStates: make(map[string]pathRefState),
|
|
cleanerCfg: cfg,
|
|
stop: make(chan struct{}),
|
|
nowFunc: time.Now,
|
|
}
|
|
}
|
|
|
|
// Store registers a local file under the given scope. The file must exist.
|
|
func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (string, error) {
|
|
if _, err := os.Stat(localPath); err != nil {
|
|
return "", fmt.Errorf("media store: %s: %w", localPath, err)
|
|
}
|
|
|
|
ref := "media://" + uuid.New().String()
|
|
meta.CleanupPolicy = normalizeCleanupPolicy(meta.CleanupPolicy)
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()}
|
|
if s.scopeToRefs[scope] == nil {
|
|
s.scopeToRefs[scope] = make(map[string]struct{})
|
|
}
|
|
s.scopeToRefs[scope][ref] = struct{}{}
|
|
s.refToScope[ref] = scope
|
|
s.refToPath[ref] = localPath
|
|
|
|
pathState := s.pathStates[localPath]
|
|
if pathState.refCount == 0 {
|
|
pathState.deleteEligible = meta.CleanupPolicy == CleanupPolicyDeleteOnCleanup
|
|
} else if meta.CleanupPolicy == CleanupPolicyForgetOnly {
|
|
// Be conservative: once a path is borrowed externally, never let this
|
|
// lifecycle auto-delete it even if store-managed refs also exist.
|
|
pathState.deleteEligible = false
|
|
}
|
|
pathState.refCount++
|
|
s.pathStates[localPath] = pathState
|
|
|
|
return ref, nil
|
|
}
|
|
|
|
// Resolve returns the local path for the given ref.
|
|
func (s *FileMediaStore) Resolve(ref string) (string, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
entry, ok := s.refs[ref]
|
|
if !ok {
|
|
return "", fmt.Errorf("media store: unknown ref: %s", ref)
|
|
}
|
|
return entry.path, nil
|
|
}
|
|
|
|
// ResolveWithMeta returns the local path and metadata for the given ref.
|
|
func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
entry, ok := s.refs[ref]
|
|
if !ok {
|
|
return "", MediaMeta{}, fmt.Errorf("media store: unknown ref: %s", ref)
|
|
}
|
|
return entry.path, entry.meta, nil
|
|
}
|
|
|
|
// ReleaseAll removes all files under the given scope and cleans up mappings.
|
|
// Phase 1 (under lock): remove entries from maps.
|
|
// Phase 2 (no lock): delete store-managed files from disk once their final
|
|
// path ref is gone.
|
|
func (s *FileMediaStore) ReleaseAll(scope string) error {
|
|
// Phase 1: collect paths and remove from maps under lock
|
|
var paths []string
|
|
|
|
s.mu.Lock()
|
|
refs, ok := s.scopeToRefs[scope]
|
|
if !ok {
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
for ref := range refs {
|
|
fallbackPath := ""
|
|
if entry, exists := s.refs[ref]; exists {
|
|
fallbackPath = entry.path
|
|
}
|
|
if removablePath, shouldDelete := s.releaseRefLocked(ref, fallbackPath); shouldDelete {
|
|
paths = append(paths, removablePath)
|
|
}
|
|
}
|
|
delete(s.scopeToRefs, scope)
|
|
s.mu.Unlock()
|
|
|
|
// Phase 2: delete files without holding the lock
|
|
for _, p := range paths {
|
|
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
|
|
logger.WarnCF("media", "release: failed to remove file", map[string]any{
|
|
"path": p,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CleanExpired removes all entries older than MaxAge.
|
|
// Phase 1 (under lock): identify expired entries and remove from maps.
|
|
// Phase 2 (no lock): delete store-managed files from disk to minimize lock contention.
|
|
func (s *FileMediaStore) CleanExpired() int {
|
|
if s.cleanerCfg.MaxAge <= 0 {
|
|
return 0
|
|
}
|
|
|
|
// Phase 1: collect expired entries under lock
|
|
type expiredEntry struct {
|
|
ref string
|
|
deletePath string
|
|
}
|
|
|
|
s.mu.Lock()
|
|
cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge)
|
|
var expired []expiredEntry
|
|
|
|
for ref, entry := range s.refs {
|
|
if entry.storedAt.Before(cutoff) {
|
|
if scope, ok := s.refToScope[ref]; ok {
|
|
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
|
|
delete(scopeRefs, ref)
|
|
if len(scopeRefs) == 0 {
|
|
delete(s.scopeToRefs, scope)
|
|
}
|
|
}
|
|
}
|
|
|
|
expiredItem := expiredEntry{ref: ref}
|
|
if deletePath, shouldDelete := s.releaseRefLocked(ref, entry.path); shouldDelete {
|
|
expiredItem.deletePath = deletePath
|
|
}
|
|
expired = append(expired, expiredItem)
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
// Phase 2: delete files without holding the lock
|
|
for _, e := range expired {
|
|
if e.deletePath == "" {
|
|
continue
|
|
}
|
|
if err := os.Remove(e.deletePath); err != nil && !os.IsNotExist(err) {
|
|
logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{
|
|
"path": e.deletePath,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
return len(expired)
|
|
}
|
|
|
|
func normalizeCleanupPolicy(policy CleanupPolicy) CleanupPolicy {
|
|
switch policy {
|
|
case "", CleanupPolicyDeleteOnCleanup:
|
|
return CleanupPolicyDeleteOnCleanup
|
|
case CleanupPolicyForgetOnly:
|
|
return CleanupPolicyForgetOnly
|
|
default:
|
|
return CleanupPolicyDeleteOnCleanup
|
|
}
|
|
}
|
|
|
|
func (s *FileMediaStore) releaseRefLocked(ref, fallbackPath string) (string, bool) {
|
|
path := fallbackPath
|
|
if storedPath, ok := s.refToPath[ref]; ok {
|
|
path = storedPath
|
|
delete(s.refToPath, ref)
|
|
}
|
|
|
|
delete(s.refs, ref)
|
|
delete(s.refToScope, ref)
|
|
|
|
if path == "" {
|
|
return "", false
|
|
}
|
|
|
|
pathState, ok := s.pathStates[path]
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
if pathState.refCount <= 1 {
|
|
delete(s.pathStates, path)
|
|
return path, pathState.deleteEligible
|
|
}
|
|
|
|
pathState.refCount--
|
|
s.pathStates[path] = pathState
|
|
return "", false
|
|
}
|
|
|
|
// Start begins the background cleanup goroutine if cleanup is enabled.
|
|
// Safe to call multiple times; only the first call starts the goroutine.
|
|
func (s *FileMediaStore) Start() {
|
|
if !s.cleanerCfg.Enabled || s.stop == nil {
|
|
return
|
|
}
|
|
if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 {
|
|
logger.WarnCF("media", "cleanup: skipped due to invalid config", map[string]any{
|
|
"interval": s.cleanerCfg.Interval.String(),
|
|
"max_age": s.cleanerCfg.MaxAge.String(),
|
|
})
|
|
return
|
|
}
|
|
|
|
s.startOnce.Do(func() {
|
|
logger.InfoCF("media", "cleanup enabled", map[string]any{
|
|
"interval": s.cleanerCfg.Interval.String(),
|
|
"max_age": s.cleanerCfg.MaxAge.String(),
|
|
})
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(s.cleanerCfg.Interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if n := s.CleanExpired(); n > 0 {
|
|
logger.InfoCF("media", "cleanup: removed expired entries", map[string]any{
|
|
"count": n,
|
|
})
|
|
}
|
|
case <-s.stop:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
})
|
|
}
|
|
|
|
// Stop terminates the background cleanup goroutine.
|
|
// Safe to call multiple times; only the first call closes the channel.
|
|
func (s *FileMediaStore) Stop() {
|
|
if s.stop == nil {
|
|
return
|
|
}
|
|
s.stopOnce.Do(func() {
|
|
close(s.stop)
|
|
})
|
|
}
|