diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 222d91f02..efd4347c0 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -2,16 +2,19 @@ package memory import ( "bufio" + "bytes" "context" "encoding/json" "fmt" "hash/fnv" + "log" "os" "path/filepath" "strings" "sync" "time" + "github.com/sipeed/picoclaw/pkg/fileutil" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -83,6 +86,12 @@ func (s *JSONLStore) metaPath(key string) string { // sanitizeKey converts a session key to a safe filename component. // Mirrors pkg/session.sanitizeFilename so that migration paths match. +// +// Note: this is a lossy mapping — "telegram:123" and "telegram_123" +// both produce the same filename. This is an intentional tradeoff: +// keys with colons (e.g. from channels) are by far the common case, +// and a bidirectional encoding (like URL-encoding) would complicate +// file listings and debugging. func sanitizeKey(key string) string { return strings.ReplaceAll(key, ":", "_") } @@ -105,27 +114,14 @@ func (s *JSONLStore) readMeta(key string) (sessionMeta, error) { return meta, nil } -// writeMeta atomically writes the metadata file (temp + rename). +// writeMeta atomically writes the metadata file using the project's +// standard WriteFileAtomic (temp + fsync + rename). func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error { data, err := json.MarshalIndent(meta, "", " ") if err != nil { return fmt.Errorf("memory: encode meta: %w", err) } - - target := s.metaPath(key) - tmp := target + ".tmp" - - err = os.WriteFile(tmp, data, 0o644) - if err != nil { - return fmt.Errorf("memory: write meta tmp: %w", err) - } - - err = os.Rename(tmp, target) - if err != nil { - _ = os.Remove(tmp) - return fmt.Errorf("memory: rename meta: %w", err) - } - return nil + return fileutil.WriteFileAtomic(s.metaPath(key), data, 0o644) } // readMessages reads valid JSON lines from a .jsonl file, skipping @@ -158,9 +154,13 @@ func readMessages(path string, skip int) ([]providers.Message, error) { continue } var msg providers.Message - if json.Unmarshal(line, &msg) != nil { + if err := json.Unmarshal(line, &msg); err != nil { // Corrupt line — likely a partial write from a crash. - // Skip it; this is the standard JSONL recovery pattern. + // Log so operators know data was skipped, but don't + // fail the entire read; this is the standard JSONL + // recovery pattern. + log.Printf("memory: skipping corrupt line %d in %s: %v", + lineNum, filepath.Base(path), err) continue } msgs = append(msgs, msg) @@ -430,46 +430,21 @@ func (s *JSONLStore) Compact( return s.rewriteJSONL(sessionKey, active) } -// rewriteJSONL atomically replaces the JSONL file with the given messages. +// rewriteJSONL atomically replaces the JSONL file with the given messages +// using the project's standard WriteFileAtomic (temp + fsync + rename). func (s *JSONLStore) rewriteJSONL( sessionKey string, msgs []providers.Message, ) error { - target := s.jsonlPath(sessionKey) - tmp := target + ".tmp" - - f, err := os.Create(tmp) - if err != nil { - return fmt.Errorf("memory: create jsonl tmp: %w", err) - } - + var buf bytes.Buffer for i, msg := range msgs { - line, marshalErr := json.Marshal(msg) - if marshalErr != nil { - f.Close() - _ = os.Remove(tmp) - return fmt.Errorf("memory: marshal message %d: %w", i, marshalErr) - } - line = append(line, '\n') - _, writeErr := f.Write(line) - if writeErr != nil { - f.Close() - _ = os.Remove(tmp) - return fmt.Errorf("memory: write message %d: %w", i, writeErr) + line, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("memory: marshal message %d: %w", i, err) } + buf.Write(line) + buf.WriteByte('\n') } - - err = f.Close() - if err != nil { - _ = os.Remove(tmp) - return fmt.Errorf("memory: close jsonl tmp: %w", err) - } - - err = os.Rename(tmp, target) - if err != nil { - _ = os.Remove(tmp) - return fmt.Errorf("memory: rename jsonl: %w", err) - } - return nil + return fileutil.WriteFileAtomic(s.jsonlPath(sessionKey), buf.Bytes(), 0o644) } func (s *JSONLStore) Close() error {