diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 072795bc9..492205114 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -10,6 +10,7 @@ import ( "log" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -406,12 +407,9 @@ func (s *JSONLStore) promoteAliasHistoryLocked( } func (s *JSONLStore) sessionHasVisibleContentLocked(sessionKey string, meta SessionMeta) (bool, error) { - if meta.Count-meta.Skip > 0 || strings.TrimSpace(meta.Summary) != "" { + if strings.TrimSpace(meta.Summary) != "" { return true, nil } - if meta.Count != 0 || meta.Skip != 0 { - return false, nil - } history, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) if err != nil { return false, err @@ -498,28 +496,44 @@ func readMessages(path string, skip int) ([]providers.Message, error) { return msgs, nil } -// countLines counts the total number of non-empty lines in a .jsonl file. -// Used by TruncateHistory to reconcile a stale meta.Count without -// the overhead of unmarshaling every message. -func countLines(path string) (int, error) { +// scanRetainedMessageLines returns the total number of non-empty raw JSONL +// lines plus the raw line numbers that survive readMessages filtering. +// TruncateHistory uses this to compute keepLast against retained messages +// while preserving the raw-line skip offset stored in metadata. +func scanRetainedMessageLines(path string) (int, []int, error) { f, err := os.Open(path) if os.IsNotExist(err) { - return 0, nil + return 0, []int{}, nil } if err != nil { - return 0, fmt.Errorf("memory: open jsonl: %w", err) + return 0, nil, fmt.Errorf("memory: open jsonl: %w", err) } defer f.Close() - n := 0 + rawCount := 0 + retained := make([]int, 0) scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) for scanner.Scan() { - if len(scanner.Bytes()) > 0 { - n++ + line := scanner.Bytes() + if len(line) == 0 { + continue } + rawCount++ + + var msg providers.Message + if err := json.Unmarshal(line, &msg); err != nil { + continue + } + if messageutil.IsTransientAssistantThoughtMessage(msg) { + continue + } + retained = append(retained, rawCount) } - return n, scanner.Err() + if err := scanner.Err(); err != nil { + return 0, nil, err + } + return rawCount, retained, nil } func (s *JSONLStore) AddMessage( @@ -663,24 +677,26 @@ func (s *JSONLStore) TruncateHistory( return err } - // Always reconcile meta.Count with the actual line count on disk. - // A crash between the JSONL append and the meta update in addMsg - // leaves meta.Count stale (e.g. file has 101 lines but meta says - // 100). Counting lines is cheap — no unmarshal, just a scan — and - // TruncateHistory is not a hot path, so always re-count. - n, countErr := countLines(s.jsonlPath(sessionKey)) - if countErr != nil { - return countErr + rawCount, retainedRawLines, scanErr := scanRetainedMessageLines(s.jsonlPath(sessionKey)) + if scanErr != nil { + return scanErr } - meta.Count = n - - if keepLast <= 0 { + meta.Count = rawCount + if meta.Skip > meta.Count { meta.Skip = meta.Count - } else { - effective := meta.Count - meta.Skip - if keepLast < effective { - meta.Skip = meta.Count - keepLast - } + } + + activeStart := sort.Search(len(retainedRawLines), func(i int) bool { + return retainedRawLines[i] > meta.Skip + }) + activeRetainedCount := len(retainedRawLines) - activeStart + + switch { + case keepLast <= 0 || activeRetainedCount == 0: + meta.Skip = meta.Count + case keepLast < activeRetainedCount: + activeRawLines := retainedRawLines[activeStart:] + meta.Skip = activeRawLines[activeRetainedCount-keepLast-1] } meta.UpdatedAt = time.Now() diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index f72b84804..3a7b98130 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -795,6 +796,56 @@ func TestTruncateHistory_StaleMetaCount(t *testing.T) { } } +func TestTruncateHistory_IgnoresTransientThoughtForKeepLast(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + sessionKey := "transient-keep-last" + now := time.Now() + + rawJSONL := strings.Join([]string{ + `{"role":"user","content":"a"}`, + `{"role":"assistant","content":"b"}`, + `{"role":"assistant","content":"","reasoning_content":"dangling thought"}`, + `{"role":"user","content":"c"}`, + `{"role":"assistant","content":"d"}`, + }, "\n") + "\n" + if err := os.WriteFile(store.jsonlPath(sessionKey), []byte(rawJSONL), 0o644); err != nil { + t.Fatalf("WriteFile(jsonl): %v", err) + } + if err := store.writeMeta(sessionKey, SessionMeta{ + Key: sessionKey, + Count: 5, + Skip: 0, + CreatedAt: now, + UpdatedAt: now, + }); err != nil { + t.Fatalf("writeMeta: %v", err) + } + + if err := store.TruncateHistory(ctx, sessionKey, 2); err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, sessionKey) + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 retained messages, got %d", len(history)) + } + if history[0].Content != "c" || history[1].Content != "d" { + t.Fatalf("kept history = %+v, want c,d", history) + } + + meta, err := store.readMeta(sessionKey) + if err != nil { + t.Fatalf("readMeta: %v", err) + } + if meta.Skip != 2 { + t.Fatalf("meta.Skip = %d, want 2 raw lines skipped", meta.Skip) + } +} + func TestCrashRecovery_PartialLine(t *testing.T) { store := newTestStore(t) ctx := context.Background() diff --git a/web/backend/api/session.go b/web/backend/api/session.go index 7bfe43026..287892cd2 100644 --- a/web/backend/api/session.go +++ b/web/backend/api/session.go @@ -155,6 +155,9 @@ func (h *Handler) readSessionMessages(path string, skip int) ([]providers.Messag if err := json.Unmarshal(line, &msg); err != nil { continue } + if messageutil.IsTransientAssistantThoughtMessage(msg) { + continue + } msgs = append(msgs, msg) } if err := scanner.Err(); err != nil { diff --git a/web/backend/api/session_test.go b/web/backend/api/session_test.go index f8fb2483a..8d92760d2 100644 --- a/web/backend/api/session_test.go +++ b/web/backend/api/session_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/memory" @@ -101,6 +102,64 @@ func TestHandleListSessions_JSONLStorage(t *testing.T) { } } +func TestHandleListSessions_TransientThoughtDoesNotInflateMessageCount(t *testing.T) { + configPath, cleanup := setupOAuthTestEnv(t) + defer cleanup() + + dir := sessionsTestDir(t, configPath) + sessionKey := legacyPicoSessionPrefix + "history-jsonl-transient" + base := filepath.Join(dir, sanitizeSessionKey(sessionKey)) + now := time.Now().UTC() + + rawJSONL := strings.Join([]string{ + `{"role":"user","content":"keep me"}`, + `{"role":"assistant","content":"","reasoning_content":"dangling thought"}`, + `{"role":"assistant","content":"and me"}`, + }, "\n") + "\n" + if err := os.WriteFile(base+".jsonl", []byte(rawJSONL), 0o644); err != nil { + t.Fatalf("WriteFile(jsonl) error = %v", err) + } + metaData, err := json.Marshal(memory.SessionMeta{ + Key: sessionKey, + Count: 3, + Skip: 0, + CreatedAt: now, + UpdatedAt: now, + }) + if err != nil { + t.Fatalf("Marshal(meta) error = %v", err) + } + if err := os.WriteFile(base+".meta.json", metaData, 0o644); err != nil { + t.Fatalf("WriteFile(meta) error = %v", err) + } + + h := NewHandler(configPath) + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/sessions", nil) + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d, body=%s", rec.Code, http.StatusOK, rec.Body.String()) + } + + var items []sessionListItem + if err := json.Unmarshal(rec.Body.Bytes(), &items); err != nil { + t.Fatalf("Unmarshal() error = %v", err) + } + if len(items) != 1 { + t.Fatalf("len(items) = %d, want 1", len(items)) + } + if items[0].ID != "history-jsonl-transient" { + t.Fatalf("items[0].ID = %q, want %q", items[0].ID, "history-jsonl-transient") + } + if items[0].MessageCount != 2 { + t.Fatalf("items[0].MessageCount = %d, want 2 after dropping transient thought", items[0].MessageCount) + } +} + func TestHandleListSessions_TitleUsesFirstUserMessage(t *testing.T) { configPath, cleanup := setupOAuthTestEnv(t) defer cleanup()