mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
Merge pull request #2495 from liuy/feat/seahorse-clear
feat(agent): /clear clears seahorse DB
This commit is contained in:
@@ -61,6 +61,16 @@ func (m *legacyContextManager) Ingest(_ context.Context, _ *IngestRequest) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *legacyContextManager) Clear(_ context.Context, sessionKey string) error {
|
||||
agent := m.al.registry.GetDefaultAgent()
|
||||
if agent == nil || agent.Sessions == nil {
|
||||
return fmt.Errorf("sessions not initialized")
|
||||
}
|
||||
agent.Sessions.SetHistory(sessionKey, []providers.Message{})
|
||||
agent.Sessions.SetSummary(sessionKey, "")
|
||||
return agent.Sessions.Save(sessionKey)
|
||||
}
|
||||
|
||||
// maybeSummarize triggers summarization if the session history exceeds thresholds.
|
||||
// It runs asynchronously in a goroutine.
|
||||
func (m *legacyContextManager) maybeSummarize(sessionKey string) {
|
||||
|
||||
@@ -24,6 +24,10 @@ type ContextManager interface {
|
||||
// Ingest records a message into the ContextManager's own storage.
|
||||
// Called after each message is persisted to session JSONL.
|
||||
Ingest(ctx context.Context, req *IngestRequest) error
|
||||
|
||||
// Clear removes all stored context for a session (messages, summaries, etc.).
|
||||
// Called when the user issues /clear or /reset.
|
||||
Clear(ctx context.Context, sessionKey string) error
|
||||
}
|
||||
|
||||
// AssembleRequest is the input to Assemble.
|
||||
|
||||
@@ -690,6 +690,7 @@ func (m *noopContextManager) Assemble(_ context.Context, req *AssembleRequest) (
|
||||
}
|
||||
func (m *noopContextManager) Compact(_ context.Context, _ *CompactRequest) error { return nil }
|
||||
func (m *noopContextManager) Ingest(_ context.Context, _ *IngestRequest) error { return nil }
|
||||
func (m *noopContextManager) Clear(_ context.Context, _ string) error { return nil }
|
||||
|
||||
// trackingContextManager tracks call counts for each method.
|
||||
type trackingContextManager struct {
|
||||
@@ -726,6 +727,8 @@ func (m *trackingContextManager) Ingest(_ context.Context, req *IngestRequest) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *trackingContextManager) Clear(_ context.Context, _ string) error { return nil }
|
||||
|
||||
// resetCMRegistry clears the global factory registry and returns a cleanup
|
||||
// function that restores the original state after the test.
|
||||
func resetCMRegistry() func() {
|
||||
|
||||
@@ -154,6 +154,19 @@ func (m *seahorseContextManager) Ingest(ctx context.Context, req *IngestRequest)
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear removes all stored context for a session (seahorse DB + JSONL).
|
||||
func (m *seahorseContextManager) Clear(ctx context.Context, sessionKey string) error {
|
||||
if err := m.engine.ClearSession(ctx, sessionKey); err != nil {
|
||||
return err
|
||||
}
|
||||
if m.sessions != nil {
|
||||
m.sessions.SetHistory(sessionKey, []providers.Message{})
|
||||
m.sessions.SetSummary(sessionKey, "")
|
||||
return m.sessions.Save(sessionKey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// bootstrapSession reconciles JSONL session history into seahorse SQLite.
|
||||
func (m *seahorseContextManager) bootstrapSession(ctx context.Context, sessionKey string) {
|
||||
if m.sessions == nil {
|
||||
|
||||
+7
-10
@@ -3368,7 +3368,7 @@ func (al *AgentLoop) handleCommand(
|
||||
return "", false
|
||||
}
|
||||
|
||||
rt := al.buildCommandsRuntime(agent, opts)
|
||||
rt := al.buildCommandsRuntime(ctx, agent, opts)
|
||||
executor := commands.NewExecutor(al.cmdRegistry, rt)
|
||||
|
||||
var commandReply string
|
||||
@@ -3488,7 +3488,11 @@ func (al *AgentLoop) applyExplicitSkillCommand(
|
||||
return true, false, ""
|
||||
}
|
||||
|
||||
func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime {
|
||||
func (al *AgentLoop) buildCommandsRuntime(
|
||||
ctx context.Context,
|
||||
agent *AgentInstance,
|
||||
opts *processOptions,
|
||||
) *commands.Runtime {
|
||||
registry := al.GetRegistry()
|
||||
cfg := al.GetConfig()
|
||||
rt := &commands.Runtime{
|
||||
@@ -3570,14 +3574,7 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt
|
||||
if opts == nil {
|
||||
return fmt.Errorf("process options not available")
|
||||
}
|
||||
if agent.Sessions == nil {
|
||||
return fmt.Errorf("sessions not initialized for agent")
|
||||
}
|
||||
|
||||
agent.Sessions.SetHistory(opts.SessionKey, make([]providers.Message, 0))
|
||||
agent.Sessions.SetSummary(opts.SessionKey, "")
|
||||
agent.Sessions.Save(opts.SessionKey)
|
||||
return nil
|
||||
return al.contextManager.Clear(ctx, opts.SessionKey)
|
||||
}
|
||||
}
|
||||
return rt
|
||||
|
||||
+17
-8
@@ -118,26 +118,35 @@ func runSchema(db *sql.DB) error {
|
||||
`CREATE INDEX IF NOT EXISTS idx_summary_messages_message ON summary_messages(message_id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_context_items_conv ON context_items(conversation_id, ordinal)`,
|
||||
|
||||
// Drop old triggers before creating new ones so existing DBs get updated bodies.
|
||||
// (CREATE TRIGGER IF NOT EXISTS does NOT replace an existing trigger body.)
|
||||
`DROP TRIGGER IF EXISTS summaries_ai`,
|
||||
`DROP TRIGGER IF EXISTS summaries_ad`,
|
||||
`DROP TRIGGER IF EXISTS summaries_au`,
|
||||
`DROP TRIGGER IF EXISTS messages_ai`,
|
||||
`DROP TRIGGER IF EXISTS messages_ad`,
|
||||
`DROP TRIGGER IF EXISTS messages_au`,
|
||||
|
||||
// FTS5 triggers to keep summaries_fts in sync with summaries table
|
||||
`CREATE TRIGGER IF NOT EXISTS summaries_ai AFTER INSERT ON summaries BEGIN
|
||||
`CREATE TRIGGER summaries_ai AFTER INSERT ON summaries BEGIN
|
||||
INSERT INTO summaries_fts (summary_id, content) VALUES (new.summary_id, new.content);
|
||||
END`,
|
||||
`CREATE TRIGGER IF NOT EXISTS summaries_ad AFTER DELETE ON summaries BEGIN
|
||||
INSERT INTO summaries_fts (summaries_fts, summary_id, content) VALUES ('delete', old.summary_id, old.content);
|
||||
`CREATE TRIGGER summaries_ad AFTER DELETE ON summaries BEGIN
|
||||
DELETE FROM summaries_fts WHERE summary_id = old.summary_id;
|
||||
END`,
|
||||
`CREATE TRIGGER IF NOT EXISTS summaries_au AFTER UPDATE ON summaries BEGIN
|
||||
INSERT INTO summaries_fts (summaries_fts, summary_id, content) VALUES ('delete', old.summary_id, old.content);
|
||||
`CREATE TRIGGER summaries_au AFTER UPDATE ON summaries BEGIN
|
||||
DELETE FROM summaries_fts WHERE summary_id = old.summary_id;
|
||||
INSERT INTO summaries_fts (summary_id, content) VALUES (new.summary_id, new.content);
|
||||
END`,
|
||||
|
||||
// FTS5 triggers to keep messages_fts in sync with messages table
|
||||
`CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN
|
||||
`CREATE TRIGGER messages_ai AFTER INSERT ON messages BEGIN
|
||||
INSERT INTO messages_fts (message_id, content) VALUES (new.message_id, new.content);
|
||||
END`,
|
||||
`CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN
|
||||
`CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN
|
||||
DELETE FROM messages_fts WHERE message_id = old.message_id;
|
||||
END`,
|
||||
`CREATE TRIGGER IF NOT EXISTS messages_au AFTER UPDATE ON messages BEGIN
|
||||
`CREATE TRIGGER messages_au AFTER UPDATE ON messages BEGIN
|
||||
DELETE FROM messages_fts WHERE message_id = old.message_id;
|
||||
INSERT INTO messages_fts (message_id, content) VALUES (new.message_id, new.content);
|
||||
END`,
|
||||
|
||||
@@ -194,6 +194,84 @@ func TestMigrationSummaryParentsPK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTriggerMigration(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
|
||||
// Run schema once to create tables and (correct) triggers
|
||||
if err := runSchema(db); err != nil {
|
||||
t.Fatalf("runSchema: %v", err)
|
||||
}
|
||||
|
||||
// Drop correct triggers and recreate them with the old buggy body.
|
||||
// The old trigger used INSERT INTO fts VALUES('delete', ...) which is wrong
|
||||
// for non-external-content FTS5 tables.
|
||||
oldSummariesDelete := `CREATE TRIGGER summaries_ad AFTER DELETE ON summaries BEGIN
|
||||
INSERT INTO summaries_fts (summaries_fts, summary_id, content) VALUES('delete', old.summary_id, old.content);
|
||||
END`
|
||||
oldMessagesDelete := `CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN
|
||||
INSERT INTO messages_fts (messages_fts, message_id, content) VALUES('delete', old.message_id, old.content);
|
||||
END`
|
||||
|
||||
for _, sql := range []string{
|
||||
`DROP TRIGGER IF EXISTS summaries_ad`,
|
||||
`DROP TRIGGER IF EXISTS messages_ad`,
|
||||
oldSummariesDelete,
|
||||
oldMessagesDelete,
|
||||
} {
|
||||
if _, err := db.Exec(sql); err != nil {
|
||||
t.Fatalf("setup old trigger: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert a conversation and summary so we have something to delete
|
||||
_, err := db.Exec(`INSERT INTO conversations (session_key) VALUES ('old-db-test')`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert conversation: %v", err)
|
||||
}
|
||||
_, err = db.Exec(`INSERT INTO summaries (summary_id, conversation_id, kind, depth, content, token_count)
|
||||
VALUES ('old-sum', 1, 'leaf', 0, 'old content', 5)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert summary: %v", err)
|
||||
}
|
||||
|
||||
// The old trigger body is wrong for normal FTS5 — DELETE should fail.
|
||||
_, err = db.Exec(`DELETE FROM summaries WHERE summary_id = 'old-sum'`)
|
||||
if err == nil {
|
||||
t.Error("expected error from old buggy trigger, but DELETE succeeded")
|
||||
} else {
|
||||
t.Logf("old trigger correctly causes error: %v", err)
|
||||
}
|
||||
|
||||
// Now runSchema again — this drops and recreates the triggers with correct bodies.
|
||||
err = runSchema(db)
|
||||
if err != nil {
|
||||
t.Fatalf("runSchema migration: %v", err)
|
||||
}
|
||||
|
||||
// Insert again so we have data to delete
|
||||
_, err = db.Exec(`INSERT INTO summaries (summary_id, conversation_id, kind, depth, content, token_count)
|
||||
VALUES ('migrated-sum', 1, 'leaf', 0, 'new content', 5)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert after migration: %v", err)
|
||||
}
|
||||
|
||||
// DELETE should now work with the corrected trigger body.
|
||||
_, err = db.Exec(`DELETE FROM summaries WHERE summary_id = 'migrated-sum'`)
|
||||
if err != nil {
|
||||
t.Fatalf("DELETE after migration failed (trigger not corrected): %v", err)
|
||||
}
|
||||
|
||||
// Verify the summary is gone
|
||||
var count int
|
||||
err = db.QueryRow(`SELECT count(*) FROM summaries WHERE summary_id = 'migrated-sum'`).Scan(&count)
|
||||
if err != nil {
|
||||
t.Fatalf("query after delete: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Errorf("summary should be gone after DELETE, got count=%d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFTS5SQLConstants(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
|
||||
|
||||
@@ -377,6 +377,19 @@ func (e *Engine) IngestMessages(ctx context.Context, sessionKey string, messages
|
||||
return e.Ingest(ctx, sessionKey, messages)
|
||||
}
|
||||
|
||||
// ClearSession removes all stored data for a session (messages, summaries, context).
|
||||
// If the session has no prior seahorse record, it is a no-op.
|
||||
func (e *Engine) ClearSession(ctx context.Context, sessionKey string) error {
|
||||
conv, err := e.store.GetConversationBySessionKey(ctx, sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if conv == nil {
|
||||
return nil // session never ingested, nothing to clear
|
||||
}
|
||||
return e.store.ClearConversation(ctx, conv.ConversationID)
|
||||
}
|
||||
|
||||
// Bootstrap reconciles a session's messages with the database.
|
||||
// Called once at startup for each known session.
|
||||
// Bootstrap reconciles JSONL history with SQLite by ingesting only the delta.
|
||||
|
||||
@@ -728,6 +728,57 @@ func (s *Store) DeleteMessagesAfterID(ctx context.Context, convID int64, afterID
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// ClearConversation removes all data for a conversation from all tables.
|
||||
// Deletes context_items, summary_messages, summary_parents (via subquery), summaries,
|
||||
// message_parts, and messages. FTS entries are handled automatically by triggers.
|
||||
// Uses a transaction for atomicity.
|
||||
func (s *Store) ClearConversation(ctx context.Context, convID int64) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Delete in child→parent order. FTS tables (messages_fts, summaries_fts) are
|
||||
// kept in sync by DELETE triggers, so we just delete from the parent tables.
|
||||
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
"DELETE FROM context_items WHERE conversation_id = ?", convID); err != nil {
|
||||
return fmt.Errorf("context_items: %w", err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`DELETE FROM summary_messages WHERE summary_id IN (
|
||||
SELECT summary_id FROM summaries WHERE conversation_id = ?
|
||||
)`, convID); err != nil {
|
||||
return fmt.Errorf("summary_messages: %w", err)
|
||||
}
|
||||
// Note: summary_parents has no convID column; delete via subquery on summaries
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`DELETE FROM summary_parents WHERE summary_id IN (
|
||||
SELECT summary_id FROM summaries WHERE conversation_id = ?
|
||||
) OR parent_summary_id IN (
|
||||
SELECT summary_id FROM summaries WHERE conversation_id = ?
|
||||
)`, convID, convID); err != nil {
|
||||
return fmt.Errorf("summary_parents: %w", err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
"DELETE FROM summaries WHERE conversation_id = ?", convID); err != nil {
|
||||
return fmt.Errorf("summaries: %w", err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`DELETE FROM message_parts WHERE message_id IN (
|
||||
SELECT message_id FROM messages WHERE conversation_id = ?
|
||||
)`, convID); err != nil {
|
||||
return fmt.Errorf("message_parts: %w", err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
"DELETE FROM messages WHERE conversation_id = ?", convID); err != nil {
|
||||
return fmt.Errorf("messages: %w", err)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// AppendContextMessage appends a single message to context_items at next ordinal.
|
||||
func (s *Store) AppendContextMessage(ctx context.Context, convID int64, messageID int64) error {
|
||||
return s.appendContextItems(ctx, convID, []ContextItem{
|
||||
|
||||
@@ -79,7 +79,95 @@ func TestStoreGetConversationBySessionKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// --- Message Operations ---
|
||||
// --- Conversation Clear ---
|
||||
|
||||
func TestStoreClearConversation(t *testing.T) {
|
||||
s := openTestStore(t)
|
||||
ctx := context.Background()
|
||||
|
||||
conv, err := s.GetOrCreateConversation(ctx, "agent:clear-test")
|
||||
if err != nil {
|
||||
t.Fatalf("create conversation: %v", err)
|
||||
}
|
||||
|
||||
// Add messages
|
||||
msg1, err := s.AddMessage(ctx, conv.ConversationID, "user", "hello", 5)
|
||||
if err != nil {
|
||||
t.Fatalf("add message 1: %v", err)
|
||||
}
|
||||
msg2, err := s.AddMessage(ctx, conv.ConversationID, "assistant", "hi", 5)
|
||||
if err != nil {
|
||||
t.Fatalf("add message 2: %v", err)
|
||||
}
|
||||
|
||||
// Add a summary
|
||||
_, err = s.CreateSummary(ctx, CreateSummaryInput{
|
||||
ConversationID: conv.ConversationID,
|
||||
Content: "test summary",
|
||||
TokenCount: 10,
|
||||
Kind: SummaryKindLeaf,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create summary: %v", err)
|
||||
}
|
||||
|
||||
// Verify data exists
|
||||
msgs, err := s.GetMessages(ctx, conv.ConversationID, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("get messages before clear: %v", err)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages before clear, got %d", len(msgs))
|
||||
}
|
||||
|
||||
sums, err := s.GetSummariesByConversation(ctx, conv.ConversationID)
|
||||
if err != nil {
|
||||
t.Fatalf("get summaries before clear: %v", err)
|
||||
}
|
||||
if len(sums) != 1 {
|
||||
t.Fatalf("expected 1 summary before clear, got %d", len(sums))
|
||||
}
|
||||
|
||||
// Clear
|
||||
if err = s.ClearConversation(ctx, conv.ConversationID); err != nil {
|
||||
t.Fatalf("clear conversation: %v", err)
|
||||
}
|
||||
|
||||
// Verify all data is gone
|
||||
msgs, err = s.GetMessages(ctx, conv.ConversationID, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("get messages after clear: %v", err)
|
||||
}
|
||||
if len(msgs) != 0 {
|
||||
t.Fatalf("expected 0 messages after clear, got %d", len(msgs))
|
||||
}
|
||||
|
||||
sums, err = s.GetSummariesByConversation(ctx, conv.ConversationID)
|
||||
if err != nil {
|
||||
t.Fatalf("get summaries after clear: %v", err)
|
||||
}
|
||||
if len(sums) != 0 {
|
||||
t.Fatalf("expected 0 summaries after clear, got %d", len(sums))
|
||||
}
|
||||
|
||||
items, err := s.GetContextItems(ctx, conv.ConversationID)
|
||||
if err != nil {
|
||||
t.Fatalf("get context items after clear: %v", err)
|
||||
}
|
||||
if len(items) != 0 {
|
||||
t.Fatalf("expected 0 context items after clear, got %d", len(items))
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := s.db.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM message_parts WHERE message_id = ? OR message_id = ?",
|
||||
msg1.ID, msg2.ID).Scan(&count); err != nil {
|
||||
t.Fatalf("count message parts: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Fatalf("expected 0 message parts after clear, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreAddAndGetMessages(t *testing.T) {
|
||||
s := openTestStore(t)
|
||||
|
||||
Reference in New Issue
Block a user