package memory

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"hash/fnv"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/sipeed/picoclaw/pkg/providers"
)

const (
	// numLockShards is the size of the fixed mutex pool that serializes
	// per-session access. A fixed-size array (rather than a map) keeps
	// lock memory constant no matter how many sessions a long-running
	// daemon creates over its lifetime.
	numLockShards = 64

	// maxLineSize caps the length of a single JSON line in a .jsonl file.
	// Tool results (read_file, web search, etc.) can be large, so the cap
	// is generous. Scanner buffers start at 64 KB and grow only as needed,
	// never beyond this limit.
	maxLineSize = 10 * 1024 * 1024 // 10 MB
)

// sessionMeta is the per-session metadata persisted in a .meta.json file.
type sessionMeta struct {
	Key       string    `json:"key"`
	Summary   string    `json:"summary"`
	Skip      int       `json:"skip"`
	Count     int       `json:"count"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

// JSONLStore implements Store on top of append-only JSONL files.
//
// Every session is backed by two files:
//
//	{sanitized_key}.jsonl      — one JSON-encoded message per line, append-only
//	{sanitized_key}.meta.json  — session metadata (summary, logical truncation offset)
//
// TruncateHistory never removes lines from the JSONL file. It records a
// "skip" offset in the metadata file instead, and GetHistory ignores
// every line before that offset. Truncation therefore never rewrites
// message data, which is both fast and crash-safe.
type JSONLStore struct {
	dir   string
	locks [numLockShards]sync.Mutex
}

// NewJSONLStore returns a JSONL-backed store rooted at dir, creating the
// directory (and any missing parents) with mode 0o755 if necessary.
func NewJSONLStore(dir string) (*JSONLStore, error) {
	if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
		return nil, fmt.Errorf("memory: create directory: %w", mkErr)
	}
	store := &JSONLStore{dir: dir}
	return store, nil
}

// sessionLock returns a mutex for the given session key.
// Keys are mapped to a fixed pool of shards via FNV hash, so // memory usage is O(1) regardless of total session count. func (s *JSONLStore) sessionLock(key string) *sync.Mutex { h := fnv.New32a() h.Write([]byte(key)) return &s.locks[h.Sum32()%numLockShards] } func (s *JSONLStore) jsonlPath(key string) string { return filepath.Join(s.dir, sanitizeKey(key)+".jsonl") } func (s *JSONLStore) metaPath(key string) string { return filepath.Join(s.dir, sanitizeKey(key)+".meta.json") } // sanitizeKey converts a session key to a safe filename component. // Mirrors pkg/session.sanitizeFilename so that migration paths match. func sanitizeKey(key string) string { return strings.ReplaceAll(key, ":", "_") } // readMeta loads the metadata file for a session. // Returns a zero-value sessionMeta if the file does not exist. func (s *JSONLStore) readMeta(key string) (sessionMeta, error) { data, err := os.ReadFile(s.metaPath(key)) if os.IsNotExist(err) { return sessionMeta{Key: key}, nil } if err != nil { return sessionMeta{}, fmt.Errorf("memory: read meta: %w", err) } var meta sessionMeta err = json.Unmarshal(data, &meta) if err != nil { return sessionMeta{}, fmt.Errorf("memory: decode meta: %w", err) } return meta, nil } // writeMeta atomically writes the metadata file (temp + rename). func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error { data, err := json.MarshalIndent(meta, "", " ") if err != nil { return fmt.Errorf("memory: encode meta: %w", err) } target := s.metaPath(key) tmp := target + ".tmp" err = os.WriteFile(tmp, data, 0o644) if err != nil { return fmt.Errorf("memory: write meta tmp: %w", err) } err = os.Rename(tmp, target) if err != nil { _ = os.Remove(tmp) return fmt.Errorf("memory: rename meta: %w", err) } return nil } // readMessages reads valid JSON lines from a .jsonl file, skipping // the first `skip` lines without unmarshaling them. This avoids the // cost of json.Unmarshal on logically truncated messages. // Malformed trailing lines (e.g. 
from a crash) are silently skipped. func readMessages(path string, skip int) ([]providers.Message, error) { f, err := os.Open(path) if os.IsNotExist(err) { return []providers.Message{}, nil } if err != nil { return nil, fmt.Errorf("memory: open jsonl: %w", err) } defer f.Close() var msgs []providers.Message scanner := bufio.NewScanner(f) // Allow large lines for tool results (read_file, web search, etc.). scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) lineNum := 0 for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } lineNum++ if lineNum <= skip { continue } var msg providers.Message if json.Unmarshal(line, &msg) != nil { // Corrupt line — likely a partial write from a crash. // Skip it; this is the standard JSONL recovery pattern. continue } msgs = append(msgs, msg) } if scanner.Err() != nil { return nil, fmt.Errorf("memory: scan jsonl: %w", scanner.Err()) } if msgs == nil { msgs = []providers.Message{} } return msgs, nil } // countLines counts the total number of non-empty lines in a .jsonl file. // Used by TruncateHistory to reconcile a stale meta.Count without // the overhead of unmarshaling every message. func countLines(path string) (int, error) { f, err := os.Open(path) if os.IsNotExist(err) { return 0, nil } if err != nil { return 0, fmt.Errorf("memory: open jsonl: %w", err) } defer f.Close() n := 0 scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) for scanner.Scan() { if len(scanner.Bytes()) > 0 { n++ } } return n, scanner.Err() } func (s *JSONLStore) AddMessage( _ context.Context, sessionKey, role, content string, ) error { return s.addMsg(sessionKey, providers.Message{ Role: role, Content: content, }) } func (s *JSONLStore) AddFullMessage( _ context.Context, sessionKey string, msg providers.Message, ) error { return s.addMsg(sessionKey, msg) } // addMsg is the shared implementation for AddMessage and AddFullMessage. 
func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() // Append the message as a single JSON line. line, err := json.Marshal(msg) if err != nil { return fmt.Errorf("memory: marshal message: %w", err) } line = append(line, '\n') f, err := os.OpenFile( s.jsonlPath(sessionKey), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644, ) if err != nil { return fmt.Errorf("memory: open jsonl for append: %w", err) } _, writeErr := f.Write(line) closeErr := f.Close() if writeErr != nil { return fmt.Errorf("memory: append message: %w", writeErr) } if closeErr != nil { return fmt.Errorf("memory: close jsonl: %w", closeErr) } // Update metadata. meta, err := s.readMeta(sessionKey) if err != nil { return err } now := time.Now() if meta.Count == 0 && meta.CreatedAt.IsZero() { meta.CreatedAt = now } meta.Count++ meta.UpdatedAt = now return s.writeMeta(sessionKey, meta) } func (s *JSONLStore) GetHistory( _ context.Context, sessionKey string, ) ([]providers.Message, error) { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return nil, err } // Pass meta.Skip so readMessages skips those lines without // unmarshaling them — avoids wasted CPU on truncated messages. 
msgs, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) if err != nil { return nil, err } return msgs, nil } func (s *JSONLStore) GetSummary( _ context.Context, sessionKey string, ) (string, error) { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return "", err } return meta.Summary, nil } func (s *JSONLStore) SetSummary( _ context.Context, sessionKey, summary string, ) error { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return err } now := time.Now() if meta.CreatedAt.IsZero() { meta.CreatedAt = now } meta.Summary = summary meta.UpdatedAt = now return s.writeMeta(sessionKey, meta) } func (s *JSONLStore) TruncateHistory( _ context.Context, sessionKey string, keepLast int, ) error { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return err } // Always reconcile meta.Count with the actual line count on disk. // A crash between the JSONL append and the meta update in addMsg // leaves meta.Count stale (e.g. file has 101 lines but meta says // 100). Counting lines is cheap — no unmarshal, just a scan — and // TruncateHistory is not a hot path, so always re-count. 
n, countErr := countLines(s.jsonlPath(sessionKey)) if countErr != nil { return countErr } meta.Count = n if keepLast <= 0 { meta.Skip = meta.Count } else { effective := meta.Count - meta.Skip if keepLast < effective { meta.Skip = meta.Count - keepLast } } meta.UpdatedAt = time.Now() return s.writeMeta(sessionKey, meta) } func (s *JSONLStore) SetHistory( _ context.Context, sessionKey string, history []providers.Message, ) error { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return err } now := time.Now() if meta.CreatedAt.IsZero() { meta.CreatedAt = now } meta.Skip = 0 meta.Count = len(history) meta.UpdatedAt = now // Write meta BEFORE rewriting the JSONL file. If we crash between // the two writes, meta has Skip=0 and the old file is still intact, // so GetHistory reads from line 1 — returning "too many" messages // rather than losing data. The next SetHistory call corrects this. err = s.writeMeta(sessionKey, meta) if err != nil { return err } return s.rewriteJSONL(sessionKey, history) } // Compact physically rewrites the JSONL file, dropping all logically // skipped lines. This reclaims disk space that accumulates after // repeated TruncateHistory calls. // // It is safe to call at any time; if there is nothing to compact // (skip == 0) the method returns immediately. func (s *JSONLStore) Compact( _ context.Context, sessionKey string, ) error { l := s.sessionLock(sessionKey) l.Lock() defer l.Unlock() meta, err := s.readMeta(sessionKey) if err != nil { return err } if meta.Skip == 0 { return nil } // Read only the active messages, skipping truncated lines // without unmarshaling them. active, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) if err != nil { return err } // Write meta BEFORE rewriting the JSONL file. 
If the process // crashes between the two writes, meta has Skip=0 and the old // (uncompacted) file is still intact, so GetHistory reads from // line 1 — returning previously-truncated messages rather than // losing data. The next Compact or TruncateHistory corrects this. meta.Skip = 0 meta.Count = len(active) meta.UpdatedAt = time.Now() err = s.writeMeta(sessionKey, meta) if err != nil { return err } return s.rewriteJSONL(sessionKey, active) } // rewriteJSONL atomically replaces the JSONL file with the given messages. func (s *JSONLStore) rewriteJSONL( sessionKey string, msgs []providers.Message, ) error { target := s.jsonlPath(sessionKey) tmp := target + ".tmp" f, err := os.Create(tmp) if err != nil { return fmt.Errorf("memory: create jsonl tmp: %w", err) } for i, msg := range msgs { line, marshalErr := json.Marshal(msg) if marshalErr != nil { f.Close() _ = os.Remove(tmp) return fmt.Errorf("memory: marshal message %d: %w", i, marshalErr) } line = append(line, '\n') _, writeErr := f.Write(line) if writeErr != nil { f.Close() _ = os.Remove(tmp) return fmt.Errorf("memory: write message %d: %w", i, writeErr) } } err = f.Close() if err != nil { _ = os.Remove(tmp) return fmt.Errorf("memory: close jsonl tmp: %w", err) } err = os.Rename(tmp, target) if err != nil { _ = os.Remove(tmp) return fmt.Errorf("memory: rename jsonl: %w", err) } return nil } func (s *JSONLStore) Close() error { return nil }