feat(session): integrate JSONL persistence into agent loop (#1170)

* feat(session): add SessionStore interface and JSONL backend adapter

Extract a SessionStore interface from the methods the agent loop uses
(AddMessage, GetHistory, SetSummary, TruncateHistory, Save, etc.).
Both SessionManager and the new JSONLBackend satisfy this interface,
allowing the persistence layer to be swapped transparently.

JSONLBackend wraps memory.Store and maps its error-returning API to
the fire-and-forget contract that the agent loop expects — write
errors are logged, reads return empty defaults on failure. Save()
triggers compaction to reclaim space after logical truncation.

Part of #1169

* test(session): add JSONLBackend integration tests

8 tests covering the full SessionStore contract through the JSONL
backend: message roundtrip, tool calls, summary, truncation with
compaction, history replacement, empty sessions, session isolation,
and the complete summarization flow (SetSummary → TruncateHistory →
Save).

Includes compile-time interface satisfaction checks for both
SessionManager and JSONLBackend.

Part of #1169

* feat(agent): wire JSONL session store into agent loop

Replace the concrete *SessionManager field with the SessionStore
interface and initialize the JSONL backend by default. Legacy .json
session files are auto-migrated on first startup. Falls back to
SessionManager if the JSONL store cannot be initialized.

The agent loop code (loop.go) requires zero changes — all method
calls work identically through the interface.

Closes #1169

* fix(session): propagate compact error from Save

Save() was swallowing the error returned by Compact and always
returning nil. Callers checking Save's return value would never
see a compaction failure. Return the error directly so the agent
loop can log or handle it as needed.

* feat(session): add Close to SessionStore interface

Add Close() error to SessionStore so callers can release resources
through the interface. JSONLBackend already had Close; this adds
a no-op implementation to SessionManager for compatibility.

* fix(session): close session stores on shutdown and harden migration

- Add Close() to AgentInstance, AgentRegistry, and AgentLoop so JSONL
  file handles are released during gateway shutdown and CLI exit.
- Fall back to SessionManager when migration fails, preventing a split
  state where some sessions live in JSONL and others remain in JSON.
- Add defer agentLoop.Close() in the CLI agent command path.
- Document SessionStore interface methods (fire-and-forget contract).
This commit is contained in:
is-Xiaoen
2026-03-10 15:14:09 +08:00
committed by GitHub
parent 2312553286
commit 26f623ed32
9 changed files with 355 additions and 3 deletions
+1
View File
@@ -50,6 +50,7 @@ func agentCmd(message, sessionKey, model string, debug bool) error {
msgBus := bus.NewMessageBus()
defer msgBus.Close()
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
defer agentLoop.Close()
// Print agent startup info (only for interactive mode)
startupInfo := agentLoop.GetStartupInfo()
+1
View File
@@ -214,6 +214,7 @@ func gatewayCmd(debug bool) error {
cronService.Stop()
mediaStore.Stop()
agentLoop.Stop()
agentLoop.Close()
fmt.Println("✓ Gateway stopped")
return nil
+38 -3
View File
@@ -1,6 +1,7 @@
package agent
import (
"context"
"fmt"
"log"
"os"
@@ -9,6 +10,7 @@ import (
"strings"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/session"
@@ -31,7 +33,7 @@ type AgentInstance struct {
SummarizeMessageThreshold int
SummarizeTokenPercent int
Provider providers.LLMProvider
Sessions *session.SessionManager
Sessions session.SessionStore
ContextBuilder *ContextBuilder
Tools *tools.ToolRegistry
Subagents *config.SubagentsConfig
@@ -95,7 +97,7 @@ func NewAgentInstance(
}
sessionsDir := filepath.Join(workspace, "sessions")
sessionsManager := session.NewSessionManager(sessionsDir)
sessions := initSessionStore(sessionsDir)
mcpDiscoveryActive := cfg.Tools.MCP.Enabled && cfg.Tools.MCP.Discovery.Enabled
contextBuilder := NewContextBuilder(workspace).WithToolDiscovery(
@@ -226,7 +228,7 @@ func NewAgentInstance(
SummarizeMessageThreshold: summarizeMessageThreshold,
SummarizeTokenPercent: summarizeTokenPercent,
Provider: provider,
Sessions: sessionsManager,
Sessions: sessions,
ContextBuilder: contextBuilder,
Tools: toolsRegistry,
Subagents: subagents,
@@ -280,6 +282,39 @@ func compilePatterns(patterns []string) []*regexp.Regexp {
return compiled
}
// Close releases resources held by the agent's session store.
func (a *AgentInstance) Close() error {
if a.Sessions != nil {
return a.Sessions.Close()
}
return nil
}
// initSessionStore creates the session persistence backend.
// It uses the JSONL store by default and auto-migrates legacy JSON sessions.
// Falls back to SessionManager if the JSONL store cannot be initialized or
// if migration fails (which indicates the store cannot write reliably).
func initSessionStore(dir string) session.SessionStore {
store, err := memory.NewJSONLStore(dir)
if err != nil {
log.Printf("memory: init store: %v; using json sessions", err)
return session.NewSessionManager(dir)
}
if n, merr := memory.MigrateFromJSON(context.Background(), dir, store); merr != nil {
// Migration failure means the store could not write data.
// Fall back to SessionManager to avoid a split state where
// some sessions are in JSONL and others remain in JSON.
log.Printf("memory: migration failed: %v; falling back to json sessions", merr)
store.Close()
return session.NewSessionManager(dir)
} else if n > 0 {
log.Printf("memory: migrated %d session(s) to jsonl", n)
}
return session.NewJSONLBackend(store)
}
func expandHome(path string) string {
if path == "" {
return path
+5
View File
@@ -427,6 +427,11 @@ func (al *AgentLoop) Stop() {
al.running.Store(false)
}
// Close releases resources held by agent session stores. Call after Stop.
func (al *AgentLoop) Close() {
al.registry.Close()
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
for _, agentID := range al.registry.ListAgentIDs() {
if agent, ok := al.registry.GetAgent(agentID); ok {
+12
View File
@@ -114,6 +114,18 @@ func (r *AgentRegistry) ForEachTool(name string, fn func(tools.Tool)) {
}
}
// Close releases resources held by all registered agents.
func (r *AgentRegistry) Close() {
r.mu.RLock()
defer r.mu.RUnlock()
for _, agent := range r.agents {
if err := agent.Close(); err != nil {
logger.WarnCF("agent", "Failed to close agent",
map[string]any{"agent_id": agent.ID, "error": err.Error()})
}
}
}
// GetDefaultAgent returns the default agent instance.
func (r *AgentRegistry) GetDefaultAgent() *AgentInstance {
r.mu.RLock()
+81
View File
@@ -0,0 +1,81 @@
package session
import (
"context"
"log"
"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
)
// JSONLBackend adapts a memory.Store into the SessionStore interface.
// Write errors are logged rather than returned, matching the fire-and-forget
// contract of SessionManager that the agent loop relies on.
type JSONLBackend struct {
store memory.Store
}
// NewJSONLBackend wraps a memory.Store for use as a SessionStore.
func NewJSONLBackend(store memory.Store) *JSONLBackend {
return &JSONLBackend{store: store}
}
func (b *JSONLBackend) AddMessage(sessionKey, role, content string) {
if err := b.store.AddMessage(context.Background(), sessionKey, role, content); err != nil {
log.Printf("session: add message: %v", err)
}
}
func (b *JSONLBackend) AddFullMessage(sessionKey string, msg providers.Message) {
if err := b.store.AddFullMessage(context.Background(), sessionKey, msg); err != nil {
log.Printf("session: add full message: %v", err)
}
}
func (b *JSONLBackend) GetHistory(key string) []providers.Message {
msgs, err := b.store.GetHistory(context.Background(), key)
if err != nil {
log.Printf("session: get history: %v", err)
return []providers.Message{}
}
return msgs
}
func (b *JSONLBackend) GetSummary(key string) string {
summary, err := b.store.GetSummary(context.Background(), key)
if err != nil {
log.Printf("session: get summary: %v", err)
return ""
}
return summary
}
func (b *JSONLBackend) SetSummary(key, summary string) {
if err := b.store.SetSummary(context.Background(), key, summary); err != nil {
log.Printf("session: set summary: %v", err)
}
}
func (b *JSONLBackend) SetHistory(key string, history []providers.Message) {
if err := b.store.SetHistory(context.Background(), key, history); err != nil {
log.Printf("session: set history: %v", err)
}
}
func (b *JSONLBackend) TruncateHistory(key string, keepLast int) {
if err := b.store.TruncateHistory(context.Background(), key, keepLast); err != nil {
log.Printf("session: truncate history: %v", err)
}
}
// Save persists session state. Since the JSONL store fsyncs every write
// immediately, the data is already durable. Save runs compaction to reclaim
// space from logically truncated messages (no-op when there are none).
func (b *JSONLBackend) Save(key string) error {
return b.store.Compact(context.Background(), key)
}
// Close releases resources held by the underlying store.
func (b *JSONLBackend) Close() error {
return b.store.Close()
}
+179
View File
@@ -0,0 +1,179 @@
package session_test
import (
"fmt"
"testing"
"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/session"
)
// Compile-time interface satisfaction checks.
var (
_ session.SessionStore = (*session.SessionManager)(nil)
_ session.SessionStore = (*session.JSONLBackend)(nil)
)
func newBackend(t *testing.T) *session.JSONLBackend {
t.Helper()
store, err := memory.NewJSONLStore(t.TempDir())
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { store.Close() })
return session.NewJSONLBackend(store)
}
func TestJSONLBackend_AddAndGetHistory(t *testing.T) {
b := newBackend(t)
b.AddMessage("s1", "user", "hello")
b.AddMessage("s1", "assistant", "hi")
history := b.GetHistory("s1")
if len(history) != 2 {
t.Fatalf("got %d messages, want 2", len(history))
}
if history[0].Role != "user" || history[0].Content != "hello" {
t.Errorf("msg[0] = %+v", history[0])
}
if history[1].Role != "assistant" || history[1].Content != "hi" {
t.Errorf("msg[1] = %+v", history[1])
}
}
func TestJSONLBackend_AddFullMessage(t *testing.T) {
b := newBackend(t)
msg := providers.Message{
Role: "assistant",
Content: "done",
ToolCalls: []providers.ToolCall{
{ID: "tc1", Function: &providers.FunctionCall{Name: "read_file", Arguments: `{"path":"x"}`}},
},
}
b.AddFullMessage("s1", msg)
history := b.GetHistory("s1")
if len(history) != 1 {
t.Fatalf("got %d, want 1", len(history))
}
if len(history[0].ToolCalls) != 1 || history[0].ToolCalls[0].ID != "tc1" {
t.Errorf("tool calls = %+v", history[0].ToolCalls)
}
}
func TestJSONLBackend_Summary(t *testing.T) {
b := newBackend(t)
if got := b.GetSummary("s1"); got != "" {
t.Errorf("got %q, want empty", got)
}
b.SetSummary("s1", "test summary")
if got := b.GetSummary("s1"); got != "test summary" {
t.Errorf("got %q, want %q", got, "test summary")
}
}
func TestJSONLBackend_TruncateAndSave(t *testing.T) {
b := newBackend(t)
for i := 0; i < 10; i++ {
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
}
b.TruncateHistory("s1", 3)
history := b.GetHistory("s1")
if len(history) != 3 {
t.Fatalf("got %d, want 3", len(history))
}
if history[0].Content != "msg 7" {
t.Errorf("got %q, want %q", history[0].Content, "msg 7")
}
// Save triggers compaction.
if err := b.Save("s1"); err != nil {
t.Fatal(err)
}
// Messages still accessible after compaction.
history = b.GetHistory("s1")
if len(history) != 3 {
t.Fatalf("after save: got %d, want 3", len(history))
}
}
func TestJSONLBackend_SetHistory(t *testing.T) {
b := newBackend(t)
b.AddMessage("s1", "user", "old")
b.SetHistory("s1", []providers.Message{
{Role: "user", Content: "new1"},
{Role: "assistant", Content: "new2"},
})
history := b.GetHistory("s1")
if len(history) != 2 {
t.Fatalf("got %d, want 2", len(history))
}
if history[0].Content != "new1" {
t.Errorf("got %q, want %q", history[0].Content, "new1")
}
}
func TestJSONLBackend_EmptySession(t *testing.T) {
b := newBackend(t)
history := b.GetHistory("nonexistent")
if history == nil {
t.Fatal("got nil, want empty slice")
}
if len(history) != 0 {
t.Errorf("got %d, want 0", len(history))
}
}
func TestJSONLBackend_SessionIsolation(t *testing.T) {
b := newBackend(t)
b.AddMessage("s1", "user", "session1")
b.AddMessage("s2", "user", "session2")
h1 := b.GetHistory("s1")
h2 := b.GetHistory("s2")
if len(h1) != 1 || h1[0].Content != "session1" {
t.Errorf("s1: %+v", h1)
}
if len(h2) != 1 || h2[0].Content != "session2" {
t.Errorf("s2: %+v", h2)
}
}
func TestJSONLBackend_SummarizeFlow(t *testing.T) {
// Simulates the real summarization flow in the agent loop:
// SetSummary → TruncateHistory → Save
b := newBackend(t)
for i := 0; i < 20; i++ {
b.AddMessage("s1", "user", fmt.Sprintf("msg %d", i))
}
b.SetSummary("s1", "conversation about testing")
b.TruncateHistory("s1", 4)
if err := b.Save("s1"); err != nil {
t.Fatal(err)
}
if got := b.GetSummary("s1"); got != "conversation about testing" {
t.Errorf("summary = %q", got)
}
history := b.GetHistory("s1")
if len(history) != 4 {
t.Fatalf("got %d messages, want 4", len(history))
}
if history[0].Content != "msg 16" {
t.Errorf("first message = %q, want %q", history[0].Content, "msg 16")
}
}
+6
View File
@@ -265,6 +265,12 @@ func (sm *SessionManager) loadSessions() error {
return nil
}
// Close is a no-op for the in-memory SessionManager; it satisfies the
// SessionStore interface so callers can release resources uniformly.
func (sm *SessionManager) Close() error {
return nil
}
// SetHistory updates the messages of a session.
func (sm *SessionManager) SetHistory(key string, history []providers.Message) {
sm.mu.Lock()
+32
View File
@@ -0,0 +1,32 @@
package session
import "github.com/sipeed/picoclaw/pkg/providers"
// SessionStore defines the persistence operations used by the agent loop.
// Both SessionManager (legacy JSON backend) and JSONLBackend satisfy this
// interface, allowing the storage layer to be swapped without touching the
// agent loop code.
//
// Write methods (Add*, Set*, Truncate*) are fire-and-forget: they do not
// return errors. Implementations should log failures internally. This
// matches the original SessionManager contract that the agent loop relies on.
type SessionStore interface {
// AddMessage appends a simple role/content message to the session.
AddMessage(sessionKey, role, content string)
// AddFullMessage appends a complete message including tool calls.
AddFullMessage(sessionKey string, msg providers.Message)
// GetHistory returns the full message history for the session.
GetHistory(key string) []providers.Message
// GetSummary returns the conversation summary, or "" if none.
GetSummary(key string) string
// SetSummary replaces the conversation summary.
SetSummary(key, summary string)
// SetHistory replaces the full message history.
SetHistory(key string, history []providers.Message)
// TruncateHistory keeps only the last keepLast messages.
TruncateHistory(key string, keepLast int)
// Save persists any pending state to durable storage.
Save(key string) error
// Close releases resources held by the store.
Close() error
}