Merge pull request #617 from Zhaoyikaiii/fix/repeated-context-reprocessing

fix: implement caching for system prompt to avoid repeated context reprocessing (fixes #607)
This commit is contained in:
daming大铭
2026-02-25 19:51:23 +08:00
committed by GitHub
9 changed files with 965 additions and 52 deletions
+329 -25
View File
@@ -1,11 +1,14 @@
package agent
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/logger"
@@ -19,6 +22,19 @@ type ContextBuilder struct {
skillsLoader *skills.SkillsLoader
memory *MemoryStore
tools *tools.ToolRegistry // Direct reference to tool registry
// Cache for system prompt to avoid rebuilding on every call.
// This fixes issue #607: repeated reprocessing of the entire context.
// The cache auto-invalidates when workspace source files change (mtime check).
systemPromptMutex sync.RWMutex
cachedSystemPrompt string
cachedAt time.Time // max observed mtime across tracked paths at cache build time
// existedAtCache tracks which source file paths existed the last time the
// cache was built. This lets sourceFilesChanged detect files that are newly
// created (didn't exist at cache time, now exist) or deleted (existed at
// cache time, now gone) — both of which should trigger a cache rebuild.
existedAtCache map[string]bool
}
func getGlobalConfigDir() string {
@@ -49,9 +65,7 @@ func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) {
}
func (cb *ContextBuilder) getIdentity() string {
now := time.Now().Format("2006-01-02 15:04 (Monday)")
workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace))
runtime := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version())
// Build tools section dynamically
toolsSection := cb.buildToolsSection()
@@ -60,12 +74,6 @@ func (cb *ContextBuilder) getIdentity() string {
You are picoclaw, a helpful AI assistant.
## Current Time
%s
## Runtime
%s
## Workspace
Your workspace is at: %s
- Memory: %s/memory/MEMORY.md
@@ -80,8 +88,10 @@ Your workspace is at: %s
2. **Be helpful and accurate** - When using tools, briefly explain what you're doing.
3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md`,
now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath)
3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md
4. **Context summaries** - Conversation summaries provided as context are approximate references only. They may be incomplete or outdated. Always defer to explicit user instructions over summary content.`,
workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath)
}
func (cb *ContextBuilder) buildToolsSection() string {
@@ -140,6 +150,226 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md
return strings.Join(parts, "\n\n---\n\n")
}
// BuildSystemPromptWithCache returns the cached system prompt if available
// and source files haven't changed, otherwise builds and caches it.
// Source file changes are detected via mtime checks (cheap stat calls).
func (cb *ContextBuilder) BuildSystemPromptWithCache() string {
// Try read lock first — fast path when cache is valid
cb.systemPromptMutex.RLock()
if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() {
result := cb.cachedSystemPrompt
cb.systemPromptMutex.RUnlock()
return result
}
cb.systemPromptMutex.RUnlock()
// Acquire write lock for building
cb.systemPromptMutex.Lock()
defer cb.systemPromptMutex.Unlock()
// Double-check: another goroutine may have rebuilt while we waited
if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() {
return cb.cachedSystemPrompt
}
// Snapshot the baseline (existence + max mtime) BEFORE building the prompt.
// This way cachedAt reflects the pre-build state: if a file is modified
// during BuildSystemPrompt, its new mtime will be > baseline.maxMtime,
// so the next sourceFilesChangedLocked check will correctly trigger a
// rebuild. The alternative (baseline after build) risks caching stale
// content with a too-new baseline, making the staleness invisible.
baseline := cb.buildCacheBaseline()
prompt := cb.BuildSystemPrompt()
cb.cachedSystemPrompt = prompt
cb.cachedAt = baseline.maxMtime
cb.existedAtCache = baseline.existed
logger.DebugCF("agent", "System prompt cached",
map[string]any{
"length": len(prompt),
})
return prompt
}
// InvalidateCache clears the cached system prompt.
// Normally not needed because the cache auto-invalidates via mtime checks,
// but this is useful for tests or explicit reload commands.
func (cb *ContextBuilder) InvalidateCache() {
cb.systemPromptMutex.Lock()
defer cb.systemPromptMutex.Unlock()
cb.cachedSystemPrompt = ""
cb.cachedAt = time.Time{}
cb.existedAtCache = nil
logger.DebugCF("agent", "System prompt cache invalidated", nil)
}
// sourcePaths returns the workspace source file paths tracked for cache
// invalidation (bootstrap files + memory). The skills directory is handled
// separately in sourceFilesChangedLocked because it requires both directory-
// level and recursive file-level mtime checks.
func (cb *ContextBuilder) sourcePaths() []string {
return []string{
filepath.Join(cb.workspace, "AGENTS.md"),
filepath.Join(cb.workspace, "SOUL.md"),
filepath.Join(cb.workspace, "USER.md"),
filepath.Join(cb.workspace, "IDENTITY.md"),
filepath.Join(cb.workspace, "memory", "MEMORY.md"),
}
}
// cacheBaseline holds the file existence snapshot and the latest observed
// mtime across all tracked paths. Used as the cache reference point.
type cacheBaseline struct {
existed map[string]bool
maxMtime time.Time
}
// buildCacheBaseline records which tracked paths currently exist and computes
// the latest mtime across all tracked files + skills directory contents.
// Called under write lock when the cache is built.
func (cb *ContextBuilder) buildCacheBaseline() cacheBaseline {
skillsDir := filepath.Join(cb.workspace, "skills")
// All paths whose existence we track: source files + skills dir.
allPaths := append(cb.sourcePaths(), skillsDir)
existed := make(map[string]bool, len(allPaths))
var maxMtime time.Time
for _, p := range allPaths {
info, err := os.Stat(p)
existed[p] = err == nil
if err == nil && info.ModTime().After(maxMtime) {
maxMtime = info.ModTime()
}
}
// Walk skills files to capture their mtimes too.
// Use os.Stat (not d.Info) to match the stat method used in
// fileChangedSince / skillFilesModifiedSince for consistency.
_ = filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr == nil && !d.IsDir() {
if info, err := os.Stat(path); err == nil && info.ModTime().After(maxMtime) {
maxMtime = info.ModTime()
}
}
return nil
})
// If no tracked files exist yet (empty workspace), maxMtime is zero.
// Use a very old non-zero time so that:
// 1. cachedAt.IsZero() won't trigger perpetual rebuilds.
// 2. Any real file created afterwards has mtime > cachedAt, so it
// will be detected by fileChangedSince (unlike time.Now() which
// could race with a file whose mtime <= Now).
if maxMtime.IsZero() {
maxMtime = time.Unix(1, 0)
}
return cacheBaseline{existed: existed, maxMtime: maxMtime}
}
// sourceFilesChangedLocked checks whether any workspace source file has been
// modified, created, or deleted since the cache was last built.
//
// IMPORTANT: The caller MUST hold at least a read lock on systemPromptMutex.
// Go's sync.RWMutex is not reentrant, so this function must NOT acquire the
// lock itself (it would deadlock when called from BuildSystemPromptWithCache
// which already holds RLock or Lock).
func (cb *ContextBuilder) sourceFilesChangedLocked() bool {
if cb.cachedAt.IsZero() {
return true
}
// Check tracked source files (bootstrap + memory).
for _, p := range cb.sourcePaths() {
if cb.fileChangedSince(p) {
return true
}
}
// --- Skills directory (handled separately from sourcePaths) ---
//
// 1. Creation/deletion: tracked via existedAtCache, same as bootstrap files.
skillsDir := filepath.Join(cb.workspace, "skills")
if cb.fileChangedSince(skillsDir) {
return true
}
// 2. Structural changes (add/remove entries inside the dir) are reflected
// in the directory's own mtime, which fileChangedSince already checks.
//
// 3. Content-only edits to files inside skills/ do NOT update the parent
// directory mtime on most filesystems, so we recursively walk to check
// individual file mtimes at any nesting depth.
if skillFilesModifiedSince(skillsDir, cb.cachedAt) {
return true
}
return false
}
// fileChangedSince returns true if a tracked source file has been modified,
// newly created, or deleted since the cache was built.
//
// Four cases:
// - existed at cache time, exists now -> check mtime
// - existed at cache time, gone now -> changed (deleted)
// - absent at cache time, exists now -> changed (created)
// - absent at cache time, gone now -> no change
func (cb *ContextBuilder) fileChangedSince(path string) bool {
// Defensive: if existedAtCache was never initialised, treat as changed
// so the cache rebuilds rather than silently serving stale data.
if cb.existedAtCache == nil {
return true
}
existedBefore := cb.existedAtCache[path]
info, err := os.Stat(path)
existsNow := err == nil
if existedBefore != existsNow {
return true // file was created or deleted
}
if !existsNow {
return false // didn't exist before, doesn't exist now
}
return info.ModTime().After(cb.cachedAt)
}
// errWalkStop is a sentinel error used to stop filepath.WalkDir early.
// Using a dedicated error (instead of fs.SkipAll) makes the early-exit
// intent explicit and avoids the nilerr linter warning that would fire
// if the callback returned nil when its err parameter is non-nil.
var errWalkStop = errors.New("walk stop")
// skillFilesModifiedSince recursively walks the skills directory and checks
// whether any file was modified after t. This catches content-only edits at
// any nesting depth (e.g. skills/name/docs/extra.md) that don't update
// parent directory mtimes.
func skillFilesModifiedSince(skillsDir string, t time.Time) bool {
changed := false
err := filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr == nil && !d.IsDir() {
if info, statErr := os.Stat(path); statErr == nil && info.ModTime().After(t) {
changed = true
return errWalkStop // stop walking
}
}
return nil
})
// errWalkStop is expected (early exit on first changed file).
// os.IsNotExist means the skills dir doesn't exist yet — not an error.
// Any other error is unexpected and worth logging.
if err != nil && !errors.Is(err, errWalkStop) && !os.IsNotExist(err) {
logger.DebugCF("agent", "skills walk error", map[string]any{"error": err.Error()})
}
return changed
}
func (cb *ContextBuilder) LoadBootstrapFiles() string {
bootstrapFiles := []string{
"AGENTS.md",
@@ -159,6 +389,28 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string {
return sb.String()
}
// buildDynamicContext returns a short dynamic context string with per-request info.
// This changes every request (time, session) so it is NOT part of the cached prompt.
// LLM-side KV cache reuse is achieved by each provider adapter's native mechanism:
// - Anthropic: per-block cache_control (ephemeral) on the static SystemParts block
// - OpenAI / Codex: prompt_cache_key for prefix-based caching
//
// See: https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
// See: https://platform.openai.com/docs/guides/prompt-caching
func (cb *ContextBuilder) buildDynamicContext(channel, chatID string) string {
now := time.Now().Format("2006-01-02 15:04 (Monday)")
rt := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version())
var sb strings.Builder
fmt.Fprintf(&sb, "## Current Time\n%s\n\n## Runtime\n%s", now, rt)
if channel != "" && chatID != "" {
fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID)
}
return sb.String()
}
func (cb *ContextBuilder) BuildMessages(
history []providers.Message,
summary string,
@@ -168,23 +420,65 @@ func (cb *ContextBuilder) BuildMessages(
) []providers.Message {
messages := []providers.Message{}
systemPrompt := cb.BuildSystemPrompt()
// The static part (identity, bootstrap, skills, memory) is cached locally to
// avoid repeated file I/O and string building on every call (fixes issue #607).
// Dynamic parts (time, session, summary) are appended per request.
// Everything is sent as a single system message for provider compatibility:
// - Anthropic adapter extracts messages[0] (Role=="system") and maps its content
// to the top-level "system" parameter in the Messages API request. A single
// contiguous system block makes this extraction straightforward.
// - Codex maps only the first system message to its instructions field.
// - OpenAI-compat passes messages through as-is.
staticPrompt := cb.BuildSystemPromptWithCache()
// Add Current Session info if provided
if channel != "" && chatID != "" {
systemPrompt += fmt.Sprintf("\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID)
// Build short dynamic context (time, runtime, session) — changes per request
dynamicCtx := cb.buildDynamicContext(channel, chatID)
// Compose a single system message: static (cached) + dynamic + optional summary.
// Keeping all system content in one message ensures every provider adapter can
// extract it correctly (Anthropic adapter -> top-level system param,
// Codex -> instructions field).
//
// SystemParts carries the same content as structured blocks so that
// cache-aware adapters (Anthropic) can set per-block cache_control.
// The static block is marked "ephemeral" — its prefix hash is stable
// across requests, enabling LLM-side KV cache reuse.
stringParts := []string{staticPrompt, dynamicCtx}
contentBlocks := []providers.ContentBlock{
{Type: "text", Text: staticPrompt, CacheControl: &providers.CacheControl{Type: "ephemeral"}},
{Type: "text", Text: dynamicCtx},
}
// Log system prompt summary for debugging (debug mode only)
if summary != "" {
summaryText := fmt.Sprintf(
"CONTEXT_SUMMARY: The following is an approximate summary of prior conversation "+
"for reference only. It may be incomplete or outdated — always defer to explicit instructions.\n\n%s",
summary)
stringParts = append(stringParts, summaryText)
contentBlocks = append(contentBlocks, providers.ContentBlock{Type: "text", Text: summaryText})
}
fullSystemPrompt := strings.Join(stringParts, "\n\n---\n\n")
// Log system prompt summary for debugging (debug mode only).
// Read cachedSystemPrompt under lock to avoid a data race with
// concurrent InvalidateCache / BuildSystemPromptWithCache writes.
cb.systemPromptMutex.RLock()
isCached := cb.cachedSystemPrompt != ""
cb.systemPromptMutex.RUnlock()
logger.DebugCF("agent", "System prompt built",
map[string]any{
"total_chars": len(systemPrompt),
"total_lines": strings.Count(systemPrompt, "\n") + 1,
"section_count": strings.Count(systemPrompt, "\n\n---\n\n") + 1,
"static_chars": len(staticPrompt),
"dynamic_chars": len(dynamicCtx),
"total_chars": len(fullSystemPrompt),
"has_summary": summary != "",
"cached": isCached,
})
// Log preview of system prompt (avoid logging huge content)
preview := systemPrompt
preview := fullSystemPrompt
if len(preview) > 500 {
preview = preview[:500] + "... (truncated)"
}
@@ -193,19 +487,21 @@ func (cb *ContextBuilder) BuildMessages(
"preview": preview,
})
if summary != "" {
systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary
}
history = sanitizeHistoryForProvider(history)
// Single system message containing all context — compatible with all providers.
// SystemParts enables cache-aware adapters to set per-block cache_control;
// Content is the concatenated fallback for adapters that don't read SystemParts.
messages = append(messages, providers.Message{
Role: "system",
Content: systemPrompt,
Role: "system",
Content: fullSystemPrompt,
SystemParts: contentBlocks,
})
// Add conversation history
messages = append(messages, history...)
// Add current user message
if strings.TrimSpace(currentMessage) != "" {
messages = append(messages, providers.Message{
Role: "user",
@@ -224,6 +520,14 @@ func sanitizeHistoryForProvider(history []providers.Message) []providers.Message
sanitized := make([]providers.Message, 0, len(history))
for _, msg := range history {
switch msg.Role {
case "system":
// Drop system messages from history. BuildMessages always
// constructs its own single system message (static + dynamic +
// summary); extra system messages would break providers that
// only accept one (Anthropic, Codex).
logger.DebugCF("agent", "Dropping system message from history", map[string]any{})
continue
case "tool":
if len(sanitized) == 0 {
logger.DebugCF("agent", "Dropping orphaned leading tool message", map[string]any{})
+513
View File
@@ -0,0 +1,513 @@
package agent
import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/sipeed/picoclaw/pkg/providers"
)
// setupWorkspace creates a temporary workspace with standard directories and optional files.
// Returns the tmpDir path; caller should defer os.RemoveAll(tmpDir).
func setupWorkspace(t *testing.T, files map[string]string) string {
t.Helper()
tmpDir, err := os.MkdirTemp("", "picoclaw-test-*")
if err != nil {
t.Fatal(err)
}
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755)
os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755)
for name, content := range files {
dir := filepath.Dir(filepath.Join(tmpDir, name))
os.MkdirAll(dir, 0o755)
if err := os.WriteFile(filepath.Join(tmpDir, name), []byte(content), 0o644); err != nil {
t.Fatal(err)
}
}
return tmpDir
}
// TestSingleSystemMessage verifies that BuildMessages always produces exactly one
// system message regardless of summary/history variations.
// Fix: multiple system messages break Anthropic (top-level system param) and
// Codex (only reads last system message as instructions).
func TestSingleSystemMessage(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nTest agent.",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
tests := []struct {
name string
history []providers.Message
summary string
message string
}{
{
name: "no summary, no history",
summary: "",
message: "hello",
},
{
name: "with summary",
summary: "Previous conversation discussed X",
message: "hello",
},
{
name: "with history and summary",
history: []providers.Message{
{Role: "user", Content: "hi"},
{Role: "assistant", Content: "hello"},
},
summary: strings.Repeat("Long summary text. ", 50),
message: "new message",
},
{
name: "system message in history is filtered",
history: []providers.Message{
{Role: "system", Content: "stale system prompt from previous session"},
{Role: "user", Content: "hi"},
{Role: "assistant", Content: "hello"},
},
summary: "",
message: "new message",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
msgs := cb.BuildMessages(tt.history, tt.summary, tt.message, nil, "test", "chat1")
systemCount := 0
for _, m := range msgs {
if m.Role == "system" {
systemCount++
}
}
if systemCount != 1 {
t.Errorf("expected exactly 1 system message, got %d", systemCount)
}
if msgs[0].Role != "system" {
t.Errorf("first message should be system, got %s", msgs[0].Role)
}
if msgs[len(msgs)-1].Role != "user" {
t.Errorf("last message should be user, got %s", msgs[len(msgs)-1].Role)
}
// System message must contain identity (static) and time (dynamic)
sys := msgs[0].Content
if !strings.Contains(sys, "picoclaw") {
t.Error("system message missing identity")
}
if !strings.Contains(sys, "Current Time") {
t.Error("system message missing dynamic time context")
}
// Summary handling
if tt.summary != "" {
if !strings.Contains(sys, "CONTEXT_SUMMARY:") {
t.Error("summary present but CONTEXT_SUMMARY prefix missing")
}
if !strings.Contains(sys, tt.summary[:20]) {
t.Error("summary content not found in system message")
}
} else {
if strings.Contains(sys, "CONTEXT_SUMMARY:") {
t.Error("CONTEXT_SUMMARY should not appear without summary")
}
}
})
}
}
// TestMtimeAutoInvalidation verifies that the cache detects source file changes
// via mtime without requiring explicit InvalidateCache().
// Fix: original implementation had no auto-invalidation — edits to bootstrap files,
// memory, or skills were invisible until process restart.
func TestMtimeAutoInvalidation(t *testing.T) {
tests := []struct {
name string
file string // relative path inside workspace
contentV1 string
contentV2 string
checkField string // substring to verify in rebuilt prompt
}{
{
name: "bootstrap file change",
file: "IDENTITY.md",
contentV1: "# Original Identity",
contentV2: "# Updated Identity",
checkField: "Updated Identity",
},
{
name: "memory file change",
file: "memory/MEMORY.md",
contentV1: "# Memory\nUser likes Go.",
contentV2: "# Memory\nUser likes Rust.",
checkField: "User likes Rust",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{tt.file: tt.contentV1})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
sp1 := cb.BuildSystemPromptWithCache()
// Overwrite file and set future mtime to ensure detection.
// Use 2s offset for filesystem mtime resolution safety (some FS
// have 1s or coarser granularity, especially in CI containers).
fullPath := filepath.Join(tmpDir, tt.file)
os.WriteFile(fullPath, []byte(tt.contentV2), 0o644)
future := time.Now().Add(2 * time.Second)
os.Chtimes(fullPath, future, future)
// Verify sourceFilesChangedLocked detects the mtime change
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Fatalf("sourceFilesChangedLocked() should detect %s change", tt.file)
}
// Should auto-rebuild without explicit InvalidateCache()
sp2 := cb.BuildSystemPromptWithCache()
if sp1 == sp2 {
t.Errorf("cache not rebuilt after %s change", tt.file)
}
if !strings.Contains(sp2, tt.checkField) {
t.Errorf("rebuilt prompt missing expected content %q", tt.checkField)
}
})
}
// Skills directory mtime change
t.Run("skills dir change", func(t *testing.T) {
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
_ = cb.BuildSystemPromptWithCache() // populate cache
// Touch skills directory (simulate new skill installed)
skillsDir := filepath.Join(tmpDir, "skills")
future := time.Now().Add(2 * time.Second)
os.Chtimes(skillsDir, future, future)
// Verify sourceFilesChangedLocked detects it (cache is rebuilt)
// We confirm by checking internal state: a second call should rebuild.
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Error("sourceFilesChangedLocked() should detect skills dir mtime change")
}
})
}
// TestExplicitInvalidateCache verifies that InvalidateCache() forces a rebuild
// even when source files haven't changed (useful for tests and reload commands).
func TestExplicitInvalidateCache(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Test Identity",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
sp1 := cb.BuildSystemPromptWithCache()
cb.InvalidateCache()
sp2 := cb.BuildSystemPromptWithCache()
if sp1 != sp2 {
t.Error("prompt should be identical after invalidate+rebuild when files unchanged")
}
// Verify cachedAt was reset
cb.InvalidateCache()
cb.systemPromptMutex.RLock()
if !cb.cachedAt.IsZero() {
t.Error("cachedAt should be zero after InvalidateCache()")
}
cb.systemPromptMutex.RUnlock()
}
// TestCacheStability verifies that the static prompt is stable across repeated calls
// when no files change (regression test for issue #607).
func TestCacheStability(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nContent",
"SOUL.md": "# Soul\nContent",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
results := make([]string, 5)
for i := range results {
results[i] = cb.BuildSystemPromptWithCache()
}
for i := 1; i < len(results); i++ {
if results[i] != results[0] {
t.Errorf("cached prompt changed between call 0 and %d", i)
}
}
// Static prompt must NOT contain per-request data
if strings.Contains(results[0], "Current Time") {
t.Error("static cached prompt should not contain time (added dynamically)")
}
}
// TestNewFileCreationInvalidatesCache verifies that creating a source file that
// did not exist when the cache was built triggers a cache rebuild.
// This catches the "from nothing to something" edge case that the old
// modifiedSince (return false on stat error) would miss.
func TestNewFileCreationInvalidatesCache(t *testing.T) {
tests := []struct {
name string
file string // relative path inside workspace
content string
checkField string // substring to verify in rebuilt prompt
}{
{
name: "new bootstrap file",
file: "SOUL.md",
content: "# Soul\nBe kind and helpful.",
checkField: "Be kind and helpful",
},
{
name: "new memory file",
file: "memory/MEMORY.md",
content: "# Memory\nUser prefers dark mode.",
checkField: "User prefers dark mode",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Start with an empty workspace (no bootstrap/memory files)
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Populate cache — file does not exist yet
sp1 := cb.BuildSystemPromptWithCache()
if strings.Contains(sp1, tt.checkField) {
t.Fatalf("prompt should not contain %q before file is created", tt.checkField)
}
// Create the file after cache was built
fullPath := filepath.Join(tmpDir, tt.file)
os.MkdirAll(filepath.Dir(fullPath), 0o755)
if err := os.WriteFile(fullPath, []byte(tt.content), 0o644); err != nil {
t.Fatal(err)
}
// Set future mtime to guarantee detection
future := time.Now().Add(2 * time.Second)
os.Chtimes(fullPath, future, future)
// Cache should auto-invalidate because file went from absent -> present
sp2 := cb.BuildSystemPromptWithCache()
if !strings.Contains(sp2, tt.checkField) {
t.Errorf("cache not invalidated on new file creation: expected %q in prompt", tt.checkField)
}
})
}
}
// TestSkillFileContentChange verifies that modifying a skill file's content
// (not just the directory structure) invalidates the cache.
// This is the scenario where directory mtime alone is insufficient — on most
// filesystems, editing a file inside a directory does NOT update the parent
// directory's mtime.
func TestSkillFileContentChange(t *testing.T) {
skillMD := `---
name: test-skill
description: "A test skill"
---
# Test Skill v1
Original content.`
tmpDir := setupWorkspace(t, map[string]string{
"skills/test-skill/SKILL.md": skillMD,
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Populate cache
sp1 := cb.BuildSystemPromptWithCache()
_ = sp1 // cache is warm
// Modify the skill file content (without touching the skills/ directory)
updatedSkillMD := `---
name: test-skill
description: "An updated test skill"
---
# Test Skill v2
Updated content.`
skillPath := filepath.Join(tmpDir, "skills", "test-skill", "SKILL.md")
if err := os.WriteFile(skillPath, []byte(updatedSkillMD), 0o644); err != nil {
t.Fatal(err)
}
// Set future mtime on the skill file only (NOT the directory)
future := time.Now().Add(2 * time.Second)
os.Chtimes(skillPath, future, future)
// Verify that sourceFilesChangedLocked detects the content change
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Error("sourceFilesChangedLocked() should detect skill file content change")
}
// Verify cache is actually rebuilt with new content
sp2 := cb.BuildSystemPromptWithCache()
if sp1 == sp2 && strings.Contains(sp1, "test-skill") {
// If the skill appeared in the prompt and the prompt didn't change,
// the cache was not invalidated.
t.Error("cache should be invalidated when skill file content changes")
}
}
// TestConcurrentBuildSystemPromptWithCache verifies that multiple goroutines
// can safely call BuildSystemPromptWithCache concurrently without producing
// empty results, panics, or data races.
// Run with: go test -race ./pkg/agent/ -run TestConcurrentBuildSystemPromptWithCache
func TestConcurrentBuildSystemPromptWithCache(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nConcurrency test agent.",
"SOUL.md": "# Soul\nBe helpful.",
"memory/MEMORY.md": "# Memory\nUser prefers Go.",
"skills/demo/SKILL.md": "---\nname: demo\ndescription: \"demo skill\"\n---\n# Demo",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
const goroutines = 20
const iterations = 50
var wg sync.WaitGroup
errs := make(chan string, goroutines*iterations)
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for i := 0; i < iterations; i++ {
result := cb.BuildSystemPromptWithCache()
if result == "" {
errs <- "empty prompt returned"
return
}
if !strings.Contains(result, "picoclaw") {
errs <- "prompt missing identity"
return
}
// Also exercise BuildMessages concurrently
msgs := cb.BuildMessages(nil, "", "hello", nil, "test", "chat")
if len(msgs) < 2 {
errs <- "BuildMessages returned fewer than 2 messages"
return
}
if msgs[0].Role != "system" {
errs <- "first message not system"
return
}
// Occasionally invalidate to exercise the write path
if i%10 == 0 {
cb.InvalidateCache()
}
}
}(g)
}
wg.Wait()
close(errs)
for errMsg := range errs {
t.Errorf("concurrent access error: %s", errMsg)
}
}
// BenchmarkBuildMessagesWithCache measures caching performance.
// TestEmptyWorkspaceBaselineDetectsNewFiles verifies that when the cache is
// built on an empty workspace (no tracked files exist), creating a file
// afterwards still triggers cache invalidation. This validates the
// time.Unix(1, 0) fallback for maxMtime: any real file's mtime is after epoch,
// so fileChangedSince correctly detects the absent -> present transition AND
// the mtime comparison succeeds even without artificially inflated Chtimes.
func TestEmptyWorkspaceBaselineDetectsNewFiles(t *testing.T) {
// Empty workspace: no bootstrap files, no memory, no skills content.
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Build cache — all tracked files are absent, maxMtime falls back to epoch.
sp1 := cb.BuildSystemPromptWithCache()
// Create a bootstrap file with natural mtime (no Chtimes manipulation).
// The file's mtime should be the current wall-clock time, which is
// strictly after time.Unix(1, 0).
soulPath := filepath.Join(tmpDir, "SOUL.md")
if err := os.WriteFile(soulPath, []byte("# Soul\nNewly created."), 0o644); err != nil {
t.Fatal(err)
}
// Cache should detect the new file via existedAtCache (absent -> present).
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Fatal("sourceFilesChangedLocked should detect newly created file on empty workspace")
}
sp2 := cb.BuildSystemPromptWithCache()
if !strings.Contains(sp2, "Newly created") {
t.Error("rebuilt prompt should contain new file content")
}
if sp1 == sp2 {
t.Error("cache should have been invalidated after file creation")
}
}
// BenchmarkBuildMessagesWithCache measures caching performance.
func BenchmarkBuildMessagesWithCache(b *testing.B) {
tmpDir, _ := os.MkdirTemp("", "picoclaw-bench-*")
defer os.RemoveAll(tmpDir)
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755)
os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755)
for _, name := range []string{"IDENTITY.md", "SOUL.md", "USER.md"} {
os.WriteFile(filepath.Join(tmpDir, name), []byte(strings.Repeat("Content.\n", 10)), 0o644)
}
cb := NewContextBuilder(tmpDir)
history := []providers.Message{
{Role: "user", Content: "previous message"},
{Role: "assistant", Content: "previous response"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = cb.BuildMessages(history, "summary", "new message", nil, "cli", "test")
}
}
+12 -8
View File
@@ -524,8 +524,9 @@ func (al *AgentLoop) runLLMIteration(
fbResult, fbErr := al.fallback.Execute(ctx, agent.Candidates,
func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) {
return agent.Provider.Chat(ctx, messages, providerToolDefs, model, map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
})
},
)
@@ -540,8 +541,9 @@ func (al *AgentLoop) runLLMIteration(
return fbResult.Response, nil
}
return agent.Provider.Chat(ctx, messages, providerToolDefs, agent.Model, map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
})
}
@@ -962,8 +964,9 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err == nil {
@@ -1012,8 +1015,9 @@ func (al *AgentLoop) summarizeBatch(
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err != nil {
+14 -1
View File
@@ -113,7 +113,20 @@ func buildParams(
for _, msg := range messages {
switch msg.Role {
case "system":
system = append(system, anthropic.TextBlockParam{Text: msg.Content})
// Prefer structured SystemParts for per-block cache_control.
// This enables LLM-side KV cache reuse: the static block's prefix
// hash stays stable across requests while dynamic parts change freely.
if len(msg.SystemParts) > 0 {
for _, part := range msg.SystemParts {
block := anthropic.TextBlockParam{Text: part.Text}
if part.CacheControl != nil && part.CacheControl.Type == "ephemeral" {
block.CacheControl = anthropic.NewCacheControlEphemeralParam()
}
system = append(system, block)
}
} else {
system = append(system, anthropic.TextBlockParam{Text: msg.Content})
}
case "user":
if msg.ToolCallID != "" {
anthropicMessages = append(anthropicMessages,
+12
View File
@@ -208,6 +208,11 @@ func buildCodexParams(
for _, msg := range messages {
switch msg.Role {
case "system":
// Use the full concatenated system prompt (static + dynamic + summary)
// as instructions. This keeps behavior consistent with Anthropic and
// OpenAI-compat adapters where the complete system context lives in
// one place. Prefix caching is handled by prompt_cache_key below,
// not by splitting content across instructions vs input messages.
instructions = msg.Content
case "user":
if msg.ToolCallID != "" {
@@ -289,6 +294,13 @@ func buildCodexParams(
params.Instructions = openai.Opt(defaultCodexInstructions)
}
// Prompt caching: pass a stable cache key so OpenAI can bucket requests
// and reuse prefix KV cache across calls with the same key.
// See: https://platform.openai.com/docs/guides/prompt-caching
if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" {
params.PromptCacheKey = openai.Opt(cacheKey)
}
if len(tools) > 0 || enableWebSearch {
params.Tools = translateToolsForCodex(tools, enableWebSearch)
}
+35 -1
View File
@@ -77,7 +77,7 @@ func (p *Provider) Chat(
requestBody := map[string]any{
"model": model,
"messages": messages,
"messages": stripSystemParts(messages),
}
if len(tools) > 0 {
@@ -111,6 +111,14 @@ func (p *Provider) Chat(
}
}
// Prompt caching: pass a stable cache key so OpenAI can bucket requests
// with the same key and reuse prefix KV cache across calls.
// The key is typically the agent ID — stable per agent, shared across requests.
// See: https://platform.openai.com/docs/guides/prompt-caching
if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" {
requestBody["prompt_cache_key"] = cacheKey
}
jsonData, err := json.Marshal(requestBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
@@ -230,6 +238,32 @@ func parseResponse(body []byte) (*LLMResponse, error) {
}, nil
}
// openaiMessage is the wire-format message for OpenAI-compatible APIs.
// It mirrors protocoltypes.Message but omits SystemParts, which is an
// internal field that would be unknown to third-party endpoints.
type openaiMessage struct {
Role string `json:"role"`
Content string `json:"content"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
}
// stripSystemParts converts []Message to []openaiMessage, dropping the
// SystemParts field so it doesn't leak into the JSON payload sent to
// OpenAI-compatible APIs (some strict endpoints reject unknown fields).
func stripSystemParts(messages []Message) []openaiMessage {
out := make([]openaiMessage, len(messages))
for i, m := range messages {
out[i] = openaiMessage{
Role: m.Role,
Content: m.Content,
ToolCalls: m.ToolCalls,
ToolCallID: m.ToolCallID,
}
}
return out
}
func normalizeModel(model, apiBase string) string {
idx := strings.Index(model, "/")
if idx == -1 {
+21 -5
View File
@@ -38,12 +38,28 @@ type UsageInfo struct {
TotalTokens int `json:"total_tokens"`
}
// CacheControl marks a content block for LLM-side prefix caching.
// Currently only "ephemeral" is supported (used by Anthropic).
type CacheControl struct {
Type string `json:"type"` // "ephemeral"
}
// ContentBlock represents a structured segment of a system message.
// Adapters that understand SystemParts can use these blocks to set
// per-block cache control (e.g. Anthropic's cache_control: ephemeral).
type ContentBlock struct {
Type string `json:"type"` // "text"
Text string `json:"text"`
CacheControl *CacheControl `json:"cache_control,omitempty"`
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoning_content,omitempty"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoning_content,omitempty"`
SystemParts []ContentBlock `json:"system_parts,omitempty"` // structured system blocks for cache-aware adapters
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
}
type ToolDefinition struct {
+2
View File
@@ -17,6 +17,8 @@ type (
ToolFunctionDefinition = protocoltypes.ToolFunctionDefinition
ExtraContent = protocoltypes.ExtraContent
GoogleExtra = protocoltypes.GoogleExtra
ContentBlock = protocoltypes.ContentBlock
CacheControl = protocoltypes.CacheControl
)
type LLMProvider interface {
+27 -12
View File
@@ -3,6 +3,7 @@ package tools
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -107,13 +108,27 @@ func (r *ToolRegistry) ExecuteWithContext(
return result
}
// sortedToolNames returns tool names in sorted order for deterministic iteration.
// This is critical for KV cache stability: non-deterministic map iteration would
// produce different system prompts and tool definitions on each call, invalidating
// the LLM's prefix cache even when no tools have changed.
func (r *ToolRegistry) sortedToolNames() []string {
names := make([]string, 0, len(r.tools))
for name := range r.tools {
names = append(names, name)
}
sort.Strings(names)
return names
}
func (r *ToolRegistry) GetDefinitions() []map[string]any {
r.mu.RLock()
defer r.mu.RUnlock()
definitions := make([]map[string]any, 0, len(r.tools))
for _, tool := range r.tools {
definitions = append(definitions, ToolToSchema(tool))
sorted := r.sortedToolNames()
definitions := make([]map[string]any, 0, len(sorted))
for _, name := range sorted {
definitions = append(definitions, ToolToSchema(r.tools[name]))
}
return definitions
}
@@ -124,8 +139,10 @@ func (r *ToolRegistry) ToProviderDefs() []providers.ToolDefinition {
r.mu.RLock()
defer r.mu.RUnlock()
definitions := make([]providers.ToolDefinition, 0, len(r.tools))
for _, tool := range r.tools {
sorted := r.sortedToolNames()
definitions := make([]providers.ToolDefinition, 0, len(sorted))
for _, name := range sorted {
tool := r.tools[name]
schema := ToolToSchema(tool)
// Safely extract nested values with type checks
@@ -155,11 +172,7 @@ func (r *ToolRegistry) List() []string {
r.mu.RLock()
defer r.mu.RUnlock()
names := make([]string, 0, len(r.tools))
for name := range r.tools {
names = append(names, name)
}
return names
return r.sortedToolNames()
}
// Count returns the number of registered tools.
@@ -175,8 +188,10 @@ func (r *ToolRegistry) GetSummaries() []string {
r.mu.RLock()
defer r.mu.RUnlock()
summaries := make([]string, 0, len(r.tools))
for _, tool := range r.tools {
sorted := r.sortedToolNames()
summaries := make([]string, 0, len(sorted))
for _, name := range sorted {
tool := r.tools[name]
summaries = append(summaries, fmt.Sprintf("- `%s` - %s", tool.Name(), tool.Description()))
}
return summaries