From e810331dd8d8875d440ffe2cff6d8f530a2b13b2 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 16:15:11 +0800 Subject: [PATCH] fix(memory): use SetHistory in migration for crash idempotency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MigrateFromJSON previously called AddFullMessage in a loop, then renamed the .json file to .json.migrated. If the process crashed after appending some messages but before the rename, a retry would re-read the same .json and append all messages again — duplicating whatever was written before the crash. Switch to SetHistory which atomically replaces the session contents. A retry after crash overwrites the partial data instead of appending. --- pkg/memory/migration.go | 21 +++++++------- pkg/memory/migration_test.go | 56 ++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/pkg/memory/migration.go b/pkg/memory/migration.go index 5b2f69ab3..c9d5176ab 100644 --- a/pkg/memory/migration.go +++ b/pkg/memory/migration.go @@ -74,19 +74,20 @@ func MigrateFromJSON( key = strings.TrimSuffix(name, ".json") } - for _, msg := range sess.Messages { - addErr := store.AddFullMessage(ctx, key, msg) - if addErr != nil { - return migrated, fmt.Errorf( - "memory: migrate %s: add message: %w", - name, addErr, - ) - } + // Use SetHistory (atomic replace) instead of per-message + // AddFullMessage. This makes migration idempotent: if the + // process crashes after writing messages but before the + // rename below, a retry replaces the partial data cleanly + // instead of duplicating messages. + if setErr := store.SetHistory(ctx, key, sess.Messages); setErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: set history: %w", + name, setErr, + ) } if sess.Summary != "" { - sumErr := store.SetSummary(ctx, key, sess.Summary) - if sumErr != nil { + if sumErr := store.SetSummary(ctx, key, sess.Summary); sumErr != nil { return migrated, fmt.Errorf( "memory: migrate %s: set summary: %w", name, sumErr, diff --git a/pkg/memory/migration_test.go b/pkg/memory/migration_test.go index bf16c32f8..3170758b7 100644 --- a/pkg/memory/migration_test.go +++ b/pkg/memory/migration_test.go @@ -314,6 +314,62 @@ func TestMigrateFromJSON_ColonInKey(t *testing.T) { } } +func TestMigrateFromJSON_RetryAfterCrash(t *testing.T) { + // Simulates a crash during migration: first run writes messages + // but doesn't rename the .json file. Second run must replace + // (not duplicate) the messages thanks to SetHistory semantics. + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "retry.json", jsonSession{ + Key: "retry", + Messages: []providers.Message{ + {Role: "user", Content: "one"}, + {Role: "assistant", Content: "two"}, + }, + Created: time.Now(), + Updated: time.Now(), + }) + + // First migration succeeds — writes messages and renames file. + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("first migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + // Simulate "crash before rename": restore the .json file. + src := filepath.Join(sessionsDir, "retry.json.migrated") + dst := filepath.Join(sessionsDir, "retry.json") + if renameErr := os.Rename(src, dst); renameErr != nil { + t.Fatalf("restore .json: %v", renameErr) + } + + // Second migration should re-import without duplicating messages. + count, err = MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("second migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + history, err := store.GetHistory(ctx, "retry") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + // Must be exactly 2 messages (not 4 from duplication). + if len(history) != 2 { + t.Fatalf("expected 2 messages (no duplicates), got %d", len(history)) + } + if history[0].Content != "one" || history[1].Content != "two" { + t.Errorf("unexpected messages: %+v", history) + } +} + func TestMigrateFromJSON_NonexistentDir(t *testing.T) { store := newTestStore(t) ctx := context.Background()