Fix 1886 media cleanup policy (#1887)

* fix(media): track cleanup ownership per path

Add explicit cleanup policy handling to MediaStore and count refs by path before deleting the underlying file. This prevents cleanup from removing shared files until the final ref is gone.

Refs #1886

* fix(tools): keep send_file refs forget-only

Mark send_file media registrations as forget-only so cleanup drops the ref without deleting the original workspace file.

Refs #1886

* fix(channels): declare managed media cleanup policy

Explicitly mark downloaded and managed channel media as delete-on-cleanup so media ownership is visible at each registration site.

Refs #1886
This commit is contained in:
美電球
2026-03-23 12:13:59 +08:00
committed by GitHub
parent 3a61892313
commit 75270c4777
14 changed files with 321 additions and 39 deletions
+3 -2
View File
@@ -396,8 +396,9 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
storeMedia := func(localPath, filename string) string {
if store := c.GetMediaStore(); store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "discord",
Filename: filename,
Source: "discord",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+3 -2
View File
@@ -725,8 +725,9 @@ func (c *FeishuChannel) downloadResource(
out.Close()
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "feishu",
Filename: filename,
Source: "feishu",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err != nil {
logger.ErrorCF("feishu", "Failed to store downloaded resource", map[string]any{
+3 -2
View File
@@ -301,8 +301,9 @@ func (c *LINEChannel) processEvent(event lineEvent) {
storeMedia := func(localPath, filename string) string {
if store := c.GetMediaStore(); store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "line",
Filename: filename,
Source: "line",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+3
View File
@@ -692,6 +692,9 @@ func (c *MatrixChannel) extractInboundMedia(
func (c *MatrixChannel) storeMedia(localPath string, meta media.MediaMeta, scope string) string {
if store := c.GetMediaStore(); store != nil {
if meta.CleanupPolicy == "" {
meta.CleanupPolicy = media.CleanupPolicyDeleteOnCleanup
}
ref, err := store.Store(localPath, meta, scope)
if err == nil {
return ref
+3 -2
View File
@@ -749,8 +749,9 @@ func (c *OneBotChannel) parseMessageSegments(
storeFile := func(localPath, filename string) string {
if store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "onebot",
Filename: filename,
Source: "onebot",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+4 -3
View File
@@ -719,9 +719,10 @@ func (c *QQChannel) extractInboundAttachments(
storeMedia := func(localPath string, attachment *dto.MessageAttachment) string {
if store := c.GetMediaStore(); store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: qqAttachmentFilename(attachment),
ContentType: attachment.ContentType,
Source: "qq",
Filename: qqAttachmentFilename(attachment),
ContentType: attachment.ContentType,
Source: "qq",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+3 -2
View File
@@ -327,8 +327,9 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
storeMedia := func(localPath, filename string) string {
if store := c.GetMediaStore(); store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "slack",
Filename: filename,
Source: "slack",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+3 -2
View File
@@ -561,8 +561,9 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
storeMedia := func(localPath, filename string) string {
if store := c.GetMediaStore(); store != nil {
ref, err := store.Store(localPath, media.MediaMeta{
Filename: filename,
Source: "telegram",
Filename: filename,
Source: "telegram",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err == nil {
return ref
+3 -2
View File
@@ -1218,8 +1218,9 @@ func (c *WeComAIBotWSChannel) storeWSMedia(
scope := channels.BuildMediaScope("wecom_aibot", chatID, msgID)
ref, err := store.Store(tmpPath, media.MediaMeta{
Filename: msgID + ext,
Source: "wecom_aibot",
Filename: msgID + ext,
Source: "wecom_aibot",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, scope)
if err != nil {
os.Remove(tmpPath)
+4 -3
View File
@@ -291,9 +291,10 @@ func (c *WeixinChannel) storeInboundBytes(
return "", err
}
ref, err := store.Store(tmpPath, media.MediaMeta{
Filename: filename,
ContentType: contentType,
Source: "weixin",
Filename: filename,
ContentType: contentType,
Source: "weixin",
CleanupPolicy: media.CleanupPolicyDeleteOnCleanup,
}, basechannels.BuildMediaScope("weixin", chatID, messageID))
if err != nil {
os.Remove(tmpPath)
+101 -16
View File
@@ -11,11 +11,25 @@ import (
"github.com/sipeed/picoclaw/pkg/logger"
)
// CleanupPolicy controls how the MediaStore treats the underlying file when
// a ref is released or expires.
type CleanupPolicy string
const (
// CleanupPolicyDeleteOnCleanup means the file is store-managed and may be
// deleted once the final ref for that path is gone.
CleanupPolicyDeleteOnCleanup CleanupPolicy = "delete_on_cleanup"
// CleanupPolicyForgetOnly means the store should only drop ref mappings and
// must never delete the underlying file.
CleanupPolicyForgetOnly CleanupPolicy = "forget_only"
)
// MediaMeta holds metadata about a stored media file.
type MediaMeta struct {
Filename string
ContentType string
Source string // "telegram", "discord", "tool:image-gen", etc.
Filename string
ContentType string
Source string // "telegram", "discord", "tool:image-gen", etc.
CleanupPolicy CleanupPolicy // defaults to CleanupPolicyDeleteOnCleanup
}
// MediaStore manages the lifecycle of media files associated with processing scopes.
@@ -23,6 +37,7 @@ type MediaStore interface {
// Store registers an existing local file under the given scope.
// Returns a ref identifier (e.g. "media://<id>").
// Store does not move or copy the file; it only records the mapping.
// If meta.CleanupPolicy is empty, CleanupPolicyDeleteOnCleanup is assumed.
Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
// Resolve returns the local file path for a given ref.
@@ -43,6 +58,11 @@ type mediaEntry struct {
storedAt time.Time
}
type pathRefState struct {
refCount int
deleteEligible bool
}
// MediaCleanerConfig configures the background TTL cleanup.
type MediaCleanerConfig struct {
Enabled bool
@@ -57,6 +77,8 @@ type FileMediaStore struct {
refs map[string]mediaEntry
scopeToRefs map[string]map[string]struct{}
refToScope map[string]string
refToPath map[string]string
pathStates map[string]pathRefState
cleanerCfg MediaCleanerConfig
stop chan struct{}
@@ -71,6 +93,8 @@ func NewFileMediaStore() *FileMediaStore {
refs: make(map[string]mediaEntry),
scopeToRefs: make(map[string]map[string]struct{}),
refToScope: make(map[string]string),
refToPath: make(map[string]string),
pathStates: make(map[string]pathRefState),
nowFunc: time.Now,
}
}
@@ -81,6 +105,8 @@ func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore {
refs: make(map[string]mediaEntry),
scopeToRefs: make(map[string]map[string]struct{}),
refToScope: make(map[string]string),
refToPath: make(map[string]string),
pathStates: make(map[string]pathRefState),
cleanerCfg: cfg,
stop: make(chan struct{}),
nowFunc: time.Now,
@@ -94,6 +120,7 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (
}
ref := "media://" + uuid.New().String()
meta.CleanupPolicy = normalizeCleanupPolicy(meta.CleanupPolicy)
s.mu.Lock()
defer s.mu.Unlock()
@@ -104,6 +131,18 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (
}
s.scopeToRefs[scope][ref] = struct{}{}
s.refToScope[ref] = scope
s.refToPath[ref] = localPath
pathState := s.pathStates[localPath]
if pathState.refCount == 0 {
pathState.deleteEligible = meta.CleanupPolicy == CleanupPolicyDeleteOnCleanup
} else if meta.CleanupPolicy == CleanupPolicyForgetOnly {
// Be conservative: once a path is borrowed externally, never let this
// lifecycle auto-delete it even if store-managed refs also exist.
pathState.deleteEligible = false
}
pathState.refCount++
s.pathStates[localPath] = pathState
return ref, nil
}
@@ -134,7 +173,8 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error)
// ReleaseAll removes all files under the given scope and cleans up mappings.
// Phase 1 (under lock): remove entries from maps.
// Phase 2 (no lock): delete files from disk.
// Phase 2 (no lock): delete store-managed files from disk once their final
// path ref is gone.
func (s *FileMediaStore) ReleaseAll(scope string) error {
// Phase 1: collect paths and remove from maps under lock
var paths []string
@@ -147,11 +187,13 @@ func (s *FileMediaStore) ReleaseAll(scope string) error {
}
for ref := range refs {
fallbackPath := ""
if entry, exists := s.refs[ref]; exists {
paths = append(paths, entry.path)
fallbackPath = entry.path
}
if removablePath, shouldDelete := s.releaseRefLocked(ref, fallbackPath); shouldDelete {
paths = append(paths, removablePath)
}
delete(s.refs, ref)
delete(s.refToScope, ref)
}
delete(s.scopeToRefs, scope)
s.mu.Unlock()
@@ -171,7 +213,7 @@ func (s *FileMediaStore) ReleaseAll(scope string) error {
// CleanExpired removes all entries older than MaxAge.
// Phase 1 (under lock): identify expired entries and remove from maps.
// Phase 2 (no lock): delete files from disk to minimize lock contention.
// Phase 2 (no lock): delete store-managed files from disk to minimize lock contention.
func (s *FileMediaStore) CleanExpired() int {
if s.cleanerCfg.MaxAge <= 0 {
return 0
@@ -179,8 +221,8 @@ func (s *FileMediaStore) CleanExpired() int {
// Phase 1: collect expired entries under lock
type expiredEntry struct {
ref string
path string
ref string
deletePath string
}
s.mu.Lock()
@@ -189,8 +231,6 @@ func (s *FileMediaStore) CleanExpired() int {
for ref, entry := range s.refs {
if entry.storedAt.Before(cutoff) {
expired = append(expired, expiredEntry{ref: ref, path: entry.path})
if scope, ok := s.refToScope[ref]; ok {
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
delete(scopeRefs, ref)
@@ -200,17 +240,23 @@ func (s *FileMediaStore) CleanExpired() int {
}
}
delete(s.refs, ref)
delete(s.refToScope, ref)
expiredItem := expiredEntry{ref: ref}
if deletePath, shouldDelete := s.releaseRefLocked(ref, entry.path); shouldDelete {
expiredItem.deletePath = deletePath
}
expired = append(expired, expiredItem)
}
}
s.mu.Unlock()
// Phase 2: delete files without holding the lock
for _, e := range expired {
if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) {
if e.deletePath == "" {
continue
}
if err := os.Remove(e.deletePath); err != nil && !os.IsNotExist(err) {
logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{
"path": e.path,
"path": e.deletePath,
"error": err.Error(),
})
}
@@ -219,6 +265,45 @@ func (s *FileMediaStore) CleanExpired() int {
return len(expired)
}
func normalizeCleanupPolicy(policy CleanupPolicy) CleanupPolicy {
switch policy {
case "", CleanupPolicyDeleteOnCleanup:
return CleanupPolicyDeleteOnCleanup
case CleanupPolicyForgetOnly:
return CleanupPolicyForgetOnly
default:
return CleanupPolicyDeleteOnCleanup
}
}
func (s *FileMediaStore) releaseRefLocked(ref, fallbackPath string) (string, bool) {
path := fallbackPath
if storedPath, ok := s.refToPath[ref]; ok {
path = storedPath
delete(s.refToPath, ref)
}
delete(s.refs, ref)
delete(s.refToScope, ref)
if path == "" {
return "", false
}
pathState, ok := s.pathStates[path]
if !ok {
return "", false
}
if pathState.refCount <= 1 {
delete(s.pathStates, path)
return path, pathState.deleteEligible
}
pathState.refCount--
s.pathStates[path] = pathState
return "", false
}
// Start begins the background cleanup goroutine if cleanup is enabled.
// Safe to call multiple times; only the first call starts the goroutine.
func (s *FileMediaStore) Start() {
+176
View File
@@ -77,6 +77,106 @@ func TestReleaseAll(t *testing.T) {
}
}
func TestReleaseAllForgetOnlyKeepsFile(t *testing.T) {
dir := t.TempDir()
store := NewFileMediaStore()
path := createTempFile(t, dir, "workspace.txt")
ref, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyForgetOnly,
}, "scope1")
if err != nil {
t.Fatalf("Store failed: %v", err)
}
if err := store.ReleaseAll("scope1"); err != nil {
t.Fatalf("ReleaseAll failed: %v", err)
}
if _, err := store.Resolve(ref); err == nil {
t.Error("forget-only ref should be unresolvable after release")
}
if _, err := os.Stat(path); err != nil {
t.Errorf("forget-only file should remain on disk: %v", err)
}
}
func TestReleaseAllSharedPathDeletesOnFinalRefOnly(t *testing.T) {
dir := t.TempDir()
store := NewFileMediaStore()
path := createTempFile(t, dir, "shared.jpg")
refA, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyDeleteOnCleanup,
}, "scopeA")
if err != nil {
t.Fatalf("Store(scopeA) failed: %v", err)
}
refB, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyDeleteOnCleanup,
}, "scopeB")
if err != nil {
t.Fatalf("Store(scopeB) failed: %v", err)
}
if err := store.ReleaseAll("scopeA"); err != nil {
t.Fatalf("ReleaseAll(scopeA) failed: %v", err)
}
if _, err := store.Resolve(refA); err == nil {
t.Error("refA should be unresolvable after ReleaseAll(scopeA)")
}
if _, err := store.Resolve(refB); err != nil {
t.Fatalf("refB should still resolve: %v", err)
}
if _, err := os.Stat(path); err != nil {
t.Errorf("shared file should remain until final ref is released: %v", err)
}
if err := store.ReleaseAll("scopeB"); err != nil {
t.Fatalf("ReleaseAll(scopeB) failed: %v", err)
}
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Error("shared file should be deleted after final ref is released")
}
}
func TestReleaseAllMixedPoliciesKeepsFile(t *testing.T) {
dir := t.TempDir()
store := NewFileMediaStore()
path := createTempFile(t, dir, "shared.txt")
if _, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyDeleteOnCleanup,
}, "owned"); err != nil {
t.Fatalf("Store(owned) failed: %v", err)
}
if _, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyForgetOnly,
}, "borrowed"); err != nil {
t.Fatalf("Store(borrowed) failed: %v", err)
}
if err := store.ReleaseAll("owned"); err != nil {
t.Fatalf("ReleaseAll(owned) failed: %v", err)
}
if _, err := os.Stat(path); err != nil {
t.Fatalf("mixed-policy file should remain after owned ref release: %v", err)
}
if err := store.ReleaseAll("borrowed"); err != nil {
t.Fatalf("ReleaseAll(borrowed) failed: %v", err)
}
if _, err := os.Stat(path); err != nil {
t.Errorf("mixed-policy path should not be auto-deleted: %v", err)
}
}
func TestMultiScopeIsolation(t *testing.T) {
dir := t.TempDir()
store := NewFileMediaStore()
@@ -293,6 +393,35 @@ func TestCleanExpiredRemovesOldEntries(t *testing.T) {
}
}
func TestCleanExpiredForgetOnlyKeepsFile(t *testing.T) {
dir := t.TempDir()
now := time.Now()
store := newTestStoreWithCleanup(10 * time.Minute)
store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) }
path := createTempFile(t, dir, "workspace.txt")
ref, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyForgetOnly,
}, "scope1")
if err != nil {
t.Fatalf("Store failed: %v", err)
}
store.nowFunc = func() time.Time { return now }
removed := store.CleanExpired()
if removed != 1 {
t.Errorf("expected 1 removed, got %d", removed)
}
if _, err := store.Resolve(ref); err == nil {
t.Error("expired forget-only ref should be unresolvable")
}
if _, err := os.Stat(path); err != nil {
t.Errorf("forget-only file should remain on disk: %v", err)
}
}
func TestCleanExpiredKeepsNonExpired(t *testing.T) {
dir := t.TempDir()
now := time.Now()
@@ -346,6 +475,53 @@ func TestCleanExpiredMixedAges(t *testing.T) {
}
}
func TestCleanExpiredSharedPathDeletesOnFinalRefOnly(t *testing.T) {
dir := t.TempDir()
now := time.Now()
store := newTestStoreWithCleanup(10 * time.Minute)
path := createTempFile(t, dir, "shared.jpg")
store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) }
oldRef, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyDeleteOnCleanup,
}, "scope-old")
if err != nil {
t.Fatalf("Store(old) failed: %v", err)
}
store.nowFunc = func() time.Time { return now }
freshRef, err := store.Store(path, MediaMeta{
Source: "test",
CleanupPolicy: CleanupPolicyDeleteOnCleanup,
}, "scope-fresh")
if err != nil {
t.Fatalf("Store(fresh) failed: %v", err)
}
removed := store.CleanExpired()
if removed != 1 {
t.Errorf("expected 1 removed, got %d", removed)
}
if _, err := store.Resolve(oldRef); err == nil {
t.Error("old ref should be gone after cleanup")
}
if _, err := store.Resolve(freshRef); err != nil {
t.Fatalf("fresh ref should still resolve: %v", err)
}
if _, err := os.Stat(path); err != nil {
t.Errorf("shared file should remain while fresh ref exists: %v", err)
}
if err := store.ReleaseAll("scope-fresh"); err != nil {
t.Fatalf("ReleaseAll(scope-fresh) failed: %v", err)
}
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Error("shared file should be deleted after final ref is released")
}
}
func TestCleanExpiredCleansEmptyScopes(t *testing.T) {
dir := t.TempDir()
now := time.Now()
+4 -3
View File
@@ -133,9 +133,10 @@ func (t *SendFileTool) Execute(ctx context.Context, args map[string]any) *ToolRe
scope := fmt.Sprintf("tool:send_file:%s:%s", channel, chatID)
ref, err := t.mediaStore.Store(resolved, media.MediaMeta{
Filename: filename,
ContentType: mediaType,
Source: "tool:send_file",
Filename: filename,
ContentType: mediaType,
Source: "tool:send_file",
CleanupPolicy: media.CleanupPolicyForgetOnly,
}, scope)
if err != nil {
return ErrorResult(fmt.Sprintf("failed to register media: %v", err))
+8
View File
@@ -104,6 +104,14 @@ func TestSendFileTool_Success(t *testing.T) {
if result.Media[0][:8] != "media://" {
t.Errorf("expected media:// ref, got %q", result.Media[0])
}
_, meta, err := store.ResolveWithMeta(result.Media[0])
if err != nil {
t.Fatalf("ResolveWithMeta failed: %v", err)
}
if meta.CleanupPolicy != media.CleanupPolicyForgetOnly {
t.Errorf("CleanupPolicy = %q, want %q", meta.CleanupPolicy, media.CleanupPolicyForgetOnly)
}
}
func TestSendFileTool_CustomFilename(t *testing.T) {