From f7e768152e076d863ce3bd20019256dc96fad44e Mon Sep 17 00:00:00 2001 From: Liu Yuan Date: Mon, 13 Apr 2026 11:04:45 +0800 Subject: [PATCH] feat(agent): /clear now clears seahorse DB in addition to JSONL - Add Clear(ctx, sessionKey) to ContextManager interface - Implement Clear for legacy (JSONL) and seahorse (DB + JSONL) - Add Engine.ClearSession + Store.ClearConversation - Fix FTS5 DELETE trigger syntax in schema (was using wrong external-content FTS5 syntax; now uses standard DELETE FROM) - Fix ClearSession to skip sessions never ingested (was creating blank conversations record via GetOrCreateConversation) - Simplify summary_parents DELETE into single OR statement - Add TestStoreClearConversation unit test --- pkg/agent/context_legacy.go | 10 ++++ pkg/agent/context_manager.go | 4 ++ pkg/agent/context_manager_test.go | 3 ++ pkg/agent/context_seahorse.go | 13 +++++ pkg/agent/loop.go | 17 +++--- pkg/seahorse/schema.go | 4 +- pkg/seahorse/short_engine.go | 13 +++++ pkg/seahorse/store.go | 51 ++++++++++++++++++ pkg/seahorse/store_test.go | 90 ++++++++++++++++++++++++++++++- 9 files changed, 192 insertions(+), 13 deletions(-) diff --git a/pkg/agent/context_legacy.go b/pkg/agent/context_legacy.go index 0f10decb3..85e331ae9 100644 --- a/pkg/agent/context_legacy.go +++ b/pkg/agent/context_legacy.go @@ -61,6 +61,16 @@ func (m *legacyContextManager) Ingest(_ context.Context, _ *IngestRequest) error return nil } +func (m *legacyContextManager) Clear(_ context.Context, sessionKey string) error { + agent := m.al.registry.GetDefaultAgent() + if agent == nil || agent.Sessions == nil { + return fmt.Errorf("sessions not initialized") + } + agent.Sessions.SetHistory(sessionKey, []providers.Message{}) + agent.Sessions.SetSummary(sessionKey, "") + return agent.Sessions.Save(sessionKey) +} + // maybeSummarize triggers summarization if the session history exceeds thresholds. // It runs asynchronously in a goroutine. func (m *legacyContextManager) maybeSummarize(sessionKey string) { diff --git a/pkg/agent/context_manager.go b/pkg/agent/context_manager.go index 5f8701812..5a5dfe97c 100644 --- a/pkg/agent/context_manager.go +++ b/pkg/agent/context_manager.go @@ -24,6 +24,10 @@ type ContextManager interface { // Ingest records a message into the ContextManager's own storage. // Called after each message is persisted to session JSONL. Ingest(ctx context.Context, req *IngestRequest) error + + // Clear removes all stored context for a session (messages, summaries, etc.). + // Called when the user issues /clear or /reset. + Clear(ctx context.Context, sessionKey string) error } // AssembleRequest is the input to Assemble. diff --git a/pkg/agent/context_manager_test.go b/pkg/agent/context_manager_test.go index 6bde5e1a9..629d11fcb 100644 --- a/pkg/agent/context_manager_test.go +++ b/pkg/agent/context_manager_test.go @@ -690,6 +690,7 @@ func (m *noopContextManager) Assemble(_ context.Context, req *AssembleRequest) ( } func (m *noopContextManager) Compact(_ context.Context, _ *CompactRequest) error { return nil } func (m *noopContextManager) Ingest(_ context.Context, _ *IngestRequest) error { return nil } +func (m *noopContextManager) Clear(_ context.Context, _ string) error { return nil } // trackingContextManager tracks call counts for each method. type trackingContextManager struct { @@ -726,6 +727,8 @@ func (m *trackingContextManager) Ingest(_ context.Context, req *IngestRequest) e return nil } +func (m *trackingContextManager) Clear(_ context.Context, _ string) error { return nil } + // resetCMRegistry clears the global factory registry and returns a cleanup // function that restores the original state after the test. func resetCMRegistry() func() { diff --git a/pkg/agent/context_seahorse.go b/pkg/agent/context_seahorse.go index 327c6162a..c6e5b30ac 100644 --- a/pkg/agent/context_seahorse.go +++ b/pkg/agent/context_seahorse.go @@ -154,6 +154,19 @@ func (m *seahorseContextManager) Ingest(ctx context.Context, req *IngestRequest) return err } +// Clear removes all stored context for a session (seahorse DB + JSONL). +func (m *seahorseContextManager) Clear(ctx context.Context, sessionKey string) error { + if err := m.engine.ClearSession(ctx, sessionKey); err != nil { + return err + } + if m.sessions != nil { + m.sessions.SetHistory(sessionKey, []providers.Message{}) + m.sessions.SetSummary(sessionKey, "") + return m.sessions.Save(sessionKey) + } + return nil +} + // bootstrapSession reconciles JSONL session history into seahorse SQLite. func (m *seahorseContextManager) bootstrapSession(ctx context.Context, sessionKey string) { if m.sessions == nil { diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index a856c0fca..f67802663 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -3368,7 +3368,7 @@ func (al *AgentLoop) handleCommand( return "", false } - rt := al.buildCommandsRuntime(agent, opts) + rt := al.buildCommandsRuntime(ctx, agent, opts) executor := commands.NewExecutor(al.cmdRegistry, rt) var commandReply string @@ -3488,7 +3488,11 @@ func (al *AgentLoop) applyExplicitSkillCommand( return true, false, "" } -func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime { +func (al *AgentLoop) buildCommandsRuntime( + ctx context.Context, + agent *AgentInstance, + opts *processOptions, +) *commands.Runtime { registry := al.GetRegistry() cfg := al.GetConfig() rt := &commands.Runtime{ @@ -3570,14 +3574,7 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt if opts == nil { return fmt.Errorf("process options not available") } - if agent.Sessions == nil { - return fmt.Errorf("sessions not initialized for agent") - } - - agent.Sessions.SetHistory(opts.SessionKey, make([]providers.Message, 0)) - agent.Sessions.SetSummary(opts.SessionKey, "") - agent.Sessions.Save(opts.SessionKey) - return nil + return al.contextManager.Clear(ctx, opts.SessionKey) } } return rt diff --git a/pkg/seahorse/schema.go b/pkg/seahorse/schema.go index effa6d60d..bf32d548b 100644 --- a/pkg/seahorse/schema.go +++ b/pkg/seahorse/schema.go @@ -123,10 +123,10 @@ func runSchema(db *sql.DB) error { INSERT INTO summaries_fts (summary_id, content) VALUES (new.summary_id, new.content); END`, `CREATE TRIGGER IF NOT EXISTS summaries_ad AFTER DELETE ON summaries BEGIN - INSERT INTO summaries_fts (summaries_fts, summary_id, content) VALUES ('delete', old.summary_id, old.content); + DELETE FROM summaries_fts WHERE summary_id = old.summary_id; END`, `CREATE TRIGGER IF NOT EXISTS summaries_au AFTER UPDATE ON summaries BEGIN - INSERT INTO summaries_fts (summaries_fts, summary_id, content) VALUES ('delete', old.summary_id, old.content); + DELETE FROM summaries_fts WHERE summary_id = old.summary_id; INSERT INTO summaries_fts (summary_id, content) VALUES (new.summary_id, new.content); END`, diff --git a/pkg/seahorse/short_engine.go b/pkg/seahorse/short_engine.go index 4cd4d3887..f584788ce 100644 --- a/pkg/seahorse/short_engine.go +++ b/pkg/seahorse/short_engine.go @@ -377,6 +377,19 @@ func (e *Engine) IngestMessages(ctx context.Context, sessionKey string, messages return e.Ingest(ctx, sessionKey, messages) } +// ClearSession removes all stored data for a session (messages, summaries, context). +// If the session has no prior seahorse record, it is a no-op. +func (e *Engine) ClearSession(ctx context.Context, sessionKey string) error { + conv, err := e.store.GetConversationBySessionKey(ctx, sessionKey) + if err != nil { + return err + } + if conv == nil { + return nil // session never ingested, nothing to clear + } + return e.store.ClearConversation(ctx, conv.ConversationID) +} + // Bootstrap reconciles a session's messages with the database. // Called once at startup for each known session. // Bootstrap reconciles JSONL history with SQLite by ingesting only the delta. diff --git a/pkg/seahorse/store.go b/pkg/seahorse/store.go index 3026533b2..c84aaaf07 100644 --- a/pkg/seahorse/store.go +++ b/pkg/seahorse/store.go @@ -728,6 +728,57 @@ func (s *Store) DeleteMessagesAfterID(ctx context.Context, convID int64, afterID return tx.Commit() } +// ClearConversation removes all data for a conversation from all tables. +// Deletes context_items, summary_messages, summary_parents (via subquery), summaries, +// message_parts, and messages. FTS entries are handled automatically by triggers. +// Uses a transaction for atomicity. +func (s *Store) ClearConversation(ctx context.Context, convID int64) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + // Delete in child→parent order. FTS tables (messages_fts, summaries_fts) are + // kept in sync by DELETE triggers, so we just delete from the parent tables. + + if _, err := tx.ExecContext(ctx, + "DELETE FROM context_items WHERE conversation_id = ?", convID); err != nil { + return fmt.Errorf("context_items: %w", err) + } + if _, err := tx.ExecContext(ctx, + `DELETE FROM summary_messages WHERE summary_id IN ( + SELECT summary_id FROM summaries WHERE conversation_id = ? + )`, convID); err != nil { + return fmt.Errorf("summary_messages: %w", err) + } + // Note: summary_parents has no convID column; delete via subquery on summaries + if _, err := tx.ExecContext(ctx, + `DELETE FROM summary_parents WHERE summary_id IN ( + SELECT summary_id FROM summaries WHERE conversation_id = ? + ) OR parent_summary_id IN ( + SELECT summary_id FROM summaries WHERE conversation_id = ? + )`, convID, convID); err != nil { + return fmt.Errorf("summary_parents: %w", err) + } + if _, err := tx.ExecContext(ctx, + "DELETE FROM summaries WHERE conversation_id = ?", convID); err != nil { + return fmt.Errorf("summaries: %w", err) + } + if _, err := tx.ExecContext(ctx, + `DELETE FROM message_parts WHERE message_id IN ( + SELECT message_id FROM messages WHERE conversation_id = ? + )`, convID); err != nil { + return fmt.Errorf("message_parts: %w", err) + } + if _, err := tx.ExecContext(ctx, + "DELETE FROM messages WHERE conversation_id = ?", convID); err != nil { + return fmt.Errorf("messages: %w", err) + } + + return tx.Commit() +} + // AppendContextMessage appends a single message to context_items at next ordinal. func (s *Store) AppendContextMessage(ctx context.Context, convID int64, messageID int64) error { return s.appendContextItems(ctx, convID, []ContextItem{ diff --git a/pkg/seahorse/store_test.go b/pkg/seahorse/store_test.go index fd55379c6..89635cc9a 100644 --- a/pkg/seahorse/store_test.go +++ b/pkg/seahorse/store_test.go @@ -79,7 +79,95 @@ func TestStoreGetConversationBySessionKey(t *testing.T) { } } -// --- Message Operations --- +// --- Conversation Clear --- + +func TestStoreClearConversation(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + + conv, err := s.GetOrCreateConversation(ctx, "agent:clear-test") + if err != nil { + t.Fatalf("create conversation: %v", err) + } + + // Add messages + msg1, err := s.AddMessage(ctx, conv.ConversationID, "user", "hello", 5) + if err != nil { + t.Fatalf("add message 1: %v", err) + } + msg2, err := s.AddMessage(ctx, conv.ConversationID, "assistant", "hi", 5) + if err != nil { + t.Fatalf("add message 2: %v", err) + } + + // Add a summary + _, err = s.CreateSummary(ctx, CreateSummaryInput{ + ConversationID: conv.ConversationID, + Content: "test summary", + TokenCount: 10, + Kind: SummaryKindLeaf, + }) + if err != nil { + t.Fatalf("create summary: %v", err) + } + + // Verify data exists + msgs, err := s.GetMessages(ctx, conv.ConversationID, 0, 0) + if err != nil { + t.Fatalf("get messages before clear: %v", err) + } + if len(msgs) != 2 { + t.Fatalf("expected 2 messages before clear, got %d", len(msgs)) + } + + sums, err := s.GetSummariesByConversation(ctx, conv.ConversationID) + if err != nil { + t.Fatalf("get summaries before clear: %v", err) + } + if len(sums) != 1 { + t.Fatalf("expected 1 summary before clear, got %d", len(sums)) + } + + // Clear + if err = s.ClearConversation(ctx, conv.ConversationID); err != nil { + t.Fatalf("clear conversation: %v", err) + } + + // Verify all data is gone + msgs, err = s.GetMessages(ctx, conv.ConversationID, 0, 0) + if err != nil { + t.Fatalf("get messages after clear: %v", err) + } + if len(msgs) != 0 { + t.Fatalf("expected 0 messages after clear, got %d", len(msgs)) + } + + sums, err = s.GetSummariesByConversation(ctx, conv.ConversationID) + if err != nil { + t.Fatalf("get summaries after clear: %v", err) + } + if len(sums) != 0 { + t.Fatalf("expected 0 summaries after clear, got %d", len(sums)) + } + + items, err := s.GetContextItems(ctx, conv.ConversationID) + if err != nil { + t.Fatalf("get context items after clear: %v", err) + } + if len(items) != 0 { + t.Fatalf("expected 0 context items after clear, got %d", len(items)) + } + + var count int + if err := s.db.QueryRowContext(ctx, + "SELECT COUNT(*) FROM message_parts WHERE message_id = ? OR message_id = ?", + msg1.ID, msg2.ID).Scan(&count); err != nil { + t.Fatalf("count message parts: %v", err) + } + if count != 0 { + t.Fatalf("expected 0 message parts after clear, got %d", count) + } +} func TestStoreAddAndGetMessages(t *testing.T) { s := openTestStore(t)