mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
Merge pull request #732 from is-Xiaoen/feat/jsonl-memory-store
feat(memory): JSONL-backed session persistence
This commit is contained in:
@@ -0,0 +1,460 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/fileutil"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
const (
|
||||
// numLockShards is the fixed number of mutexes used to serialize
|
||||
// per-session access. Using a sharded array instead of a map keeps
|
||||
// memory bounded regardless of how many sessions are created over
|
||||
// the lifetime of the process — important for a long-running daemon.
|
||||
numLockShards = 64
|
||||
|
||||
// maxLineSize is the maximum size of a single JSON line in a .jsonl
|
||||
// file. Tool results (read_file, web search, etc.) can be large, so
|
||||
// we set a generous limit. The scanner starts at 64 KB and grows
|
||||
// only as needed up to this cap.
|
||||
maxLineSize = 10 * 1024 * 1024 // 10 MB
|
||||
)
|
||||
|
||||
// sessionMeta holds per-session metadata stored in a .meta.json file.
|
||||
type sessionMeta struct {
|
||||
Key string `json:"key"`
|
||||
Summary string `json:"summary"`
|
||||
Skip int `json:"skip"`
|
||||
Count int `json:"count"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
// JSONLStore implements Store using append-only JSONL files.
|
||||
//
|
||||
// Each session is stored as two files:
|
||||
//
|
||||
// {sanitized_key}.jsonl — one JSON-encoded message per line, append-only
|
||||
// {sanitized_key}.meta.json — session metadata (summary, logical truncation offset)
|
||||
//
|
||||
// Messages are never physically deleted from the JSONL file. Instead,
|
||||
// TruncateHistory records a "skip" offset in the metadata file and
|
||||
// GetHistory ignores lines before that offset. This keeps all writes
|
||||
// append-only, which is both fast and crash-safe.
|
||||
type JSONLStore struct {
|
||||
dir string
|
||||
locks [numLockShards]sync.Mutex
|
||||
}
|
||||
|
||||
// NewJSONLStore creates a new JSONL-backed store rooted at dir.
|
||||
func NewJSONLStore(dir string) (*JSONLStore, error) {
|
||||
err := os.MkdirAll(dir, 0o755)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("memory: create directory: %w", err)
|
||||
}
|
||||
return &JSONLStore{dir: dir}, nil
|
||||
}
|
||||
|
||||
// sessionLock returns a mutex for the given session key.
|
||||
// Keys are mapped to a fixed pool of shards via FNV hash, so
|
||||
// memory usage is O(1) regardless of total session count.
|
||||
func (s *JSONLStore) sessionLock(key string) *sync.Mutex {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
return &s.locks[h.Sum32()%numLockShards]
|
||||
}
|
||||
|
||||
func (s *JSONLStore) jsonlPath(key string) string {
|
||||
return filepath.Join(s.dir, sanitizeKey(key)+".jsonl")
|
||||
}
|
||||
|
||||
func (s *JSONLStore) metaPath(key string) string {
|
||||
return filepath.Join(s.dir, sanitizeKey(key)+".meta.json")
|
||||
}
|
||||
|
||||
// sanitizeKey converts a session key to a safe filename component.
|
||||
// Mirrors pkg/session.sanitizeFilename so that migration paths match.
|
||||
//
|
||||
// Note: this is a lossy mapping — "telegram:123" and "telegram_123"
|
||||
// both produce the same filename. This is an intentional tradeoff:
|
||||
// keys with colons (e.g. from channels) are by far the common case,
|
||||
// and a bidirectional encoding (like URL-encoding) would complicate
|
||||
// file listings and debugging.
|
||||
func sanitizeKey(key string) string {
|
||||
return strings.ReplaceAll(key, ":", "_")
|
||||
}
|
||||
|
||||
// readMeta loads the metadata file for a session.
|
||||
// Returns a zero-value sessionMeta if the file does not exist.
|
||||
func (s *JSONLStore) readMeta(key string) (sessionMeta, error) {
|
||||
data, err := os.ReadFile(s.metaPath(key))
|
||||
if os.IsNotExist(err) {
|
||||
return sessionMeta{Key: key}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return sessionMeta{}, fmt.Errorf("memory: read meta: %w", err)
|
||||
}
|
||||
var meta sessionMeta
|
||||
err = json.Unmarshal(data, &meta)
|
||||
if err != nil {
|
||||
return sessionMeta{}, fmt.Errorf("memory: decode meta: %w", err)
|
||||
}
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
// writeMeta atomically writes the metadata file using the project's
|
||||
// standard WriteFileAtomic (temp + fsync + rename).
|
||||
func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error {
|
||||
data, err := json.MarshalIndent(meta, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: encode meta: %w", err)
|
||||
}
|
||||
return fileutil.WriteFileAtomic(s.metaPath(key), data, 0o644)
|
||||
}
|
||||
|
||||
// readMessages reads valid JSON lines from a .jsonl file, skipping
|
||||
// the first `skip` lines without unmarshaling them. This avoids the
|
||||
// cost of json.Unmarshal on logically truncated messages.
|
||||
// Malformed trailing lines (e.g. from a crash) are silently skipped.
|
||||
func readMessages(path string, skip int) ([]providers.Message, error) {
|
||||
f, err := os.Open(path)
|
||||
if os.IsNotExist(err) {
|
||||
return []providers.Message{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("memory: open jsonl: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var msgs []providers.Message
|
||||
scanner := bufio.NewScanner(f)
|
||||
// Allow large lines for tool results (read_file, web search, etc.).
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
|
||||
lineNum := 0
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
lineNum++
|
||||
if lineNum <= skip {
|
||||
continue
|
||||
}
|
||||
var msg providers.Message
|
||||
if err := json.Unmarshal(line, &msg); err != nil {
|
||||
// Corrupt line — likely a partial write from a crash.
|
||||
// Log so operators know data was skipped, but don't
|
||||
// fail the entire read; this is the standard JSONL
|
||||
// recovery pattern.
|
||||
log.Printf("memory: skipping corrupt line %d in %s: %v",
|
||||
lineNum, filepath.Base(path), err)
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
if scanner.Err() != nil {
|
||||
return nil, fmt.Errorf("memory: scan jsonl: %w", scanner.Err())
|
||||
}
|
||||
|
||||
if msgs == nil {
|
||||
msgs = []providers.Message{}
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// countLines counts the total number of non-empty lines in a .jsonl file.
|
||||
// Used by TruncateHistory to reconcile a stale meta.Count without
|
||||
// the overhead of unmarshaling every message.
|
||||
func countLines(path string) (int, error) {
|
||||
f, err := os.Open(path)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("memory: open jsonl: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
n := 0
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
for scanner.Scan() {
|
||||
if len(scanner.Bytes()) > 0 {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n, scanner.Err()
|
||||
}
|
||||
|
||||
func (s *JSONLStore) AddMessage(
|
||||
_ context.Context, sessionKey, role, content string,
|
||||
) error {
|
||||
return s.addMsg(sessionKey, providers.Message{
|
||||
Role: role,
|
||||
Content: content,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *JSONLStore) AddFullMessage(
|
||||
_ context.Context, sessionKey string, msg providers.Message,
|
||||
) error {
|
||||
return s.addMsg(sessionKey, msg)
|
||||
}
|
||||
|
||||
// addMsg is the shared implementation for AddMessage and AddFullMessage.
|
||||
func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// Append the message as a single JSON line.
|
||||
line, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: marshal message: %w", err)
|
||||
}
|
||||
line = append(line, '\n')
|
||||
|
||||
f, err := os.OpenFile(
|
||||
s.jsonlPath(sessionKey),
|
||||
os.O_CREATE|os.O_WRONLY|os.O_APPEND,
|
||||
0o644,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: open jsonl for append: %w", err)
|
||||
}
|
||||
_, writeErr := f.Write(line)
|
||||
if writeErr != nil {
|
||||
f.Close()
|
||||
return fmt.Errorf("memory: append message: %w", writeErr)
|
||||
}
|
||||
// Flush to physical storage before closing. This matches the
|
||||
// durability guarantee of writeMeta and rewriteJSONL (which use
|
||||
// WriteFileAtomic with fsync). Without Sync, a power loss could
|
||||
// leave the append in the kernel page cache only — lost on reboot.
|
||||
if syncErr := f.Sync(); syncErr != nil {
|
||||
f.Close()
|
||||
return fmt.Errorf("memory: sync jsonl: %w", syncErr)
|
||||
}
|
||||
if closeErr := f.Close(); closeErr != nil {
|
||||
return fmt.Errorf("memory: close jsonl: %w", closeErr)
|
||||
}
|
||||
|
||||
// Update metadata.
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
if meta.Count == 0 && meta.CreatedAt.IsZero() {
|
||||
meta.CreatedAt = now
|
||||
}
|
||||
meta.Count++
|
||||
meta.UpdatedAt = now
|
||||
|
||||
return s.writeMeta(sessionKey, meta)
|
||||
}
|
||||
|
||||
func (s *JSONLStore) GetHistory(
|
||||
_ context.Context, sessionKey string,
|
||||
) ([]providers.Message, error) {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Pass meta.Skip so readMessages skips those lines without
|
||||
// unmarshaling them — avoids wasted CPU on truncated messages.
|
||||
msgs, err := readMessages(s.jsonlPath(sessionKey), meta.Skip)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
func (s *JSONLStore) GetSummary(
|
||||
_ context.Context, sessionKey string,
|
||||
) (string, error) {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return meta.Summary, nil
|
||||
}
|
||||
|
||||
func (s *JSONLStore) SetSummary(
|
||||
_ context.Context, sessionKey, summary string,
|
||||
) error {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
if meta.CreatedAt.IsZero() {
|
||||
meta.CreatedAt = now
|
||||
}
|
||||
meta.Summary = summary
|
||||
meta.UpdatedAt = now
|
||||
|
||||
return s.writeMeta(sessionKey, meta)
|
||||
}
|
||||
|
||||
func (s *JSONLStore) TruncateHistory(
|
||||
_ context.Context, sessionKey string, keepLast int,
|
||||
) error {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Always reconcile meta.Count with the actual line count on disk.
|
||||
// A crash between the JSONL append and the meta update in addMsg
|
||||
// leaves meta.Count stale (e.g. file has 101 lines but meta says
|
||||
// 100). Counting lines is cheap — no unmarshal, just a scan — and
|
||||
// TruncateHistory is not a hot path, so always re-count.
|
||||
n, countErr := countLines(s.jsonlPath(sessionKey))
|
||||
if countErr != nil {
|
||||
return countErr
|
||||
}
|
||||
meta.Count = n
|
||||
|
||||
if keepLast <= 0 {
|
||||
meta.Skip = meta.Count
|
||||
} else {
|
||||
effective := meta.Count - meta.Skip
|
||||
if keepLast < effective {
|
||||
meta.Skip = meta.Count - keepLast
|
||||
}
|
||||
}
|
||||
meta.UpdatedAt = time.Now()
|
||||
|
||||
return s.writeMeta(sessionKey, meta)
|
||||
}
|
||||
|
||||
func (s *JSONLStore) SetHistory(
|
||||
_ context.Context,
|
||||
sessionKey string,
|
||||
history []providers.Message,
|
||||
) error {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
if meta.CreatedAt.IsZero() {
|
||||
meta.CreatedAt = now
|
||||
}
|
||||
meta.Skip = 0
|
||||
meta.Count = len(history)
|
||||
meta.UpdatedAt = now
|
||||
|
||||
// Write meta BEFORE rewriting the JSONL file. If we crash between
|
||||
// the two writes, meta has Skip=0 and the old file is still intact,
|
||||
// so GetHistory reads from line 1 — returning "too many" messages
|
||||
// rather than losing data. The next SetHistory call corrects this.
|
||||
err = s.writeMeta(sessionKey, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.rewriteJSONL(sessionKey, history)
|
||||
}
|
||||
|
||||
// Compact physically rewrites the JSONL file, dropping all logically
|
||||
// skipped lines. This reclaims disk space that accumulates after
|
||||
// repeated TruncateHistory calls.
|
||||
//
|
||||
// It is safe to call at any time; if there is nothing to compact
|
||||
// (skip == 0) the method returns immediately.
|
||||
func (s *JSONLStore) Compact(
|
||||
_ context.Context, sessionKey string,
|
||||
) error {
|
||||
l := s.sessionLock(sessionKey)
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
meta, err := s.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if meta.Skip == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read only the active messages, skipping truncated lines
|
||||
// without unmarshaling them.
|
||||
active, err := readMessages(s.jsonlPath(sessionKey), meta.Skip)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write meta BEFORE rewriting the JSONL file. If the process
|
||||
// crashes between the two writes, meta has Skip=0 and the old
|
||||
// (uncompacted) file is still intact, so GetHistory reads from
|
||||
// line 1 — returning previously-truncated messages rather than
|
||||
// losing data. The next Compact or TruncateHistory corrects this.
|
||||
meta.Skip = 0
|
||||
meta.Count = len(active)
|
||||
meta.UpdatedAt = time.Now()
|
||||
|
||||
err = s.writeMeta(sessionKey, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.rewriteJSONL(sessionKey, active)
|
||||
}
|
||||
|
||||
// rewriteJSONL atomically replaces the JSONL file with the given messages
|
||||
// using the project's standard WriteFileAtomic (temp + fsync + rename).
|
||||
func (s *JSONLStore) rewriteJSONL(
|
||||
sessionKey string, msgs []providers.Message,
|
||||
) error {
|
||||
var buf bytes.Buffer
|
||||
for i, msg := range msgs {
|
||||
line, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memory: marshal message %d: %w", i, err)
|
||||
}
|
||||
buf.Write(line)
|
||||
buf.WriteByte('\n')
|
||||
}
|
||||
return fileutil.WriteFileAtomic(s.jsonlPath(sessionKey), buf.Bytes(), 0o644)
|
||||
}
|
||||
|
||||
func (s *JSONLStore) Close() error {
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,835 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
func newTestStore(t *testing.T) *JSONLStore {
|
||||
t.Helper()
|
||||
store, err := NewJSONLStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func TestNewJSONLStore_CreatesDirectory(t *testing.T) {
|
||||
dir := filepath.Join(t.TempDir(), "nested", "sessions")
|
||||
store, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
info, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("Stat: %v", err)
|
||||
}
|
||||
if !info.IsDir() {
|
||||
t.Errorf("expected directory, got file")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessage_BasicRoundtrip(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
err := store.AddMessage(ctx, "s1", "user", "hello")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
err = store.AddMessage(ctx, "s1", "assistant", "hi there")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "s1")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(history))
|
||||
}
|
||||
if history[0].Role != "user" || history[0].Content != "hello" {
|
||||
t.Errorf("msg[0] = %+v", history[0])
|
||||
}
|
||||
if history[1].Role != "assistant" || history[1].Content != "hi there" {
|
||||
t.Errorf("msg[1] = %+v", history[1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessage_AutoCreatesSession(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Adding a message to a non-existent session should work.
|
||||
err := store.AddMessage(ctx, "new-session", "user", "first message")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "new-session")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1 message, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddFullMessage_WithToolCalls(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
msg := providers.Message{
|
||||
Role: "assistant",
|
||||
Content: "Let me search that.",
|
||||
ToolCalls: []providers.ToolCall{
|
||||
{
|
||||
ID: "call_abc",
|
||||
Type: "function",
|
||||
Function: &providers.FunctionCall{
|
||||
Name: "web_search",
|
||||
Arguments: `{"q":"golang jsonl"}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := store.AddFullMessage(ctx, "tc", msg)
|
||||
if err != nil {
|
||||
t.Fatalf("AddFullMessage: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "tc")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1, got %d", len(history))
|
||||
}
|
||||
if len(history[0].ToolCalls) != 1 {
|
||||
t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls))
|
||||
}
|
||||
tc := history[0].ToolCalls[0]
|
||||
if tc.ID != "call_abc" {
|
||||
t.Errorf("tool call ID = %q", tc.ID)
|
||||
}
|
||||
if tc.Function == nil || tc.Function.Name != "web_search" {
|
||||
t.Errorf("tool call function = %+v", tc.Function)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddFullMessage_ToolCallID(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
msg := providers.Message{
|
||||
Role: "tool",
|
||||
Content: "search results here",
|
||||
ToolCallID: "call_abc",
|
||||
}
|
||||
|
||||
err := store.AddFullMessage(ctx, "tr", msg)
|
||||
if err != nil {
|
||||
t.Fatalf("AddFullMessage: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "tr")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1, got %d", len(history))
|
||||
}
|
||||
if history[0].ToolCallID != "call_abc" {
|
||||
t.Errorf("ToolCallID = %q", history[0].ToolCallID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetHistory_EmptySession(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
history, err := store.GetHistory(ctx, "nonexistent")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if history == nil {
|
||||
t.Fatal("expected non-nil empty slice")
|
||||
}
|
||||
if len(history) != 0 {
|
||||
t.Errorf("expected 0 messages, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetHistory_Ordering(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
err := store.AddMessage(
|
||||
ctx, "order",
|
||||
"user",
|
||||
string(rune('a'+i)),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage(%d): %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "order")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 5 {
|
||||
t.Fatalf("expected 5, got %d", len(history))
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
expected := string(rune('a' + i))
|
||||
if history[i].Content != expected {
|
||||
t.Errorf("msg[%d].Content = %q, want %q", i, history[i].Content, expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetSummary_GetSummary(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// No summary yet.
|
||||
summary, err := store.GetSummary(ctx, "s1")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary: %v", err)
|
||||
}
|
||||
if summary != "" {
|
||||
t.Errorf("expected empty, got %q", summary)
|
||||
}
|
||||
|
||||
// Set a summary.
|
||||
err = store.SetSummary(ctx, "s1", "talked about Go")
|
||||
if err != nil {
|
||||
t.Fatalf("SetSummary: %v", err)
|
||||
}
|
||||
|
||||
summary, err = store.GetSummary(ctx, "s1")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary: %v", err)
|
||||
}
|
||||
if summary != "talked about Go" {
|
||||
t.Errorf("summary = %q", summary)
|
||||
}
|
||||
|
||||
// Update summary.
|
||||
err = store.SetSummary(ctx, "s1", "updated summary")
|
||||
if err != nil {
|
||||
t.Fatalf("SetSummary: %v", err)
|
||||
}
|
||||
|
||||
summary, err = store.GetSummary(ctx, "s1")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary: %v", err)
|
||||
}
|
||||
if summary != "updated summary" {
|
||||
t.Errorf("summary = %q", summary)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateHistory_KeepLast(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := store.AddMessage(
|
||||
ctx, "trunc",
|
||||
"user",
|
||||
string(rune('a'+i)),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := store.TruncateHistory(ctx, "trunc", 4)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "trunc")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 4 {
|
||||
t.Fatalf("expected 4, got %d", len(history))
|
||||
}
|
||||
// Should be the last 4: g, h, i, j
|
||||
if history[0].Content != "g" {
|
||||
t.Errorf("first kept = %q, want 'g'", history[0].Content)
|
||||
}
|
||||
if history[3].Content != "j" {
|
||||
t.Errorf("last kept = %q, want 'j'", history[3].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateHistory_KeepZero(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
err := store.AddMessage(ctx, "empty", "user", "msg")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := store.TruncateHistory(ctx, "empty", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "empty")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 0 {
|
||||
t.Errorf("expected 0, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateHistory_KeepMoreThanExists(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
err := store.AddMessage(ctx, "few", "user", "msg")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Keep 100, but only 3 exist — should keep all.
|
||||
err := store.TruncateHistory(ctx, "few", 100)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "few")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 3 {
|
||||
t.Errorf("expected 3, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetHistory_ReplacesAll(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Add some initial messages.
|
||||
for i := 0; i < 5; i++ {
|
||||
err := store.AddMessage(ctx, "replace", "user", "old")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Replace with new history.
|
||||
newHistory := []providers.Message{
|
||||
{Role: "user", Content: "new1"},
|
||||
{Role: "assistant", Content: "new2"},
|
||||
}
|
||||
err := store.SetHistory(ctx, "replace", newHistory)
|
||||
if err != nil {
|
||||
t.Fatalf("SetHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "replace")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "new1" || history[1].Content != "new2" {
|
||||
t.Errorf("history = %+v", history)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetHistory_ResetsSkip(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Add messages and truncate.
|
||||
for i := 0; i < 10; i++ {
|
||||
err := store.AddMessage(ctx, "skip-reset", "user", "old")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
err := store.TruncateHistory(ctx, "skip-reset", 3)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
// SetHistory should reset skip to 0.
|
||||
newHistory := []providers.Message{
|
||||
{Role: "user", Content: "fresh"},
|
||||
}
|
||||
err = store.SetHistory(ctx, "skip-reset", newHistory)
|
||||
if err != nil {
|
||||
t.Fatalf("SetHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "skip-reset")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "fresh" {
|
||||
t.Errorf("content = %q", history[0].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestColonInKey(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
err := store.AddMessage(ctx, "telegram:123", "user", "hi")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "telegram:123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1, got %d", len(history))
|
||||
}
|
||||
|
||||
// Verify the file is named with underscore.
|
||||
jsonlFile := filepath.Join(store.dir, "telegram_123.jsonl")
|
||||
if _, statErr := os.Stat(jsonlFile); statErr != nil {
|
||||
t.Errorf("expected file %s to exist: %v", jsonlFile, statErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompact_RemovesSkippedMessages(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Write 10 messages, then truncate to keep last 3.
|
||||
for i := 0; i < 10; i++ {
|
||||
err := store.AddMessage(ctx, "compact", "user", string(rune('a'+i)))
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
err := store.TruncateHistory(ctx, "compact", 3)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
// Before compact: file still has 10 lines.
|
||||
allOnDisk, err := readMessages(store.jsonlPath("compact"), 0)
|
||||
if err != nil {
|
||||
t.Fatalf("readMessages: %v", err)
|
||||
}
|
||||
if len(allOnDisk) != 10 {
|
||||
t.Fatalf("before compact: expected 10 on disk, got %d", len(allOnDisk))
|
||||
}
|
||||
|
||||
// Compact.
|
||||
err = store.Compact(ctx, "compact")
|
||||
if err != nil {
|
||||
t.Fatalf("Compact: %v", err)
|
||||
}
|
||||
|
||||
// After compact: file should have only 3 lines.
|
||||
allOnDisk, err = readMessages(store.jsonlPath("compact"), 0)
|
||||
if err != nil {
|
||||
t.Fatalf("readMessages: %v", err)
|
||||
}
|
||||
if len(allOnDisk) != 3 {
|
||||
t.Fatalf("after compact: expected 3 on disk, got %d", len(allOnDisk))
|
||||
}
|
||||
|
||||
// GetHistory should still return the same 3 messages.
|
||||
history, err := store.GetHistory(ctx, "compact")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 3 {
|
||||
t.Fatalf("expected 3, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "h" || history[2].Content != "j" {
|
||||
t.Errorf("wrong content: %+v", history)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompact_NoOpWhenNoSkip(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
err := store.AddMessage(ctx, "noop", "user", "msg")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Compact without prior truncation — should be a no-op.
|
||||
err := store.Compact(ctx, "noop")
|
||||
if err != nil {
|
||||
t.Fatalf("Compact: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "noop")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 5 {
|
||||
t.Errorf("expected 5, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompact_ThenAppend(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 8; i++ {
|
||||
err := store.AddMessage(ctx, "cap", "user", string(rune('a'+i)))
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := store.TruncateHistory(ctx, "cap", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
err = store.Compact(ctx, "cap")
|
||||
if err != nil {
|
||||
t.Fatalf("Compact: %v", err)
|
||||
}
|
||||
|
||||
// Append after compaction should work correctly.
|
||||
err = store.AddMessage(ctx, "cap", "user", "new")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage after compact: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "cap")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 3 {
|
||||
t.Fatalf("expected 3, got %d", len(history))
|
||||
}
|
||||
// g, h (kept from truncation), new (appended after compaction).
|
||||
if history[0].Content != "g" {
|
||||
t.Errorf("first = %q, want 'g'", history[0].Content)
|
||||
}
|
||||
if history[2].Content != "new" {
|
||||
t.Errorf("last = %q, want 'new'", history[2].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateHistory_StaleMetaCount(t *testing.T) {
|
||||
// Simulates a crash between JSONL append and meta update in addMsg:
|
||||
// file has N+1 lines but meta.Count is still N. TruncateHistory must
|
||||
// reconcile with the real line count so that keepLast is accurate.
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Write 10 messages normally (meta.Count = 10).
|
||||
for i := 0; i < 10; i++ {
|
||||
err := store.AddMessage(ctx, "stale", "user", string(rune('a'+i)))
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Simulate crash: append a line to JSONL but do NOT update meta.
|
||||
// This leaves meta.Count = 10 while the file has 11 lines.
|
||||
jsonlPath := store.jsonlPath("stale")
|
||||
f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644)
|
||||
if err != nil {
|
||||
t.Fatalf("open for append: %v", err)
|
||||
}
|
||||
_, err = f.WriteString(`{"role":"user","content":"orphan"}` + "\n")
|
||||
if err != nil {
|
||||
t.Fatalf("write orphan: %v", err)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
// TruncateHistory(keepLast=4) should keep the last 4 of 11 lines,
|
||||
// not the last 4 of 10.
|
||||
err = store.TruncateHistory(ctx, "stale", 4)
|
||||
if err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "stale")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 4 {
|
||||
t.Fatalf("expected 4, got %d", len(history))
|
||||
}
|
||||
// Last 4 of [a,b,c,d,e,f,g,h,i,j,orphan] = [h,i,j,orphan]
|
||||
if history[0].Content != "h" {
|
||||
t.Errorf("first kept = %q, want 'h'", history[0].Content)
|
||||
}
|
||||
if history[3].Content != "orphan" {
|
||||
t.Errorf("last kept = %q, want 'orphan'", history[3].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrashRecovery_PartialLine(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Write a valid message first.
|
||||
err := store.AddMessage(ctx, "crash", "user", "valid")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
|
||||
// Simulate a crash by appending a partial JSON line directly.
|
||||
jsonlPath := store.jsonlPath("crash")
|
||||
f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644)
|
||||
if err != nil {
|
||||
t.Fatalf("open for append: %v", err)
|
||||
}
|
||||
_, err = f.WriteString(`{"role":"user","content":"incomple`)
|
||||
if err != nil {
|
||||
t.Fatalf("write partial: %v", err)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
// GetHistory should return only the valid message.
|
||||
history, err := store.GetHistory(ctx, "crash")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1 valid message, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "valid" {
|
||||
t.Errorf("content = %q", history[0].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPersistence_AcrossInstances(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
ctx := context.Background()
|
||||
|
||||
// Write with first instance.
|
||||
store1, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
err = store1.AddMessage(ctx, "persist", "user", "remember me")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
err = store1.SetSummary(ctx, "persist", "a test session")
|
||||
if err != nil {
|
||||
t.Fatalf("SetSummary: %v", err)
|
||||
}
|
||||
store1.Close()
|
||||
|
||||
// Read with second instance.
|
||||
store2, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
defer store2.Close()
|
||||
|
||||
history, err := store2.GetHistory(ctx, "persist")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 || history[0].Content != "remember me" {
|
||||
t.Errorf("history = %+v", history)
|
||||
}
|
||||
|
||||
summary, err := store2.GetSummary(ctx, "persist")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary: %v", err)
|
||||
}
|
||||
if summary != "a test session" {
|
||||
t.Errorf("summary = %q", summary)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrent_AddAndRead(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
const goroutines = 10
|
||||
const msgsPerGoroutine = 20
|
||||
|
||||
// Concurrent writes.
|
||||
for g := 0; g < goroutines; g++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < msgsPerGoroutine; i++ {
|
||||
_ = store.AddMessage(ctx, "concurrent", "user", "msg")
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
history, err := store.GetHistory(ctx, "concurrent")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
expected := goroutines * msgsPerGoroutine
|
||||
if len(history) != expected {
|
||||
t.Errorf("expected %d messages, got %d", expected, len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrent_SummarizeRace(t *testing.T) {
|
||||
// Simulates the #704 race: one goroutine adds messages while
|
||||
// another truncates + sets summary — like summarizeSession().
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Seed with some messages.
|
||||
for i := 0; i < 20; i++ {
|
||||
err := store.AddMessage(ctx, "race", "user", "seed")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Writer goroutine (main agent loop).
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 50; i++ {
|
||||
_ = store.AddMessage(ctx, "race", "user", "new")
|
||||
}
|
||||
}()
|
||||
|
||||
// Summarizer goroutine (background task).
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 10; i++ {
|
||||
_ = store.SetSummary(ctx, "race", "summary")
|
||||
_ = store.TruncateHistory(ctx, "race", 5)
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Verify the store is still in a consistent state.
|
||||
_, err := store.GetHistory(ctx, "race")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory after race: %v", err)
|
||||
}
|
||||
_, err = store.GetSummary(ctx, "race")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary after race: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultipleSessions_Isolation(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
err := store.AddMessage(ctx, "s1", "user", "msg for s1")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
err = store.AddMessage(ctx, "s2", "user", "msg for s2")
|
||||
if err != nil {
|
||||
t.Fatalf("AddMessage: %v", err)
|
||||
}
|
||||
|
||||
h1, err := store.GetHistory(ctx, "s1")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory s1: %v", err)
|
||||
}
|
||||
h2, err := store.GetHistory(ctx, "s2")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory s2: %v", err)
|
||||
}
|
||||
|
||||
if len(h1) != 1 || h1[0].Content != "msg for s1" {
|
||||
t.Errorf("s1 history = %+v", h1)
|
||||
}
|
||||
if len(h2) != 1 || h2[0].Content != "msg for s2" {
|
||||
t.Errorf("s2 history = %+v", h2)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAddMessage(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
store, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
b.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
ctx := context.Background()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = store.AddMessage(ctx, "bench", "user", "benchmark message content")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetHistory_100(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
store, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
b.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_ = store.AddMessage(ctx, "bench", "user", "message content")
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = store.GetHistory(ctx, "bench")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetHistory_1000(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
store, err := NewJSONLStore(dir)
|
||||
if err != nil {
|
||||
b.Fatalf("NewJSONLStore: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
_ = store.AddMessage(ctx, "bench", "user", "message content")
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = store.GetHistory(ctx, "bench")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
// jsonSession mirrors pkg/session.Session for migration purposes.
|
||||
type jsonSession struct {
|
||||
Key string `json:"key"`
|
||||
Messages []providers.Message `json:"messages"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
Created time.Time `json:"created"`
|
||||
Updated time.Time `json:"updated"`
|
||||
}
|
||||
|
||||
// MigrateFromJSON reads legacy sessions/*.json files from sessionsDir,
|
||||
// writes them into the Store, and renames each migrated file to
|
||||
// .json.migrated as a backup. Returns the number of sessions migrated.
|
||||
//
|
||||
// Files that fail to parse are logged and skipped. Already-migrated
|
||||
// files (.json.migrated) are ignored, making the function idempotent.
|
||||
func MigrateFromJSON(
|
||||
ctx context.Context, sessionsDir string, store Store,
|
||||
) (int, error) {
|
||||
entries, err := os.ReadDir(sessionsDir)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("memory: read sessions dir: %w", err)
|
||||
}
|
||||
|
||||
migrated := 0
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
if !strings.HasSuffix(name, ".json") {
|
||||
continue
|
||||
}
|
||||
// Skip already-migrated files.
|
||||
if strings.HasSuffix(name, ".migrated") {
|
||||
continue
|
||||
}
|
||||
|
||||
srcPath := filepath.Join(sessionsDir, name)
|
||||
|
||||
data, readErr := os.ReadFile(srcPath)
|
||||
if readErr != nil {
|
||||
log.Printf("memory: migrate: skip %s: %v", name, readErr)
|
||||
continue
|
||||
}
|
||||
|
||||
var sess jsonSession
|
||||
if parseErr := json.Unmarshal(data, &sess); parseErr != nil {
|
||||
log.Printf("memory: migrate: skip %s: %v", name, parseErr)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use the key from the JSON content, not the filename.
|
||||
// Filenames are sanitized (":" → "_") but keys are not.
|
||||
key := sess.Key
|
||||
if key == "" {
|
||||
key = strings.TrimSuffix(name, ".json")
|
||||
}
|
||||
|
||||
// Use SetHistory (atomic replace) instead of per-message
|
||||
// AddFullMessage. This makes migration idempotent: if the
|
||||
// process crashes after writing messages but before the
|
||||
// rename below, a retry replaces the partial data cleanly
|
||||
// instead of duplicating messages.
|
||||
if setErr := store.SetHistory(ctx, key, sess.Messages); setErr != nil {
|
||||
return migrated, fmt.Errorf(
|
||||
"memory: migrate %s: set history: %w",
|
||||
name, setErr,
|
||||
)
|
||||
}
|
||||
|
||||
if sess.Summary != "" {
|
||||
if sumErr := store.SetSummary(ctx, key, sess.Summary); sumErr != nil {
|
||||
return migrated, fmt.Errorf(
|
||||
"memory: migrate %s: set summary: %w",
|
||||
name, sumErr,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Rename to .migrated as backup (not delete).
|
||||
renameErr := os.Rename(srcPath, srcPath+".migrated")
|
||||
if renameErr != nil {
|
||||
log.Printf("memory: migrate: rename %s: %v", name, renameErr)
|
||||
}
|
||||
|
||||
migrated++
|
||||
}
|
||||
|
||||
return migrated, nil
|
||||
}
|
||||
@@ -0,0 +1,384 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
func writeJSONSession(
|
||||
t *testing.T, dir string, filename string, sess jsonSession,
|
||||
) {
|
||||
t.Helper()
|
||||
data, err := json.MarshalIndent(sess, "", " ")
|
||||
if err != nil {
|
||||
t.Fatalf("marshal session: %v", err)
|
||||
}
|
||||
err = os.WriteFile(filepath.Join(dir, filename), data, 0o644)
|
||||
if err != nil {
|
||||
t.Fatalf("write session file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_Basic(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
writeJSONSession(t, sessionsDir, "test.json", jsonSession{
|
||||
Key: "test",
|
||||
Messages: []providers.Message{
|
||||
{Role: "user", Content: "hello"},
|
||||
{Role: "assistant", Content: "hi"},
|
||||
},
|
||||
Summary: "A greeting.",
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("expected 1 migrated, got %d", count)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "hello" || history[1].Content != "hi" {
|
||||
t.Errorf("unexpected messages: %+v", history)
|
||||
}
|
||||
|
||||
summary, err := store.GetSummary(ctx, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("GetSummary: %v", err)
|
||||
}
|
||||
if summary != "A greeting." {
|
||||
t.Errorf("summary = %q", summary)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_WithToolCalls(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
writeJSONSession(t, sessionsDir, "tools.json", jsonSession{
|
||||
Key: "tools",
|
||||
Messages: []providers.Message{
|
||||
{
|
||||
Role: "assistant",
|
||||
Content: "Searching...",
|
||||
ToolCalls: []providers.ToolCall{
|
||||
{
|
||||
ID: "call_1",
|
||||
Type: "function",
|
||||
Function: &providers.FunctionCall{
|
||||
Name: "web_search",
|
||||
Arguments: `{"q":"test"}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Role: "tool",
|
||||
Content: "result",
|
||||
ToolCallID: "call_1",
|
||||
},
|
||||
},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("expected 1, got %d", count)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "tools")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(history))
|
||||
}
|
||||
if len(history[0].ToolCalls) != 1 {
|
||||
t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls))
|
||||
}
|
||||
if history[0].ToolCalls[0].Function.Name != "web_search" {
|
||||
t.Errorf("function = %q", history[0].ToolCalls[0].Function.Name)
|
||||
}
|
||||
if history[1].ToolCallID != "call_1" {
|
||||
t.Errorf("ToolCallID = %q", history[1].ToolCallID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_MultipleFiles(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
key := string(rune('a' + i))
|
||||
writeJSONSession(t, sessionsDir, key+".json", jsonSession{
|
||||
Key: key,
|
||||
Messages: []providers.Message{{Role: "user", Content: "msg " + key}},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 3 {
|
||||
t.Errorf("expected 3, got %d", count)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
key := string(rune('a' + i))
|
||||
history, histErr := store.GetHistory(ctx, key)
|
||||
if histErr != nil {
|
||||
t.Fatalf("GetHistory(%q): %v", key, histErr)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Errorf("session %q: expected 1 msg, got %d", key, len(history))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_InvalidJSON(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// One valid, one invalid.
|
||||
writeJSONSession(t, sessionsDir, "good.json", jsonSession{
|
||||
Key: "good",
|
||||
Messages: []providers.Message{{Role: "user", Content: "ok"}},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
err := os.WriteFile(
|
||||
filepath.Join(sessionsDir, "bad.json"),
|
||||
[]byte("{invalid json"),
|
||||
0o644,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("write bad file: %v", err)
|
||||
}
|
||||
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("expected 1 (bad file skipped), got %d", count)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "good")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Errorf("expected 1 message, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_RenamesFiles(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
writeJSONSession(t, sessionsDir, "rename.json", jsonSession{
|
||||
Key: "rename",
|
||||
Messages: []providers.Message{{Role: "user", Content: "hi"}},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
_, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
|
||||
// Original .json should not exist.
|
||||
_, statErr := os.Stat(filepath.Join(sessionsDir, "rename.json"))
|
||||
if !os.IsNotExist(statErr) {
|
||||
t.Error("rename.json should have been renamed")
|
||||
}
|
||||
// .json.migrated should exist.
|
||||
_, statErr = os.Stat(
|
||||
filepath.Join(sessionsDir, "rename.json.migrated"),
|
||||
)
|
||||
if statErr != nil {
|
||||
t.Errorf("rename.json.migrated should exist: %v", statErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_Idempotent(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
writeJSONSession(t, sessionsDir, "idem.json", jsonSession{
|
||||
Key: "idem",
|
||||
Messages: []providers.Message{{Role: "user", Content: "once"}},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
count1, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("first migration: %v", err)
|
||||
}
|
||||
if count1 != 1 {
|
||||
t.Errorf("first run: expected 1, got %d", count1)
|
||||
}
|
||||
|
||||
// Second run should find only .migrated files, skip them.
|
||||
count2, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("second migration: %v", err)
|
||||
}
|
||||
if count2 != 0 {
|
||||
t.Errorf("second run: expected 0, got %d", count2)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "idem")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Errorf("expected 1 message, got %d", len(history))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_ColonInKey(t *testing.T) {
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// File is named telegram_123 (sanitized), but the key inside is telegram:123.
|
||||
writeJSONSession(t, sessionsDir, "telegram_123.json", jsonSession{
|
||||
Key: "telegram:123",
|
||||
Messages: []providers.Message{{Role: "user", Content: "from telegram"}},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("expected 1, got %d", count)
|
||||
}
|
||||
|
||||
// Accessible via the original key "telegram:123".
|
||||
history, err := store.GetHistory(ctx, "telegram:123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 1 {
|
||||
t.Fatalf("expected 1 message, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "from telegram" {
|
||||
t.Errorf("content = %q", history[0].Content)
|
||||
}
|
||||
|
||||
// In the file-based store, "telegram:123" and "telegram_123" both
|
||||
// sanitize to the same filename, so they share storage. This is
|
||||
// expected — the colon-to-underscore mapping is a one-way function.
|
||||
history2, err := store.GetHistory(ctx, "telegram_123")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history2) != 1 {
|
||||
t.Errorf("expected 1 (same file), got %d", len(history2))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_RetryAfterCrash(t *testing.T) {
|
||||
// Simulates a crash during migration: first run writes messages
|
||||
// but doesn't rename the .json file. Second run must replace
|
||||
// (not duplicate) the messages thanks to SetHistory semantics.
|
||||
sessionsDir := t.TempDir()
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
writeJSONSession(t, sessionsDir, "retry.json", jsonSession{
|
||||
Key: "retry",
|
||||
Messages: []providers.Message{
|
||||
{Role: "user", Content: "one"},
|
||||
{Role: "assistant", Content: "two"},
|
||||
},
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
})
|
||||
|
||||
// First migration succeeds — writes messages and renames file.
|
||||
count, err := MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("first migration: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Fatalf("expected 1, got %d", count)
|
||||
}
|
||||
|
||||
// Simulate "crash before rename": restore the .json file.
|
||||
src := filepath.Join(sessionsDir, "retry.json.migrated")
|
||||
dst := filepath.Join(sessionsDir, "retry.json")
|
||||
if renameErr := os.Rename(src, dst); renameErr != nil {
|
||||
t.Fatalf("restore .json: %v", renameErr)
|
||||
}
|
||||
|
||||
// Second migration should re-import without duplicating messages.
|
||||
count, err = MigrateFromJSON(ctx, sessionsDir, store)
|
||||
if err != nil {
|
||||
t.Fatalf("second migration: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Fatalf("expected 1, got %d", count)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, "retry")
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
// Must be exactly 2 messages (not 4 from duplication).
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2 messages (no duplicates), got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "one" || history[1].Content != "two" {
|
||||
t.Errorf("unexpected messages: %+v", history)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromJSON_NonexistentDir(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
count, err := MigrateFromJSON(ctx, "/nonexistent/path", store)
|
||||
if err != nil {
|
||||
t.Fatalf("MigrateFromJSON: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Errorf("expected 0, got %d", count)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
|
||||
// Store defines an interface for persistent session storage.
|
||||
// Each method is an atomic operation — there is no separate Save() call.
|
||||
type Store interface {
|
||||
// AddMessage appends a simple text message to a session.
|
||||
AddMessage(ctx context.Context, sessionKey, role, content string) error
|
||||
|
||||
// AddFullMessage appends a complete message (with tool calls, etc.) to a session.
|
||||
AddFullMessage(ctx context.Context, sessionKey string, msg providers.Message) error
|
||||
|
||||
// GetHistory returns all messages for a session in insertion order.
|
||||
// Returns an empty slice (not nil) if the session does not exist.
|
||||
GetHistory(ctx context.Context, sessionKey string) ([]providers.Message, error)
|
||||
|
||||
// GetSummary returns the conversation summary for a session.
|
||||
// Returns an empty string if no summary exists.
|
||||
GetSummary(ctx context.Context, sessionKey string) (string, error)
|
||||
|
||||
// SetSummary updates the conversation summary for a session.
|
||||
SetSummary(ctx context.Context, sessionKey, summary string) error
|
||||
|
||||
// TruncateHistory removes all but the last keepLast messages from a session.
|
||||
// If keepLast <= 0, all messages are removed.
|
||||
TruncateHistory(ctx context.Context, sessionKey string, keepLast int) error
|
||||
|
||||
// SetHistory replaces all messages in a session with the provided history.
|
||||
SetHistory(ctx context.Context, sessionKey string, history []providers.Message) error
|
||||
|
||||
// Compact reclaims storage by physically removing logically truncated
|
||||
// data. Backends that do not accumulate dead data may return nil.
|
||||
Compact(ctx context.Context, sessionKey string) error
|
||||
|
||||
// Close releases any resources held by the store.
|
||||
Close() error
|
||||
}
|
||||
Reference in New Issue
Block a user