From 26f623ed321ba06c7a22012174ff8e23c8c90a40 Mon Sep 17 00:00:00 2001 From: is-Xiaoen <2768753269@qq.com> Date: Tue, 10 Mar 2026 15:14:09 +0800 Subject: [PATCH] feat(session): integrate JSONL persistence into agent loop (#1170) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(session): add SessionStore interface and JSONL backend adapter Extract a SessionStore interface from the methods the agent loop uses (AddMessage, GetHistory, SetSummary, TruncateHistory, Save, etc.). Both SessionManager and the new JSONLBackend satisfy this interface, allowing the persistence layer to be swapped transparently. JSONLBackend wraps memory.Store and maps its error-returning API to the fire-and-forget contract that the agent loop expects — write errors are logged, reads return empty defaults on failure. Save() triggers compaction to reclaim space after logical truncation. Part of #1169 * test(session): add JSONLBackend integration tests 8 tests covering the full SessionStore contract through the JSONL backend: message roundtrip, tool calls, summary, truncation with compaction, history replacement, empty sessions, session isolation, and the complete summarization flow (SetSummary → TruncateHistory → Save). Includes compile-time interface satisfaction checks for both SessionManager and JSONLBackend. Part of #1169 * feat(agent): wire JSONL session store into agent loop Replace the concrete *SessionManager field with the SessionStore interface and initialize the JSONL backend by default. Legacy .json session files are auto-migrated on first startup. Falls back to SessionManager if the JSONL store cannot be initialized. The agent loop code (loop.go) requires zero changes — all method calls work identically through the interface. Closes #1169 * fix(session): propagate compact error from Save Save() was swallowing the error returned by Compact and always returning nil. Callers checking Save's return value would never see a compaction failure. Return the error directly so the agent loop can log or handle it as needed. * feat(session): add Close to SessionStore interface Add Close() error to SessionStore so callers can release resources through the interface. JSONLBackend already had Close; this adds a no-op implementation to SessionManager for compatibility. * fix(session): close session stores on shutdown and harden migration - Add Close() to AgentInstance, AgentRegistry, and AgentLoop so JSONL file handles are released during gateway shutdown and CLI exit. - Fall back to SessionManager when migration fails, preventing a split state where some sessions live in JSONL and others remain in JSON. - Add defer agentLoop.Close() in the CLI agent command path. - Document SessionStore interface methods (fire-and-forget contract). --- cmd/picoclaw/internal/agent/helpers.go | 1 + cmd/picoclaw/internal/gateway/helpers.go | 1 + pkg/agent/instance.go | 41 +++++- pkg/agent/loop.go | 5 + pkg/agent/registry.go | 12 ++ pkg/session/jsonl_backend.go | 81 ++++++++++ pkg/session/jsonl_backend_test.go | 179 +++++++++++++++++++++++ pkg/session/manager.go | 6 + pkg/session/session_store.go | 32 ++++ 9 files changed, 355 insertions(+), 3 deletions(-) create mode 100644 pkg/session/jsonl_backend.go create mode 100644 pkg/session/jsonl_backend_test.go create mode 100644 pkg/session/session_store.go diff --git a/cmd/picoclaw/internal/agent/helpers.go b/cmd/picoclaw/internal/agent/helpers.go index f754abc65..a995945d2 100644 --- a/cmd/picoclaw/internal/agent/helpers.go +++ b/cmd/picoclaw/internal/agent/helpers.go @@ -50,6 +50,7 @@ func agentCmd(message, sessionKey, model string, debug bool) error { msgBus := bus.NewMessageBus() defer msgBus.Close() agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) + defer agentLoop.Close() // Print agent startup info (only for interactive mode) startupInfo := agentLoop.GetStartupInfo() diff --git a/cmd/picoclaw/internal/gateway/helpers.go b/cmd/picoclaw/internal/gateway/helpers.go index 4f93b858a..fed3d5ffb 100644 --- a/cmd/picoclaw/internal/gateway/helpers.go +++ b/cmd/picoclaw/internal/gateway/helpers.go @@ -214,6 +214,7 @@ func gatewayCmd(debug bool) error { cronService.Stop() mediaStore.Stop() agentLoop.Stop() + agentLoop.Close() fmt.Println("✓ Gateway stopped") return nil diff --git a/pkg/agent/instance.go b/pkg/agent/instance.go index b60818875..0c7baa1ee 100644 --- a/pkg/agent/instance.go +++ b/pkg/agent/instance.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "log" "os" @@ -9,6 +10,7 @@ import ( "strings" "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/memory" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" "github.com/sipeed/picoclaw/pkg/session" @@ -31,7 +33,7 @@ type AgentInstance struct { SummarizeMessageThreshold int SummarizeTokenPercent int Provider providers.LLMProvider - Sessions *session.SessionManager + Sessions session.SessionStore ContextBuilder *ContextBuilder Tools *tools.ToolRegistry Subagents *config.SubagentsConfig @@ -95,7 +97,7 @@ func NewAgentInstance( } sessionsDir := filepath.Join(workspace, "sessions") - sessionsManager := session.NewSessionManager(sessionsDir) + sessions := initSessionStore(sessionsDir) mcpDiscoveryActive := cfg.Tools.MCP.Enabled && cfg.Tools.MCP.Discovery.Enabled contextBuilder := NewContextBuilder(workspace).WithToolDiscovery( @@ -226,7 +228,7 @@ func NewAgentInstance( SummarizeMessageThreshold: summarizeMessageThreshold, SummarizeTokenPercent: summarizeTokenPercent, Provider: provider, - Sessions: sessionsManager, + Sessions: sessions, ContextBuilder: contextBuilder, Tools: toolsRegistry, Subagents: subagents, @@ -280,6 +282,39 @@ func compilePatterns(patterns []string) []*regexp.Regexp { return compiled } +// Close releases resources held by the agent's session store. +func (a *AgentInstance) Close() error { + if a.Sessions != nil { + return a.Sessions.Close() + } + return nil +} + +// initSessionStore creates the session persistence backend. +// It uses the JSONL store by default and auto-migrates legacy JSON sessions. +// Falls back to SessionManager if the JSONL store cannot be initialized or +// if migration fails (which indicates the store cannot write reliably). +func initSessionStore(dir string) session.SessionStore { + store, err := memory.NewJSONLStore(dir) + if err != nil { + log.Printf("memory: init store: %v; using json sessions", err) + return session.NewSessionManager(dir) + } + + if n, merr := memory.MigrateFromJSON(context.Background(), dir, store); merr != nil { + // Migration failure means the store could not write data. + // Fall back to SessionManager to avoid a split state where + // some sessions are in JSONL and others remain in JSON. + log.Printf("memory: migration failed: %v; falling back to json sessions", merr) + store.Close() + return session.NewSessionManager(dir) + } else if n > 0 { + log.Printf("memory: migrated %d session(s) to jsonl", n) + } + + return session.NewJSONLBackend(store) +} + func expandHome(path string) string { if path == "" { return path diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 58f53bef8..d5f661293 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -427,6 +427,11 @@ func (al *AgentLoop) Stop() { al.running.Store(false) } +// Close releases resources held by agent session stores. Call after Stop. +func (al *AgentLoop) Close() { + al.registry.Close() +} + func (al *AgentLoop) RegisterTool(tool tools.Tool) { for _, agentID := range al.registry.ListAgentIDs() { if agent, ok := al.registry.GetAgent(agentID); ok { diff --git a/pkg/agent/registry.go b/pkg/agent/registry.go index 0e7973dc3..58b7ce440 100644 --- a/pkg/agent/registry.go +++ b/pkg/agent/registry.go @@ -114,6 +114,18 @@ func (r *AgentRegistry) ForEachTool(name string, fn func(tools.Tool)) { } } +// Close releases resources held by all registered agents. +func (r *AgentRegistry) Close() { + r.mu.RLock() + defer r.mu.RUnlock() + for _, agent := range r.agents { + if err := agent.Close(); err != nil { + logger.WarnCF("agent", "Failed to close agent", + map[string]any{"agent_id": agent.ID, "error": err.Error()}) + } + } +} + // GetDefaultAgent returns the default agent instance. func (r *AgentRegistry) GetDefaultAgent() *AgentInstance { r.mu.RLock() diff --git a/pkg/session/jsonl_backend.go b/pkg/session/jsonl_backend.go new file mode 100644 index 000000000..7f470de15 --- /dev/null +++ b/pkg/session/jsonl_backend.go @@ -0,0 +1,81 @@ +package session + +import ( + "context" + "log" + + "github.com/sipeed/picoclaw/pkg/memory" + "github.com/sipeed/picoclaw/pkg/providers" +) + +// JSONLBackend adapts a memory.Store into the SessionStore interface. +// Write errors are logged rather than returned, matching the fire-and-forget +// contract of SessionManager that the agent loop relies on. +type JSONLBackend struct { + store memory.Store +} + +// NewJSONLBackend wraps a memory.Store for use as a SessionStore. +func NewJSONLBackend(store memory.Store) *JSONLBackend { + return &JSONLBackend{store: store} +} + +func (b *JSONLBackend) AddMessage(sessionKey, role, content string) { + if err := b.store.AddMessage(context.Background(), sessionKey, role, content); err != nil { + log.Printf("session: add message: %v", err) + } +} + +func (b *JSONLBackend) AddFullMessage(sessionKey string, msg providers.Message) { + if err := b.store.AddFullMessage(context.Background(), sessionKey, msg); err != nil { + log.Printf("session: add full message: %v", err) + } +} + +func (b *JSONLBackend) GetHistory(key string) []providers.Message { + msgs, err := b.store.GetHistory(context.Background(), key) + if err != nil { + log.Printf("session: get history: %v", err) + return []providers.Message{} + } + return msgs +} + +func (b *JSONLBackend) GetSummary(key string) string { + summary, err := b.store.GetSummary(context.Background(), key) + if err != nil { + log.Printf("session: get summary: %v", err) + return "" + } + return summary +} + +func (b *JSONLBackend) SetSummary(key, summary string) { + if err := b.store.SetSummary(context.Background(), key, summary); err != nil { + log.Printf("session: set summary: %v", err) + } +} + +func (b *JSONLBackend) SetHistory(key string, history []providers.Message) { + if err := b.store.SetHistory(context.Background(), key, history); err != nil { + log.Printf("session: set history: %v", err) + } +} + +func (b *JSONLBackend) TruncateHistory(key string, keepLast int) { + if err := b.store.TruncateHistory(context.Background(), key, keepLast); err != nil { + log.Printf("session: truncate history: %v", err) + } +} + +// Save persists session state. Since the JSONL store fsyncs every write +// immediately, the data is already durable. Save runs compaction to reclaim +// space from logically truncated messages (no-op when there are none). +func (b *JSONLBackend) Save(key string) error { + return b.store.Compact(context.Background(), key) +} + +// Close releases resources held by the underlying store. +func (b *JSONLBackend) Close() error { + return b.store.Close() +} diff --git a/pkg/session/jsonl_backend_test.go b/pkg/session/jsonl_backend_test.go new file mode 100644 index 000000000..40fa019cb --- /dev/null +++ b/pkg/session/jsonl_backend_test.go @@ -0,0 +1,179 @@ +package session_test + +import ( + "fmt" + "testing" + + "github.com/sipeed/picoclaw/pkg/memory" + "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/session" +) + +// Compile-time interface satisfaction checks. +var ( + _ session.SessionStore = (*session.SessionManager)(nil) + _ session.SessionStore = (*session.JSONLBackend)(nil) +) + +func newBackend(t *testing.T) *session.JSONLBackend { + t.Helper() + store, err := memory.NewJSONLStore(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { store.Close() }) + return session.NewJSONLBackend(store) +} + +func TestJSONLBackend_AddAndGetHistory(t *testing.T) { + b := newBackend(t) + + b.AddMessage("s1", "user", "hello") + b.AddMessage("s1", "assistant", "hi") + + history := b.GetHistory("s1") + if len(history) != 2 { + t.Fatalf("got %d messages, want 2", len(history)) + } + if history[0].Role != "user" || history[0].Content != "hello" { + t.Errorf("msg[0] = %+v", history[0]) + } + if history[1].Role != "assistant" || history[1].Content != "hi" { + t.Errorf("msg[1] = %+v", history[1]) + } +} + +func TestJSONLBackend_AddFullMessage(t *testing.T) { + b := newBackend(t) + + msg := providers.Message{ + Role: "assistant", + Content: "done", + ToolCalls: []providers.ToolCall{ + {ID: "tc1", Function: &providers.FunctionCall{Name: "read_file", Arguments: `{"path":"x"}`}}, + }, + } + b.AddFullMessage("s1", msg) + + history := b.GetHistory("s1") + if len(history) != 1 { + t.Fatalf("got %d, want 1", len(history)) + } + if len(history[0].ToolCalls) != 1 || history[0].ToolCalls[0].ID != "tc1" { + t.Errorf("tool calls = %+v", history[0].ToolCalls) + } +} + +func TestJSONLBackend_Summary(t *testing.T) { + b := newBackend(t) + + if got := b.GetSummary("s1"); got != "" { + t.Errorf("got %q, want empty", got) + } + + b.SetSummary("s1", "test summary") + if got := b.GetSummary("s1"); got != "test summary" { + t.Errorf("got %q, want %q", got, "test summary") + } +} + +func TestJSONLBackend_TruncateAndSave(t *testing.T) { + b := newBackend(t) + + for i := 0; i < 10; i++ { + b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i)) + } + b.TruncateHistory("s1", 3) + + history := b.GetHistory("s1") + if len(history) != 3 { + t.Fatalf("got %d, want 3", len(history)) + } + if history[0].Content != "msg 7" { + t.Errorf("got %q, want %q", history[0].Content, "msg 7") + } + + // Save triggers compaction. + if err := b.Save("s1"); err != nil { + t.Fatal(err) + } + + // Messages still accessible after compaction. + history = b.GetHistory("s1") + if len(history) != 3 { + t.Fatalf("after save: got %d, want 3", len(history)) + } +} + +func TestJSONLBackend_SetHistory(t *testing.T) { + b := newBackend(t) + b.AddMessage("s1", "user", "old") + + b.SetHistory("s1", []providers.Message{ + {Role: "user", Content: "new1"}, + {Role: "assistant", Content: "new2"}, + }) + + history := b.GetHistory("s1") + if len(history) != 2 { + t.Fatalf("got %d, want 2", len(history)) + } + if history[0].Content != "new1" { + t.Errorf("got %q, want %q", history[0].Content, "new1") + } +} + +func TestJSONLBackend_EmptySession(t *testing.T) { + b := newBackend(t) + + history := b.GetHistory("nonexistent") + if history == nil { + t.Fatal("got nil, want empty slice") + } + if len(history) != 0 { + t.Errorf("got %d, want 0", len(history)) + } +} + +func TestJSONLBackend_SessionIsolation(t *testing.T) { + b := newBackend(t) + b.AddMessage("s1", "user", "session1") + b.AddMessage("s2", "user", "session2") + + h1 := b.GetHistory("s1") + h2 := b.GetHistory("s2") + + if len(h1) != 1 || h1[0].Content != "session1" { + t.Errorf("s1: %+v", h1) + } + if len(h2) != 1 || h2[0].Content != "session2" { + t.Errorf("s2: %+v", h2) + } +} + +func TestJSONLBackend_SummarizeFlow(t *testing.T) { + // Simulates the real summarization flow in the agent loop: + // SetSummary → TruncateHistory → Save + b := newBackend(t) + + for i := 0; i < 20; i++ { + b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i)) + } + + b.SetSummary("s1", "conversation about testing") + b.TruncateHistory("s1", 4) + if err := b.Save("s1"); err != nil { + t.Fatal(err) + } + + if got := b.GetSummary("s1"); got != "conversation about testing" { + t.Errorf("summary = %q", got) + } + history := b.GetHistory("s1") + if len(history) != 4 { + t.Fatalf("got %d messages, want 4", len(history)) + } + if history[0].Content != "msg 16" { + t.Errorf("first message = %q, want %q", history[0].Content, "msg 16") + } +} diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 08f0b0ad2..07f981df1 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -265,6 +265,12 @@ func (sm *SessionManager) loadSessions() error { return nil } +// Close is a no-op for the in-memory SessionManager; it satisfies the +// SessionStore interface so callers can release resources uniformly. +func (sm *SessionManager) Close() error { + return nil +} + // SetHistory updates the messages of a session. func (sm *SessionManager) SetHistory(key string, history []providers.Message) { sm.mu.Lock() diff --git a/pkg/session/session_store.go b/pkg/session/session_store.go new file mode 100644 index 000000000..1d1a2f967 --- /dev/null +++ b/pkg/session/session_store.go @@ -0,0 +1,32 @@ +package session + +import "github.com/sipeed/picoclaw/pkg/providers" + +// SessionStore defines the persistence operations used by the agent loop. +// Both SessionManager (legacy JSON backend) and JSONLBackend satisfy this +// interface, allowing the storage layer to be swapped without touching the +// agent loop code. +// +// Write methods (Add*, Set*, Truncate*) are fire-and-forget: they do not +// return errors. Implementations should log failures internally. This +// matches the original SessionManager contract that the agent loop relies on. +type SessionStore interface { + // AddMessage appends a simple role/content message to the session. + AddMessage(sessionKey, role, content string) + // AddFullMessage appends a complete message including tool calls. + AddFullMessage(sessionKey string, msg providers.Message) + // GetHistory returns the full message history for the session. + GetHistory(key string) []providers.Message + // GetSummary returns the conversation summary, or "" if none. + GetSummary(key string) string + // SetSummary replaces the conversation summary. + SetSummary(key, summary string) + // SetHistory replaces the full message history. + SetHistory(key string, history []providers.Message) + // TruncateHistory keeps only the last keepLast messages. + TruncateHistory(key string, keepLast int) + // Save persists any pending state to durable storage. + Save(key string) error + // Close releases resources held by the store. + Close() error +}