mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(message): ignore transient assistant thoughts in message count and history truncation
This commit is contained in:
+46
-30
@@ -10,6 +10,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -406,12 +407,9 @@ func (s *JSONLStore) promoteAliasHistoryLocked(
|
||||
}
|
||||
|
||||
func (s *JSONLStore) sessionHasVisibleContentLocked(sessionKey string, meta SessionMeta) (bool, error) {
|
||||
if meta.Count-meta.Skip > 0 || strings.TrimSpace(meta.Summary) != "" {
|
||||
if strings.TrimSpace(meta.Summary) != "" {
|
||||
return true, nil
|
||||
}
|
||||
if meta.Count != 0 || meta.Skip != 0 {
|
||||
return false, nil
|
||||
}
|
||||
history, err := readMessages(s.jsonlPath(sessionKey), meta.Skip)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -498,28 +496,44 @@ func readMessages(path string, skip int) ([]providers.Message, error) {
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// countLines counts the total number of non-empty lines in a .jsonl file.
|
||||
// Used by TruncateHistory to reconcile a stale meta.Count without
|
||||
// the overhead of unmarshaling every message.
|
||||
func countLines(path string) (int, error) {
|
||||
// scanRetainedMessageLines returns the total number of non-empty raw JSONL
|
||||
// lines plus the raw line numbers that survive readMessages filtering.
|
||||
// TruncateHistory uses this to compute keepLast against retained messages
|
||||
// while preserving the raw-line skip offset stored in metadata.
|
||||
func scanRetainedMessageLines(path string) (int, []int, error) {
|
||||
f, err := os.Open(path)
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
return 0, []int{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("memory: open jsonl: %w", err)
|
||||
return 0, nil, fmt.Errorf("memory: open jsonl: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
n := 0
|
||||
rawCount := 0
|
||||
retained := make([]int, 0)
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
for scanner.Scan() {
|
||||
if len(scanner.Bytes()) > 0 {
|
||||
n++
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
rawCount++
|
||||
|
||||
var msg providers.Message
|
||||
if err := json.Unmarshal(line, &msg); err != nil {
|
||||
continue
|
||||
}
|
||||
if messageutil.IsTransientAssistantThoughtMessage(msg) {
|
||||
continue
|
||||
}
|
||||
retained = append(retained, rawCount)
|
||||
}
|
||||
return n, scanner.Err()
|
||||
if err := scanner.Err(); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return rawCount, retained, nil
|
||||
}
|
||||
|
||||
func (s *JSONLStore) AddMessage(
|
||||
@@ -663,24 +677,26 @@ func (s *JSONLStore) TruncateHistory(
|
||||
return err
|
||||
}
|
||||
|
||||
// Always reconcile meta.Count with the actual line count on disk.
|
||||
// A crash between the JSONL append and the meta update in addMsg
|
||||
// leaves meta.Count stale (e.g. file has 101 lines but meta says
|
||||
// 100). Counting lines is cheap — no unmarshal, just a scan — and
|
||||
// TruncateHistory is not a hot path, so always re-count.
|
||||
n, countErr := countLines(s.jsonlPath(sessionKey))
|
||||
if countErr != nil {
|
||||
return countErr
|
||||
rawCount, retainedRawLines, scanErr := scanRetainedMessageLines(s.jsonlPath(sessionKey))
|
||||
if scanErr != nil {
|
||||
return scanErr
|
||||
}
|
||||
meta.Count = n
|
||||
|
||||
if keepLast <= 0 {
|
||||
meta.Count = rawCount
|
||||
if meta.Skip > meta.Count {
|
||||
meta.Skip = meta.Count
|
||||
} else {
|
||||
effective := meta.Count - meta.Skip
|
||||
if keepLast < effective {
|
||||
meta.Skip = meta.Count - keepLast
|
||||
}
|
||||
}
|
||||
|
||||
activeStart := sort.Search(len(retainedRawLines), func(i int) bool {
|
||||
return retainedRawLines[i] > meta.Skip
|
||||
})
|
||||
activeRetainedCount := len(retainedRawLines) - activeStart
|
||||
|
||||
switch {
|
||||
case keepLast <= 0 || activeRetainedCount == 0:
|
||||
meta.Skip = meta.Count
|
||||
case keepLast < activeRetainedCount:
|
||||
activeRawLines := retainedRawLines[activeStart:]
|
||||
meta.Skip = activeRawLines[activeRetainedCount-keepLast-1]
|
||||
}
|
||||
meta.UpdatedAt = time.Now()
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
)
|
||||
@@ -795,6 +796,56 @@ func TestTruncateHistory_StaleMetaCount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateHistory_IgnoresTransientThoughtForKeepLast(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
sessionKey := "transient-keep-last"
|
||||
now := time.Now()
|
||||
|
||||
rawJSONL := strings.Join([]string{
|
||||
`{"role":"user","content":"a"}`,
|
||||
`{"role":"assistant","content":"b"}`,
|
||||
`{"role":"assistant","content":"","reasoning_content":"dangling thought"}`,
|
||||
`{"role":"user","content":"c"}`,
|
||||
`{"role":"assistant","content":"d"}`,
|
||||
}, "\n") + "\n"
|
||||
if err := os.WriteFile(store.jsonlPath(sessionKey), []byte(rawJSONL), 0o644); err != nil {
|
||||
t.Fatalf("WriteFile(jsonl): %v", err)
|
||||
}
|
||||
if err := store.writeMeta(sessionKey, SessionMeta{
|
||||
Key: sessionKey,
|
||||
Count: 5,
|
||||
Skip: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}); err != nil {
|
||||
t.Fatalf("writeMeta: %v", err)
|
||||
}
|
||||
|
||||
if err := store.TruncateHistory(ctx, sessionKey, 2); err != nil {
|
||||
t.Fatalf("TruncateHistory: %v", err)
|
||||
}
|
||||
|
||||
history, err := store.GetHistory(ctx, sessionKey)
|
||||
if err != nil {
|
||||
t.Fatalf("GetHistory: %v", err)
|
||||
}
|
||||
if len(history) != 2 {
|
||||
t.Fatalf("expected 2 retained messages, got %d", len(history))
|
||||
}
|
||||
if history[0].Content != "c" || history[1].Content != "d" {
|
||||
t.Fatalf("kept history = %+v, want c,d", history)
|
||||
}
|
||||
|
||||
meta, err := store.readMeta(sessionKey)
|
||||
if err != nil {
|
||||
t.Fatalf("readMeta: %v", err)
|
||||
}
|
||||
if meta.Skip != 2 {
|
||||
t.Fatalf("meta.Skip = %d, want 2 raw lines skipped", meta.Skip)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrashRecovery_PartialLine(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -155,6 +155,9 @@ func (h *Handler) readSessionMessages(path string, skip int) ([]providers.Messag
|
||||
if err := json.Unmarshal(line, &msg); err != nil {
|
||||
continue
|
||||
}
|
||||
if messageutil.IsTransientAssistantThoughtMessage(msg) {
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/memory"
|
||||
@@ -101,6 +102,64 @@ func TestHandleListSessions_JSONLStorage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleListSessions_TransientThoughtDoesNotInflateMessageCount(t *testing.T) {
|
||||
configPath, cleanup := setupOAuthTestEnv(t)
|
||||
defer cleanup()
|
||||
|
||||
dir := sessionsTestDir(t, configPath)
|
||||
sessionKey := legacyPicoSessionPrefix + "history-jsonl-transient"
|
||||
base := filepath.Join(dir, sanitizeSessionKey(sessionKey))
|
||||
now := time.Now().UTC()
|
||||
|
||||
rawJSONL := strings.Join([]string{
|
||||
`{"role":"user","content":"keep me"}`,
|
||||
`{"role":"assistant","content":"","reasoning_content":"dangling thought"}`,
|
||||
`{"role":"assistant","content":"and me"}`,
|
||||
}, "\n") + "\n"
|
||||
if err := os.WriteFile(base+".jsonl", []byte(rawJSONL), 0o644); err != nil {
|
||||
t.Fatalf("WriteFile(jsonl) error = %v", err)
|
||||
}
|
||||
metaData, err := json.Marshal(memory.SessionMeta{
|
||||
Key: sessionKey,
|
||||
Count: 3,
|
||||
Skip: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Marshal(meta) error = %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base+".meta.json", metaData, 0o644); err != nil {
|
||||
t.Fatalf("WriteFile(meta) error = %v", err)
|
||||
}
|
||||
|
||||
h := NewHandler(configPath)
|
||||
mux := http.NewServeMux()
|
||||
h.RegisterRoutes(mux)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/api/sessions", nil)
|
||||
mux.ServeHTTP(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("status = %d, want %d, body=%s", rec.Code, http.StatusOK, rec.Body.String())
|
||||
}
|
||||
|
||||
var items []sessionListItem
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &items); err != nil {
|
||||
t.Fatalf("Unmarshal() error = %v", err)
|
||||
}
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("len(items) = %d, want 1", len(items))
|
||||
}
|
||||
if items[0].ID != "history-jsonl-transient" {
|
||||
t.Fatalf("items[0].ID = %q, want %q", items[0].ID, "history-jsonl-transient")
|
||||
}
|
||||
if items[0].MessageCount != 2 {
|
||||
t.Fatalf("items[0].MessageCount = %d, want 2 after dropping transient thought", items[0].MessageCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleListSessions_TitleUsesFirstUserMessage(t *testing.T) {
|
||||
configPath, cleanup := setupOAuthTestEnv(t)
|
||||
defer cleanup()
|
||||
|
||||
Reference in New Issue
Block a user