diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 266f453d9..be71396ca 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -328,7 +328,74 @@ func (s *JSONLStore) SetHistory( l.Lock() defer l.Unlock() - // Rewrite the JSONL file atomically (temp + rename). + err := s.rewriteJSONL(sessionKey, history) + if err != nil { + return err + } + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Skip = 0 + meta.Count = len(history) + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +// Compact physically rewrites the JSONL file, dropping all logically +// skipped lines. This reclaims disk space that accumulates after +// repeated TruncateHistory calls. +// +// It is safe to call at any time; if there is nothing to compact +// (skip == 0) the method returns immediately. +func (s *JSONLStore) Compact( + _ context.Context, sessionKey string, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + if meta.Skip == 0 { + return nil + } + + all, err := readMessages(s.jsonlPath(sessionKey)) + if err != nil { + return err + } + + // Keep only the active (non-skipped) messages. + var active []providers.Message + if meta.Skip < len(all) { + active = all[meta.Skip:] + } + + err = s.rewriteJSONL(sessionKey, active) + if err != nil { + return err + } + + meta.Skip = 0 + meta.Count = len(active) + meta.UpdatedAt = time.Now() + + return s.writeMeta(sessionKey, meta) +} + +// rewriteJSONL atomically replaces the JSONL file with the given messages. +func (s *JSONLStore) rewriteJSONL( + sessionKey string, msgs []providers.Message, +) error { target := s.jsonlPath(sessionKey) tmp := target + ".tmp" @@ -337,7 +404,7 @@ func (s *JSONLStore) SetHistory( return fmt.Errorf("memory: create jsonl tmp: %w", err) } - for i, msg := range history { + for i, msg := range msgs { line, marshalErr := json.Marshal(msg) if marshalErr != nil { f.Close() @@ -364,21 +431,7 @@ func (s *JSONLStore) SetHistory( _ = os.Remove(tmp) return fmt.Errorf("memory: rename jsonl: %w", err) } - - // Reset metadata: skip=0, count=len(history). - meta, err := s.readMeta(sessionKey) - if err != nil { - return err - } - now := time.Now() - if meta.CreatedAt.IsZero() { - meta.CreatedAt = now - } - meta.Skip = 0 - meta.Count = len(history) - meta.UpdatedAt = now - - return s.writeMeta(sessionKey, meta) + return nil } func (s *JSONLStore) Close() error { diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index 57675504d..e3b53bfde 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -423,6 +423,127 @@ func TestColonInKey(t *testing.T) { } } +func TestCompact_RemovesSkippedMessages(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Write 10 messages, then truncate to keep last 3. + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "compact", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + err := store.TruncateHistory(ctx, "compact", 3) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + // Before compact: file still has 10 lines. + allOnDisk, err := readMessages(store.jsonlPath("compact")) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 10 { + t.Fatalf("before compact: expected 10 on disk, got %d", len(allOnDisk)) + } + + // Compact. + err = store.Compact(ctx, "compact") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // After compact: file should have only 3 lines. + allOnDisk, err = readMessages(store.jsonlPath("compact")) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 3 { + t.Fatalf("after compact: expected 3 on disk, got %d", len(allOnDisk)) + } + + // GetHistory should still return the same 3 messages. + history, err := store.GetHistory(ctx, "compact") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + if history[0].Content != "h" || history[2].Content != "j" { + t.Errorf("wrong content: %+v", history) + } +} + +func TestCompact_NoOpWhenNoSkip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "noop", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Compact without prior truncation — should be a no-op. + err := store.Compact(ctx, "noop") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + history, err := store.GetHistory(ctx, "noop") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 5 { + t.Errorf("expected 5, got %d", len(history)) + } +} + +func TestCompact_ThenAppend(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 8; i++ { + err := store.AddMessage(ctx, "cap", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "cap", 2) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + err = store.Compact(ctx, "cap") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // Append after compaction should work correctly. + err = store.AddMessage(ctx, "cap", "user", "new") + if err != nil { + t.Fatalf("AddMessage after compact: %v", err) + } + + history, err := store.GetHistory(ctx, "cap") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + // g, h (kept from truncation), new (appended after compaction). + if history[0].Content != "g" { + t.Errorf("first = %q, want 'g'", history[0].Content) + } + if history[2].Content != "new" { + t.Errorf("last = %q, want 'new'", history[2].Content) + } +} + func TestCrashRecovery_PartialLine(t *testing.T) { store := newTestStore(t) ctx := context.Background() diff --git a/pkg/memory/store.go b/pkg/memory/store.go index 6887ec26e..b6e11707d 100644 --- a/pkg/memory/store.go +++ b/pkg/memory/store.go @@ -33,6 +33,10 @@ type Store interface { // SetHistory replaces all messages in a session with the provided history. SetHistory(ctx context.Context, sessionKey string, history []providers.Message) error + // Compact reclaims storage by physically removing logically truncated + // data. Backends that do not accumulate dead data may return nil. + Compact(ctx context.Context, sessionKey string) error + // Close releases any resources held by the store. Close() error }