Files
picoclaw/pkg/memory/jsonl.go
T
xiaoen 1f0b85280a fix(memory): always reconcile line count in TruncateHistory
A crash between the JSONL append and the meta update in addMsg can
leave meta.Count stale (e.g. file has 101 lines but meta says 100).
The previous code only reconciled when Count==0, so a nonzero stale
count was silently trusted, causing keepLast/skip to be calculated
against the wrong total.

Now TruncateHistory always counts the actual lines on disk. This is
cheap (scan without unmarshal) and TruncateHistory is not a hot path.
2026-02-26 16:12:34 +08:00

469 lines
11 KiB
Go

package memory
import (
"bufio"
"context"
"encoding/json"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/providers"
)
const (
	// numLockShards is the size of the fixed mutex pool used to
	// serialize access per session. A fixed-size array, unlike a map
	// keyed by session, keeps lock memory constant no matter how many
	// sessions a long-running process sees.
	numLockShards = 64

	// maxLineSize bounds the length of a single JSON line in a .jsonl
	// file. Tool results (read_file, web search, etc.) can be large, so
	// the cap is generous; scanners start with a 64 KB buffer and grow
	// toward this limit only when a line requires it.
	maxLineSize = 10 << 20 // 10 MiB
)
// sessionMeta is the per-session bookkeeping record persisted as JSON in
// the session's .meta.json file. Field order and tags define the on-disk
// layout and must stay stable.
type sessionMeta struct {
	// Key is the session key the record belongs to.
	Key string `json:"key"`
	// Summary is the text stored by SetSummary.
	Summary string `json:"summary"`
	// Skip is the number of leading non-empty JSONL lines that are
	// logically truncated and ignored when history is read.
	Skip int `json:"skip"`
	// Count is the recorded number of lines in the JSONL file.
	Count int `json:"count"`
	// CreatedAt is stamped the first time metadata is written.
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is refreshed by every operation that writes metadata.
	UpdatedAt time.Time `json:"updated_at"`
}
// JSONLStore is a Store implementation backed by JSONL files on disk.
//
// Every session owns two files inside the store directory:
//
//	{sanitized_key}.jsonl     — one JSON-encoded message per line
//	{sanitized_key}.meta.json — session metadata (summary, logical truncation offset)
//
// TruncateHistory never removes lines from the JSONL file. It records a
// "skip" offset in the metadata file, and GetHistory ignores the lines
// before that offset. Lines are physically dropped only when the file is
// rewritten by SetHistory or Compact.
type JSONLStore struct {
	// dir is the directory that holds every session's files.
	dir string
	// locks is the fixed pool of mutexes that sessionLock hands out;
	// a session key always maps to the same entry.
	locks [numLockShards]sync.Mutex
}
// NewJSONLStore returns a JSONL-backed store whose session files live
// under dir. The directory, including any missing parents, is created
// with mode 0o755; a failure to create it is returned wrapped.
func NewJSONLStore(dir string) (*JSONLStore, error) {
	if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
		return nil, fmt.Errorf("memory: create directory: %w", mkErr)
	}
	store := &JSONLStore{dir: dir}
	return store, nil
}
// sessionLock picks the mutex that guards the given session key.
//
// The key is hashed with 32-bit FNV-1a and reduced modulo the shard
// count, so the same key always yields the same mutex and lock memory
// stays constant. Different keys can land on the same shard, in which
// case their operations serialize against each other.
func (s *JSONLStore) sessionLock(key string) *sync.Mutex {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	shard := hasher.Sum32() % numLockShards
	return &s.locks[shard]
}
// jsonlPath returns the location of the session's message log: the
// sanitized key with a ".jsonl" suffix, inside the store directory.
func (s *JSONLStore) jsonlPath(key string) string {
	name := sanitizeKey(key) + ".jsonl"
	return filepath.Join(s.dir, name)
}
// metaPath returns the location of the session's metadata file: the
// sanitized key with a ".meta.json" suffix, inside the store directory.
func (s *JSONLStore) metaPath(key string) string {
	name := sanitizeKey(key) + ".meta.json"
	return filepath.Join(s.dir, name)
}
// sanitizeKey turns a session key into a filename component by replacing
// every ':' with '_'. It mirrors pkg/session.sanitizeFilename so that
// migration paths match. Only ':' is rewritten; every other byte,
// including path separators, passes through unchanged.
func sanitizeKey(key string) string {
	if !strings.Contains(key, ":") {
		return key
	}
	// ':' is a single ASCII byte and never occurs inside a multi-byte
	// UTF-8 sequence, so a byte-wise substitution is exact.
	buf := []byte(key)
	for i, c := range buf {
		if c == ':' {
			buf[i] = '_'
		}
	}
	return string(buf)
}
// readMeta loads and decodes the session's metadata file.
//
// A missing file is not an error: the result is then an otherwise empty
// sessionMeta carrying only the key. Any other read failure, or a file
// that does not decode as JSON, is returned as a wrapped error together
// with a zero-value sessionMeta.
func (s *JSONLStore) readMeta(key string) (sessionMeta, error) {
	raw, readErr := os.ReadFile(s.metaPath(key))
	switch {
	case os.IsNotExist(readErr):
		return sessionMeta{Key: key}, nil
	case readErr != nil:
		return sessionMeta{}, fmt.Errorf("memory: read meta: %w", readErr)
	}

	var decoded sessionMeta
	if decErr := json.Unmarshal(raw, &decoded); decErr != nil {
		return sessionMeta{}, fmt.Errorf("memory: decode meta: %w", decErr)
	}
	return decoded, nil
}
// writeMeta persists meta as the session's metadata file.
//
// The encoded JSON is first written to "<target>.tmp" and then renamed
// over the target, so readers see either the old or the new file, never
// a partially written one. No fsync is issued before the rename.
//
// If either the write or the rename fails, the temporary file is removed
// (best effort) so a failed update does not leave stray .tmp files in
// the store directory, and the error is returned wrapped.
func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error {
	data, err := json.MarshalIndent(meta, "", " ")
	if err != nil {
		return fmt.Errorf("memory: encode meta: %w", err)
	}

	target := s.metaPath(key)
	tmp := target + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		// WriteFile may have created or partially written the file
		// before failing (e.g. disk full); do not leave it behind.
		_ = os.Remove(tmp)
		return fmt.Errorf("memory: write meta tmp: %w", err)
	}
	if err := os.Rename(tmp, target); err != nil {
		_ = os.Remove(tmp)
		return fmt.Errorf("memory: rename meta: %w", err)
	}
	return nil
}
// readMessages decodes the messages stored in the .jsonl file at path.
//
// Empty lines are ignored entirely. The first `skip` non-empty lines are
// passed over without being unmarshaled, which avoids decoding logically
// truncated messages. Any remaining line that fails to decode (for
// example a partial write left by a crash) is silently dropped; such
// lines still count toward the skip offset.
//
// A missing file yields an empty, non-nil slice. The returned slice is
// never nil on success.
func readMessages(path string, skip int) ([]providers.Message, error) {
	file, openErr := os.Open(path)
	switch {
	case os.IsNotExist(openErr):
		return []providers.Message{}, nil
	case openErr != nil:
		return nil, fmt.Errorf("memory: open jsonl: %w", openErr)
	}
	defer file.Close()

	sc := bufio.NewScanner(file)
	// Tool results (read_file, web search, etc.) can produce very long
	// lines, so raise the scanner's token limit.
	sc.Buffer(make([]byte, 0, 64*1024), maxLineSize)

	out := []providers.Message{}
	seen := 0
	for sc.Scan() {
		raw := sc.Bytes()
		if len(raw) == 0 {
			continue
		}
		seen++
		if seen <= skip {
			continue
		}
		var decoded providers.Message
		if err := json.Unmarshal(raw, &decoded); err != nil {
			// Corrupt line, most likely a partial write from a crash.
			// Dropping it is the usual JSONL recovery strategy.
			continue
		}
		out = append(out, decoded)
	}
	if err := sc.Err(); err != nil {
		return nil, fmt.Errorf("memory: scan jsonl: %w", err)
	}
	return out, nil
}
// countLines returns the number of non-empty lines in the .jsonl file at
// path. TruncateHistory uses it to reconcile a stale meta.Count without
// paying for a json.Unmarshal of every message.
//
// A missing file counts as zero lines. Failures to open or scan the file
// (including a line longer than maxLineSize) are returned wrapped with
// the same "memory: ..." prefixes readMessages uses, and the count is 0
// whenever the error is non-nil.
func countLines(path string) (int, error) {
	f, err := os.Open(path)
	if os.IsNotExist(err) {
		return 0, nil
	}
	if err != nil {
		return 0, fmt.Errorf("memory: open jsonl: %w", err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)

	n := 0
	for scanner.Scan() {
		if len(scanner.Bytes()) > 0 {
			n++
		}
	}
	if err := scanner.Err(); err != nil {
		// A partial count is not meaningful; report the failure only.
		return 0, fmt.Errorf("memory: scan jsonl: %w", err)
	}
	return n, nil
}
// AddMessage appends a message consisting of just a role and content to
// the session's history. The context argument is not used.
func (s *JSONLStore) AddMessage(
	_ context.Context, sessionKey, role, content string,
) error {
	msg := providers.Message{Role: role, Content: content}
	return s.addMsg(sessionKey, msg)
}
// AddFullMessage appends msg, with all of its fields, to the session's
// history. The context argument is not used.
func (s *JSONLStore) AddFullMessage(
	_ context.Context, sessionKey string, msg providers.Message,
) error {
	err := s.addMsg(sessionKey, msg)
	return err
}
// addMsg does the work behind AddMessage and AddFullMessage.
//
// Under the session lock it appends msg to the JSONL file as one JSON
// line, then bumps the count and timestamps in the metadata file. The
// two steps are separate writes: if the process dies between them the
// JSONL file holds one more line than meta.Count records.
func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error {
	mu := s.sessionLock(sessionKey)
	mu.Lock()
	defer mu.Unlock()

	// Step 1: append the encoded message followed by a newline.
	encoded, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("memory: marshal message: %w", err)
	}
	encoded = append(encoded, '\n')

	file, err := os.OpenFile(
		s.jsonlPath(sessionKey),
		os.O_CREATE|os.O_WRONLY|os.O_APPEND,
		0o644,
	)
	if err != nil {
		return fmt.Errorf("memory: open jsonl for append: %w", err)
	}
	// Always close the file, then report the write error first.
	_, writeErr := file.Write(encoded)
	closeErr := file.Close()
	switch {
	case writeErr != nil:
		return fmt.Errorf("memory: append message: %w", writeErr)
	case closeErr != nil:
		return fmt.Errorf("memory: close jsonl: %w", closeErr)
	}

	// Step 2: record the new message in the metadata.
	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return err
	}
	stamp := time.Now()
	if meta.CreatedAt.IsZero() && meta.Count == 0 {
		meta.CreatedAt = stamp
	}
	meta.UpdatedAt = stamp
	meta.Count++
	return s.writeMeta(sessionKey, meta)
}
// GetHistory returns the session's active messages: those stored after
// the logical truncation offset recorded in the metadata. A session with
// no stored messages yields an empty slice. The context argument is not
// used.
func (s *JSONLStore) GetHistory(
	_ context.Context, sessionKey string,
) ([]providers.Message, error) {
	mu := s.sessionLock(sessionKey)
	mu.Lock()
	defer mu.Unlock()

	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return nil, err
	}

	// Handing meta.Skip to readMessages lets it pass over truncated
	// lines without spending CPU on unmarshaling them.
	history, err := readMessages(s.jsonlPath(sessionKey), meta.Skip)
	if err != nil {
		return nil, err
	}
	return history, nil
}
// GetSummary returns the summary stored in the session's metadata, or
// the empty string when the session has no metadata file yet. The
// context argument is not used.
func (s *JSONLStore) GetSummary(
	_ context.Context, sessionKey string,
) (string, error) {
	mu := s.sessionLock(sessionKey)
	mu.Lock()
	defer mu.Unlock()

	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return "", err
	}
	summary := meta.Summary
	return summary, nil
}
// SetSummary stores summary in the session's metadata and refreshes the
// update timestamp. The creation timestamp is set as well if it has
// never been recorded. The context argument is not used.
func (s *JSONLStore) SetSummary(
	_ context.Context, sessionKey, summary string,
) error {
	mu := s.sessionLock(sessionKey)
	mu.Lock()
	defer mu.Unlock()

	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return err
	}

	stamp := time.Now()
	meta.Summary = summary
	meta.UpdatedAt = stamp
	if meta.CreatedAt.IsZero() {
		meta.CreatedAt = stamp
	}
	return s.writeMeta(sessionKey, meta)
}
// TruncateHistory logically truncates the session so that at most the
// last keepLast messages remain visible to GetHistory. Nothing is
// removed from the JSONL file; only the skip offset in the metadata
// moves. A keepLast of zero or less hides every stored message. If
// keepLast is at least the number of currently visible messages the
// offset is left where it is. The context argument is not used.
//
// Both meta.Count and meta.Skip are reconciled against the file on disk
// before the new offset is computed:
//
//   - Count is always replaced by the actual number of non-empty lines.
//     A crash between the JSONL append and the meta update in addMsg
//     leaves it stale (e.g. 101 lines on disk, 100 recorded).
//   - Skip is clamped into [0, Count]. A crash between the JSONL rewrite
//     and the meta update in SetHistory or Compact can leave a Skip that
//     points past the end of the shorter file; left alone it would keep
//     hiding messages appended later.
func (s *JSONLStore) TruncateHistory(
	_ context.Context, sessionKey string, keepLast int,
) error {
	l := s.sessionLock(sessionKey)
	l.Lock()
	defer l.Unlock()

	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return err
	}

	// Counting lines is cheap (a scan with no unmarshal) and
	// TruncateHistory is not a hot path, so always re-count.
	total, err := countLines(s.jsonlPath(sessionKey))
	if err != nil {
		return err
	}
	meta.Count = total

	// Restore the invariant 0 <= Skip <= Count before doing arithmetic
	// with it.
	if meta.Skip > total {
		meta.Skip = total
	}
	if meta.Skip < 0 {
		meta.Skip = 0
	}

	switch visible := total - meta.Skip; {
	case keepLast <= 0:
		meta.Skip = total
	case keepLast < visible:
		meta.Skip = total - keepLast
	}

	meta.UpdatedAt = time.Now()
	return s.writeMeta(sessionKey, meta)
}
// SetHistory replaces the session's stored messages with history and
// resets the logical truncation offset, so that GetHistory afterwards
// returns exactly the given messages. The context argument is not used.
//
// The metadata is read before the JSONL file is touched. An unreadable
// or corrupt .meta.json therefore fails the call while the stored
// messages are still intact, rather than after the file has been
// replaced and a stale skip offset would apply to the new contents.
//
// The JSONL rewrite and the metadata write are still two separate
// steps; a crash between them leaves the new messages on disk with the
// old skip and count values.
func (s *JSONLStore) SetHistory(
	_ context.Context,
	sessionKey string,
	history []providers.Message,
) error {
	l := s.sessionLock(sessionKey)
	l.Lock()
	defer l.Unlock()

	// Validate that the metadata is loadable before mutating anything.
	meta, err := s.readMeta(sessionKey)
	if err != nil {
		return err
	}

	if err := s.rewriteJSONL(sessionKey, history); err != nil {
		return err
	}

	now := time.Now()
	if meta.CreatedAt.IsZero() {
		meta.CreatedAt = now
	}
	meta.Skip = 0
	meta.Count = len(history)
	meta.UpdatedAt = now
	return s.writeMeta(sessionKey, meta)
}
// Compact rewrites the session's JSONL file so that it contains only the
// active messages, reclaiming the disk space held by lines that earlier
// TruncateHistory calls logically skipped. Lines that fail to decode are
// dropped as well, because only successfully read messages are written
// back. Afterwards the skip offset is zero and the count equals the
// number of messages written.
//
// When the skip offset is already zero there is nothing to reclaim and
// the method returns without touching any file. The context argument is
// not used.
func (s *JSONLStore) Compact(
	_ context.Context, sessionKey string,
) error {
	mu := s.sessionLock(sessionKey)
	mu.Lock()
	defer mu.Unlock()

	meta, err := s.readMeta(sessionKey)
	switch {
	case err != nil:
		return err
	case meta.Skip == 0:
		return nil
	}

	// Load just the active tail; truncated lines are skipped without
	// being unmarshaled.
	live, err := readMessages(s.jsonlPath(sessionKey), meta.Skip)
	if err != nil {
		return err
	}
	if rewriteErr := s.rewriteJSONL(sessionKey, live); rewriteErr != nil {
		return rewriteErr
	}

	meta.Count = len(live)
	meta.Skip = 0
	meta.UpdatedAt = time.Now()
	return s.writeMeta(sessionKey, meta)
}
// rewriteJSONL replaces the session's JSONL file with one line per
// message in msgs.
//
// The new contents are written to "<target>.tmp" and renamed over the
// target only after every message has been written and the file closed,
// so the existing file is left untouched if anything fails. On any
// failure after the temporary file was created it is removed again
// (best effort) and the wrapped error is returned.
func (s *JSONLStore) rewriteJSONL(
	sessionKey string, msgs []providers.Message,
) error {
	dst := s.jsonlPath(sessionKey)
	scratch := dst + ".tmp"

	out, err := os.Create(scratch)
	if err != nil {
		return fmt.Errorf("memory: create jsonl tmp: %w", err)
	}

	// abandon closes the half-written scratch file and deletes it.
	abandon := func() {
		out.Close()
		_ = os.Remove(scratch)
	}

	for idx := range msgs {
		encoded, encErr := json.Marshal(msgs[idx])
		if encErr != nil {
			abandon()
			return fmt.Errorf("memory: marshal message %d: %w", idx, encErr)
		}
		encoded = append(encoded, '\n')
		if _, wErr := out.Write(encoded); wErr != nil {
			abandon()
			return fmt.Errorf("memory: write message %d: %w", idx, wErr)
		}
	}

	if err = out.Close(); err != nil {
		_ = os.Remove(scratch)
		return fmt.Errorf("memory: close jsonl tmp: %w", err)
	}
	if err = os.Rename(scratch, dst); err != nil {
		_ = os.Remove(scratch)
		return fmt.Errorf("memory: rename jsonl: %w", err)
	}
	return nil
}
// Close is a no-op that always returns nil. Every JSONLStore method
// opens and closes the files it needs within the call, so the store
// holds no file handles that would need releasing here.
func (s *JSONLStore) Close() error {
	return nil
}