mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
refactor(memory): use fileutil.WriteFileAtomic and log corrupt lines
- Replace manual temp+rename in writeMeta and rewriteJSONL with the project's standard fileutil.WriteFileAtomic. This adds fsync before rename, which is important for flash storage on embedded devices where power loss can leave zero-length files after an unsynced rename.
- Log a warning when readMessages skips a corrupt line, so operators can see that data was lost after a crash instead of having it silently dropped.
- Document the lossy sanitizeKey mapping (telegram:123 → telegram_123) as an intentional tradeoff.
This commit is contained in:
+27
-52
@@ -2,16 +2,19 @@ package memory
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/fileutil"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
@@ -83,6 +86,12 @@ func (s *JSONLStore) metaPath(key string) string {
|
||||
|
||||
// sanitizeKey converts a session key to a safe filename component.
|
||||
// Mirrors pkg/session.sanitizeFilename so that migration paths match.
|
||||
//
|
||||
// Note: this is a lossy mapping — "telegram:123" and "telegram_123"
|
||||
// both produce the same filename. This is an intentional tradeoff:
|
||||
// keys with colons (e.g. from channels) are by far the common case,
|
||||
// and a bidirectional encoding (like URL-encoding) would complicate
|
||||
// file listings and debugging.
|
||||
func sanitizeKey(key string) string {
|
||||
return strings.ReplaceAll(key, ":", "_")
|
||||
}
|
||||
@@ -105,27 +114,14 @@ func (s *JSONLStore) readMeta(key string) (sessionMeta, error) {
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
// writeMeta atomically writes the metadata file (temp + rename).
|
||||
// writeMeta atomically writes the metadata file using the project's
|
||||
// standard WriteFileAtomic (temp + fsync + rename).
|
||||
func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error {
|
||||
data, err := json.MarshalIndent(meta, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: encode meta: %w", err)
|
||||
}
|
||||
|
||||
target := s.metaPath(key)
|
||||
tmp := target + ".tmp"
|
||||
|
||||
err = os.WriteFile(tmp, data, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: write meta tmp: %w", err)
|
||||
}
|
||||
|
||||
err = os.Rename(tmp, target)
|
||||
if err != nil {
|
||||
_ = os.Remove(tmp)
|
||||
return fmt.Errorf("memory: rename meta: %w", err)
|
||||
}
|
||||
return nil
|
||||
return fileutil.WriteFileAtomic(s.metaPath(key), data, 0o644)
|
||||
}
|
||||
|
||||
// readMessages reads valid JSON lines from a .jsonl file, skipping
|
||||
@@ -158,9 +154,13 @@ func readMessages(path string, skip int) ([]providers.Message, error) {
|
||||
continue
|
||||
}
|
||||
var msg providers.Message
|
||||
if json.Unmarshal(line, &msg) != nil {
|
||||
if err := json.Unmarshal(line, &msg); err != nil {
|
||||
// Corrupt line — likely a partial write from a crash.
|
||||
// Skip it; this is the standard JSONL recovery pattern.
|
||||
// Log so operators know data was skipped, but don't
|
||||
// fail the entire read; this is the standard JSONL
|
||||
// recovery pattern.
|
||||
log.Printf("memory: skipping corrupt line %d in %s: %v",
|
||||
lineNum, filepath.Base(path), err)
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
@@ -430,46 +430,21 @@ func (s *JSONLStore) Compact(
|
||||
return s.rewriteJSONL(sessionKey, active)
|
||||
}
|
||||
|
||||
// rewriteJSONL atomically replaces the JSONL file with the given messages.
|
||||
// rewriteJSONL atomically replaces the JSONL file with the given messages
|
||||
// using the project's standard WriteFileAtomic (temp + fsync + rename).
|
||||
func (s *JSONLStore) rewriteJSONL(
|
||||
sessionKey string, msgs []providers.Message,
|
||||
) error {
|
||||
target := s.jsonlPath(sessionKey)
|
||||
tmp := target + ".tmp"
|
||||
|
||||
f, err := os.Create(tmp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: create jsonl tmp: %w", err)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
for i, msg := range msgs {
|
||||
line, marshalErr := json.Marshal(msg)
|
||||
if marshalErr != nil {
|
||||
f.Close()
|
||||
_ = os.Remove(tmp)
|
||||
return fmt.Errorf("memory: marshal message %d: %w", i, marshalErr)
|
||||
}
|
||||
line = append(line, '\n')
|
||||
_, writeErr := f.Write(line)
|
||||
if writeErr != nil {
|
||||
f.Close()
|
||||
_ = os.Remove(tmp)
|
||||
return fmt.Errorf("memory: write message %d: %w", i, writeErr)
|
||||
line, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: marshal message %d: %w", i, err)
|
||||
}
|
||||
buf.Write(line)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
_ = os.Remove(tmp)
|
||||
return fmt.Errorf("memory: close jsonl tmp: %w", err)
|
||||
}
|
||||
|
||||
err = os.Rename(tmp, target)
|
||||
if err != nil {
|
||||
_ = os.Remove(tmp)
|
||||
return fmt.Errorf("memory: rename jsonl: %w", err)
|
||||
}
|
||||
return nil
|
||||
return fileutil.WriteFileAtomic(s.jsonlPath(sessionKey), buf.Bytes(), 0o644)
|
||||
}
|
||||
|
||||
func (s *JSONLStore) Close() error {
|
||||
|
||||
Reference in New Issue
Block a user