diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index 83a04907c..297bfe89f 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -396,8 +396,9 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag storeMedia := func(localPath, filename string) string { if store := c.GetMediaStore(); store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "discord", + Filename: filename, + Source: "discord", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/feishu/feishu_64.go b/pkg/channels/feishu/feishu_64.go index 37a74718a..abc9291f6 100644 --- a/pkg/channels/feishu/feishu_64.go +++ b/pkg/channels/feishu/feishu_64.go @@ -725,8 +725,9 @@ func (c *FeishuChannel) downloadResource( out.Close() ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "feishu", + Filename: filename, + Source: "feishu", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err != nil { logger.ErrorCF("feishu", "Failed to store downloaded resource", map[string]any{ diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index 56ba02183..b2cdb6267 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -301,8 +301,9 @@ func (c *LINEChannel) processEvent(event lineEvent) { storeMedia := func(localPath, filename string) string { if store := c.GetMediaStore(); store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "line", + Filename: filename, + Source: "line", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/matrix/matrix.go b/pkg/channels/matrix/matrix.go index 4cbe95c5c..fa16dd414 100644 --- a/pkg/channels/matrix/matrix.go +++ b/pkg/channels/matrix/matrix.go @@ -692,6 +692,9 @@ func (c *MatrixChannel) extractInboundMedia( func (c *MatrixChannel) storeMedia(localPath string, meta media.MediaMeta, scope string) string { if store := c.GetMediaStore(); store != nil { + if meta.CleanupPolicy == "" { + meta.CleanupPolicy = media.CleanupPolicyDeleteOnCleanup + } ref, err := store.Store(localPath, meta, scope) if err == nil { return ref diff --git a/pkg/channels/onebot/onebot.go b/pkg/channels/onebot/onebot.go index 62a9eb34a..b4bd1970c 100644 --- a/pkg/channels/onebot/onebot.go +++ b/pkg/channels/onebot/onebot.go @@ -749,8 +749,9 @@ func (c *OneBotChannel) parseMessageSegments( storeFile := func(localPath, filename string) string { if store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "onebot", + Filename: filename, + Source: "onebot", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/qq/qq.go b/pkg/channels/qq/qq.go index 2cd6e1747..9daf24f93 100644 --- a/pkg/channels/qq/qq.go +++ b/pkg/channels/qq/qq.go @@ -719,9 +719,10 @@ func (c *QQChannel) extractInboundAttachments( storeMedia := func(localPath string, attachment *dto.MessageAttachment) string { if store := c.GetMediaStore(); store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: qqAttachmentFilename(attachment), - ContentType: attachment.ContentType, - Source: "qq", + Filename: qqAttachmentFilename(attachment), + ContentType: attachment.ContentType, + Source: "qq", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go index 3ee849621..f12c74cd7 100644 --- a/pkg/channels/slack/slack.go +++ b/pkg/channels/slack/slack.go @@ -327,8 +327,9 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { storeMedia := func(localPath, filename string) string { if store := c.GetMediaStore(); store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "slack", + Filename: filename, + Source: "slack", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 3eb89c636..18b034213 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -561,8 +561,9 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes storeMedia := func(localPath, filename string) string { if store := c.GetMediaStore(); store != nil { ref, err := store.Store(localPath, media.MediaMeta{ - Filename: filename, - Source: "telegram", + Filename: filename, + Source: "telegram", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err == nil { return ref diff --git a/pkg/channels/wecom/aibot_ws.go b/pkg/channels/wecom/aibot_ws.go index 830e763b9..feecd1f4b 100644 --- a/pkg/channels/wecom/aibot_ws.go +++ b/pkg/channels/wecom/aibot_ws.go @@ -1218,8 +1218,9 @@ func (c *WeComAIBotWSChannel) storeWSMedia( scope := channels.BuildMediaScope("wecom_aibot", chatID, msgID) ref, err := store.Store(tmpPath, media.MediaMeta{ - Filename: msgID + ext, - Source: "wecom_aibot", + Filename: msgID + ext, + Source: "wecom_aibot", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, scope) if err != nil { os.Remove(tmpPath) diff --git a/pkg/channels/weixin/media.go b/pkg/channels/weixin/media.go index 0332f48f6..72af27438 100644 --- a/pkg/channels/weixin/media.go +++ b/pkg/channels/weixin/media.go @@ -291,9 +291,10 @@ func (c *WeixinChannel) storeInboundBytes( return "", err } ref, err := store.Store(tmpPath, media.MediaMeta{ - Filename: filename, - ContentType: contentType, - Source: "weixin", + Filename: filename, + ContentType: contentType, + Source: "weixin", + CleanupPolicy: media.CleanupPolicyDeleteOnCleanup, }, basechannels.BuildMediaScope("weixin", chatID, messageID)) if err != nil { os.Remove(tmpPath) diff --git a/pkg/media/store.go b/pkg/media/store.go index 30220986c..78cff8bb6 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -11,11 +11,25 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" ) +// CleanupPolicy controls how the MediaStore treats the underlying file when +// a ref is released or expires. +type CleanupPolicy string + +const ( + // CleanupPolicyDeleteOnCleanup means the file is store-managed and may be + // deleted once the final ref for that path is gone. + CleanupPolicyDeleteOnCleanup CleanupPolicy = "delete_on_cleanup" + // CleanupPolicyForgetOnly means the store should only drop ref mappings and + // must never delete the underlying file. + CleanupPolicyForgetOnly CleanupPolicy = "forget_only" +) + // MediaMeta holds metadata about a stored media file. type MediaMeta struct { - Filename string - ContentType string - Source string // "telegram", "discord", "tool:image-gen", etc. + Filename string + ContentType string + Source string // "telegram", "discord", "tool:image-gen", etc. + CleanupPolicy CleanupPolicy // defaults to CleanupPolicyDeleteOnCleanup } // MediaStore manages the lifecycle of media files associated with processing scopes. @@ -23,6 +37,7 @@ type MediaStore interface { // Store registers an existing local file under the given scope. // Returns a ref identifier (e.g. "media://"). // Store does not move or copy the file; it only records the mapping. + // If meta.CleanupPolicy is empty, CleanupPolicyDeleteOnCleanup is assumed. Store(localPath string, meta MediaMeta, scope string) (ref string, err error) // Resolve returns the local file path for a given ref. @@ -43,6 +58,11 @@ type mediaEntry struct { storedAt time.Time } +type pathRefState struct { + refCount int + deleteEligible bool +} + // MediaCleanerConfig configures the background TTL cleanup. type MediaCleanerConfig struct { Enabled bool @@ -57,6 +77,8 @@ type FileMediaStore struct { refs map[string]mediaEntry scopeToRefs map[string]map[string]struct{} refToScope map[string]string + refToPath map[string]string + pathStates map[string]pathRefState cleanerCfg MediaCleanerConfig stop chan struct{} @@ -71,6 +93,8 @@ func NewFileMediaStore() *FileMediaStore { refs: make(map[string]mediaEntry), scopeToRefs: make(map[string]map[string]struct{}), refToScope: make(map[string]string), + refToPath: make(map[string]string), + pathStates: make(map[string]pathRefState), nowFunc: time.Now, } } @@ -81,6 +105,8 @@ func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore { refs: make(map[string]mediaEntry), scopeToRefs: make(map[string]map[string]struct{}), refToScope: make(map[string]string), + refToPath: make(map[string]string), + pathStates: make(map[string]pathRefState), cleanerCfg: cfg, stop: make(chan struct{}), nowFunc: time.Now, @@ -94,6 +120,7 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) ( } ref := "media://" + uuid.New().String() + meta.CleanupPolicy = normalizeCleanupPolicy(meta.CleanupPolicy) s.mu.Lock() defer s.mu.Unlock() @@ -104,6 +131,18 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) ( } s.scopeToRefs[scope][ref] = struct{}{} s.refToScope[ref] = scope + s.refToPath[ref] = localPath + + pathState := s.pathStates[localPath] + if pathState.refCount == 0 { + pathState.deleteEligible = meta.CleanupPolicy == CleanupPolicyDeleteOnCleanup + } else if meta.CleanupPolicy == CleanupPolicyForgetOnly { + // Be conservative: once a path is borrowed externally, never let this + // lifecycle auto-delete it even if store-managed refs also exist. + pathState.deleteEligible = false + } + pathState.refCount++ + s.pathStates[localPath] = pathState return ref, nil } @@ -134,7 +173,8 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error) // ReleaseAll removes all files under the given scope and cleans up mappings. // Phase 1 (under lock): remove entries from maps. -// Phase 2 (no lock): delete files from disk. +// Phase 2 (no lock): delete store-managed files from disk once their final +// path ref is gone. func (s *FileMediaStore) ReleaseAll(scope string) error { // Phase 1: collect paths and remove from maps under lock var paths []string @@ -147,11 +187,13 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { } for ref := range refs { + fallbackPath := "" if entry, exists := s.refs[ref]; exists { - paths = append(paths, entry.path) + fallbackPath = entry.path + } + if removablePath, shouldDelete := s.releaseRefLocked(ref, fallbackPath); shouldDelete { + paths = append(paths, removablePath) } - delete(s.refs, ref) - delete(s.refToScope, ref) } delete(s.scopeToRefs, scope) s.mu.Unlock() @@ -171,7 +213,7 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { // CleanExpired removes all entries older than MaxAge. // Phase 1 (under lock): identify expired entries and remove from maps. -// Phase 2 (no lock): delete files from disk to minimize lock contention. +// Phase 2 (no lock): delete store-managed files from disk to minimize lock contention. func (s *FileMediaStore) CleanExpired() int { if s.cleanerCfg.MaxAge <= 0 { return 0 @@ -179,8 +221,8 @@ func (s *FileMediaStore) CleanExpired() int { // Phase 1: collect expired entries under lock type expiredEntry struct { - ref string - path string + ref string + deletePath string } s.mu.Lock() @@ -189,8 +231,6 @@ func (s *FileMediaStore) CleanExpired() int { for ref, entry := range s.refs { if entry.storedAt.Before(cutoff) { - expired = append(expired, expiredEntry{ref: ref, path: entry.path}) - if scope, ok := s.refToScope[ref]; ok { if scopeRefs, ok := s.scopeToRefs[scope]; ok { delete(scopeRefs, ref) @@ -200,17 +240,23 @@ func (s *FileMediaStore) CleanExpired() int { } } - delete(s.refs, ref) - delete(s.refToScope, ref) + expiredItem := expiredEntry{ref: ref} + if deletePath, shouldDelete := s.releaseRefLocked(ref, entry.path); shouldDelete { + expiredItem.deletePath = deletePath + } + expired = append(expired, expiredItem) } } s.mu.Unlock() // Phase 2: delete files without holding the lock for _, e := range expired { - if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) { + if e.deletePath == "" { + continue + } + if err := os.Remove(e.deletePath); err != nil && !os.IsNotExist(err) { logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{ - "path": e.path, + "path": e.deletePath, "error": err.Error(), }) } @@ -219,6 +265,45 @@ func (s *FileMediaStore) CleanExpired() int { return len(expired) } +func normalizeCleanupPolicy(policy CleanupPolicy) CleanupPolicy { + switch policy { + case "", CleanupPolicyDeleteOnCleanup: + return CleanupPolicyDeleteOnCleanup + case CleanupPolicyForgetOnly: + return CleanupPolicyForgetOnly + default: + return CleanupPolicyDeleteOnCleanup + } +} + +func (s *FileMediaStore) releaseRefLocked(ref, fallbackPath string) (string, bool) { + path := fallbackPath + if storedPath, ok := s.refToPath[ref]; ok { + path = storedPath + delete(s.refToPath, ref) + } + + delete(s.refs, ref) + delete(s.refToScope, ref) + + if path == "" { + return "", false + } + + pathState, ok := s.pathStates[path] + if !ok { + return "", false + } + if pathState.refCount <= 1 { + delete(s.pathStates, path) + return path, pathState.deleteEligible + } + + pathState.refCount-- + s.pathStates[path] = pathState + return "", false +} + // Start begins the background cleanup goroutine if cleanup is enabled. // Safe to call multiple times; only the first call starts the goroutine. func (s *FileMediaStore) Start() { diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index 1dcfdf350..dabcc3142 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -77,6 +77,106 @@ func TestReleaseAll(t *testing.T) { } } +func TestReleaseAllForgetOnlyKeepsFile(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + path := createTempFile(t, dir, "workspace.txt") + ref, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyForgetOnly, + }, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + if err := store.ReleaseAll("scope1"); err != nil { + t.Fatalf("ReleaseAll failed: %v", err) + } + + if _, err := store.Resolve(ref); err == nil { + t.Error("forget-only ref should be unresolvable after release") + } + if _, err := os.Stat(path); err != nil { + t.Errorf("forget-only file should remain on disk: %v", err) + } +} + +func TestReleaseAllSharedPathDeletesOnFinalRefOnly(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + path := createTempFile(t, dir, "shared.jpg") + refA, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyDeleteOnCleanup, + }, "scopeA") + if err != nil { + t.Fatalf("Store(scopeA) failed: %v", err) + } + refB, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyDeleteOnCleanup, + }, "scopeB") + if err != nil { + t.Fatalf("Store(scopeB) failed: %v", err) + } + + if err := store.ReleaseAll("scopeA"); err != nil { + t.Fatalf("ReleaseAll(scopeA) failed: %v", err) + } + + if _, err := store.Resolve(refA); err == nil { + t.Error("refA should be unresolvable after ReleaseAll(scopeA)") + } + if _, err := store.Resolve(refB); err != nil { + t.Fatalf("refB should still resolve: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Errorf("shared file should remain until final ref is released: %v", err) + } + + if err := store.ReleaseAll("scopeB"); err != nil { + t.Fatalf("ReleaseAll(scopeB) failed: %v", err) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Error("shared file should be deleted after final ref is released") + } +} + +func TestReleaseAllMixedPoliciesKeepsFile(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + path := createTempFile(t, dir, "shared.txt") + if _, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyDeleteOnCleanup, + }, "owned"); err != nil { + t.Fatalf("Store(owned) failed: %v", err) + } + if _, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyForgetOnly, + }, "borrowed"); err != nil { + t.Fatalf("Store(borrowed) failed: %v", err) + } + + if err := store.ReleaseAll("owned"); err != nil { + t.Fatalf("ReleaseAll(owned) failed: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Fatalf("mixed-policy file should remain after owned ref release: %v", err) + } + + if err := store.ReleaseAll("borrowed"); err != nil { + t.Fatalf("ReleaseAll(borrowed) failed: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Errorf("mixed-policy path should not be auto-deleted: %v", err) + } +} + func TestMultiScopeIsolation(t *testing.T) { dir := t.TempDir() store := NewFileMediaStore() @@ -293,6 +393,35 @@ func TestCleanExpiredRemovesOldEntries(t *testing.T) { } } +func TestCleanExpiredForgetOnlyKeepsFile(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + + path := createTempFile(t, dir, "workspace.txt") + ref, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyForgetOnly, + }, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + store.nowFunc = func() time.Time { return now } + removed := store.CleanExpired() + + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + if _, err := store.Resolve(ref); err == nil { + t.Error("expired forget-only ref should be unresolvable") + } + if _, err := os.Stat(path); err != nil { + t.Errorf("forget-only file should remain on disk: %v", err) + } +} + func TestCleanExpiredKeepsNonExpired(t *testing.T) { dir := t.TempDir() now := time.Now() @@ -346,6 +475,53 @@ func TestCleanExpiredMixedAges(t *testing.T) { } } +func TestCleanExpiredSharedPathDeletesOnFinalRefOnly(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + path := createTempFile(t, dir, "shared.jpg") + + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + oldRef, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyDeleteOnCleanup, + }, "scope-old") + if err != nil { + t.Fatalf("Store(old) failed: %v", err) + } + + store.nowFunc = func() time.Time { return now } + freshRef, err := store.Store(path, MediaMeta{ + Source: "test", + CleanupPolicy: CleanupPolicyDeleteOnCleanup, + }, "scope-fresh") + if err != nil { + t.Fatalf("Store(fresh) failed: %v", err) + } + + removed := store.CleanExpired() + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + if _, err := store.Resolve(oldRef); err == nil { + t.Error("old ref should be gone after cleanup") + } + if _, err := store.Resolve(freshRef); err != nil { + t.Fatalf("fresh ref should still resolve: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Errorf("shared file should remain while fresh ref exists: %v", err) + } + + if err := store.ReleaseAll("scope-fresh"); err != nil { + t.Fatalf("ReleaseAll(scope-fresh) failed: %v", err) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Error("shared file should be deleted after final ref is released") + } +} + func TestCleanExpiredCleansEmptyScopes(t *testing.T) { dir := t.TempDir() now := time.Now() diff --git a/pkg/tools/send_file.go b/pkg/tools/send_file.go index a67bd4210..57b99a845 100644 --- a/pkg/tools/send_file.go +++ b/pkg/tools/send_file.go @@ -133,9 +133,10 @@ func (t *SendFileTool) Execute(ctx context.Context, args map[string]any) *ToolRe scope := fmt.Sprintf("tool:send_file:%s:%s", channel, chatID) ref, err := t.mediaStore.Store(resolved, media.MediaMeta{ - Filename: filename, - ContentType: mediaType, - Source: "tool:send_file", + Filename: filename, + ContentType: mediaType, + Source: "tool:send_file", + CleanupPolicy: media.CleanupPolicyForgetOnly, }, scope) if err != nil { return ErrorResult(fmt.Sprintf("failed to register media: %v", err)) diff --git a/pkg/tools/send_file_test.go b/pkg/tools/send_file_test.go index 6daaab31c..0a99e8028 100644 --- a/pkg/tools/send_file_test.go +++ b/pkg/tools/send_file_test.go @@ -104,6 +104,14 @@ func TestSendFileTool_Success(t *testing.T) { if result.Media[0][:8] != "media://" { t.Errorf("expected media:// ref, got %q", result.Media[0]) } + + _, meta, err := store.ResolveWithMeta(result.Media[0]) + if err != nil { + t.Fatalf("ResolveWithMeta failed: %v", err) + } + if meta.CleanupPolicy != media.CleanupPolicyForgetOnly { + t.Errorf("CleanupPolicy = %q, want %q", meta.CleanupPolicy, media.CleanupPolicyForgetOnly) + } } func TestSendFileTool_CustomFilename(t *testing.T) {