merge: resolve conflicts with main

This commit is contained in:
mosir
2026-02-25 21:53:04 +08:00
96 changed files with 3325 additions and 956 deletions
+341 -62
View File
@@ -1,24 +1,38 @@
package agent
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/skills"
"github.com/sipeed/picoclaw/pkg/tools"
)
type ContextBuilder struct {
workspace string
skillsLoader *skills.SkillsLoader
memory *MemoryStore
tools *tools.ToolRegistry // Direct reference to tool registry
// Cache for system prompt to avoid rebuilding on every call.
// This fixes issue #607: repeated reprocessing of the entire context.
// The cache auto-invalidates when workspace source files change (mtime check).
systemPromptMutex sync.RWMutex
cachedSystemPrompt string
cachedAt time.Time // max observed mtime across tracked paths at cache build time
// existedAtCache tracks which source file paths existed the last time the
// cache was built. This lets sourceFilesChanged detect files that are newly
// created (didn't exist at cache time, now exist) or deleted (existed at
// cache time, now gone) — both of which should trigger a cache rebuild.
existedAtCache map[string]bool
}
func getGlobalConfigDir() string {
@@ -43,69 +57,29 @@ func NewContextBuilder(workspace string) *ContextBuilder {
}
}
// SetToolsRegistry sets the tools registry for dynamic tool summary generation.
func (cb *ContextBuilder) SetToolsRegistry(registry *tools.ToolRegistry) {
cb.tools = registry
}
func (cb *ContextBuilder) getIdentity() string {
now := time.Now().Format("2006-01-02 15:04 (Monday)")
workspacePath, _ := filepath.Abs(filepath.Join(cb.workspace))
runtime := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version())
// Build tools section dynamically
toolsSection := cb.buildToolsSection()
return fmt.Sprintf(`# picoclaw 🦞
You are picoclaw, a helpful AI assistant.
## Current Time
%s
## Runtime
%s
## Workspace
Your workspace is at: %s
- Memory: %s/memory/MEMORY.md
- Daily Notes: %s/memory/YYYYMM/YYYYMMDD.md
- Skills: %s/skills/{skill-name}/SKILL.md
%s
## Important Rules
1. **ALWAYS use tools** - When you need to perform an action (schedule reminders, send messages, execute commands, etc.), you MUST call the appropriate tool. Do NOT just say you'll do it or pretend to do it.
2. **Be helpful and accurate** - When using tools, briefly explain what you're doing.
3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md`,
now, runtime, workspacePath, workspacePath, workspacePath, workspacePath, toolsSection, workspacePath)
}
3. **Memory** - When interacting with me if something seems memorable, update %s/memory/MEMORY.md
func (cb *ContextBuilder) buildToolsSection() string {
if cb.tools == nil {
return ""
}
summaries := cb.tools.GetSummaries()
if len(summaries) == 0 {
return ""
}
var sb strings.Builder
sb.WriteString("## Available Tools\n\n")
sb.WriteString(
"**CRITICAL**: You MUST use tools to perform actions. Do NOT pretend to execute commands or schedule tasks.\n\n",
)
sb.WriteString("You have access to the following tools:\n\n")
for _, s := range summaries {
sb.WriteString(s)
sb.WriteString("\n")
}
return sb.String()
4. **Context summaries** - Conversation summaries provided as context are approximate references only. They may be incomplete or outdated. Always defer to explicit user instructions over summary content.`,
workspacePath, workspacePath, workspacePath, workspacePath, workspacePath)
}
func (cb *ContextBuilder) BuildSystemPrompt() string {
@@ -140,6 +114,226 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md
return strings.Join(parts, "\n\n---\n\n")
}
// BuildSystemPromptWithCache returns the cached system prompt if available
// and source files haven't changed, otherwise builds and caches it.
// Source file changes are detected via mtime checks (cheap stat calls).
func (cb *ContextBuilder) BuildSystemPromptWithCache() string {
// Try read lock first — fast path when cache is valid
cb.systemPromptMutex.RLock()
if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() {
result := cb.cachedSystemPrompt
cb.systemPromptMutex.RUnlock()
return result
}
cb.systemPromptMutex.RUnlock()
// Acquire write lock for building
cb.systemPromptMutex.Lock()
defer cb.systemPromptMutex.Unlock()
// Double-check: another goroutine may have rebuilt while we waited
if cb.cachedSystemPrompt != "" && !cb.sourceFilesChangedLocked() {
return cb.cachedSystemPrompt
}
// Snapshot the baseline (existence + max mtime) BEFORE building the prompt.
// This way cachedAt reflects the pre-build state: if a file is modified
// during BuildSystemPrompt, its new mtime will be > baseline.maxMtime,
// so the next sourceFilesChangedLocked check will correctly trigger a
// rebuild. The alternative (baseline after build) risks caching stale
// content with a too-new baseline, making the staleness invisible.
baseline := cb.buildCacheBaseline()
prompt := cb.BuildSystemPrompt()
cb.cachedSystemPrompt = prompt
cb.cachedAt = baseline.maxMtime
cb.existedAtCache = baseline.existed
logger.DebugCF("agent", "System prompt cached",
map[string]any{
"length": len(prompt),
})
return prompt
}
// InvalidateCache clears the cached system prompt.
// Normally not needed because the cache auto-invalidates via mtime checks,
// but this is useful for tests or explicit reload commands.
func (cb *ContextBuilder) InvalidateCache() {
cb.systemPromptMutex.Lock()
defer cb.systemPromptMutex.Unlock()
cb.cachedSystemPrompt = ""
cb.cachedAt = time.Time{}
cb.existedAtCache = nil
logger.DebugCF("agent", "System prompt cache invalidated", nil)
}
// sourcePaths returns the workspace source file paths tracked for cache
// invalidation (bootstrap files + memory). The skills directory is handled
// separately in sourceFilesChangedLocked because it requires both directory-
// level and recursive file-level mtime checks.
func (cb *ContextBuilder) sourcePaths() []string {
return []string{
filepath.Join(cb.workspace, "AGENTS.md"),
filepath.Join(cb.workspace, "SOUL.md"),
filepath.Join(cb.workspace, "USER.md"),
filepath.Join(cb.workspace, "IDENTITY.md"),
filepath.Join(cb.workspace, "memory", "MEMORY.md"),
}
}
// cacheBaseline holds the file existence snapshot and the latest observed
// mtime across all tracked paths. Used as the cache reference point.
type cacheBaseline struct {
existed map[string]bool
maxMtime time.Time
}
// buildCacheBaseline records which tracked paths currently exist and computes
// the latest mtime across all tracked files + skills directory contents.
// Called under write lock when the cache is built.
func (cb *ContextBuilder) buildCacheBaseline() cacheBaseline {
skillsDir := filepath.Join(cb.workspace, "skills")
// All paths whose existence we track: source files + skills dir.
allPaths := append(cb.sourcePaths(), skillsDir)
existed := make(map[string]bool, len(allPaths))
var maxMtime time.Time
for _, p := range allPaths {
info, err := os.Stat(p)
existed[p] = err == nil
if err == nil && info.ModTime().After(maxMtime) {
maxMtime = info.ModTime()
}
}
// Walk skills files to capture their mtimes too.
// Use os.Stat (not d.Info) to match the stat method used in
// fileChangedSince / skillFilesModifiedSince for consistency.
_ = filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr == nil && !d.IsDir() {
if info, err := os.Stat(path); err == nil && info.ModTime().After(maxMtime) {
maxMtime = info.ModTime()
}
}
return nil
})
// If no tracked files exist yet (empty workspace), maxMtime is zero.
// Use a very old non-zero time so that:
// 1. cachedAt.IsZero() won't trigger perpetual rebuilds.
// 2. Any real file created afterwards has mtime > cachedAt, so it
// will be detected by fileChangedSince (unlike time.Now() which
// could race with a file whose mtime <= Now).
if maxMtime.IsZero() {
maxMtime = time.Unix(1, 0)
}
return cacheBaseline{existed: existed, maxMtime: maxMtime}
}
// sourceFilesChangedLocked checks whether any workspace source file has been
// modified, created, or deleted since the cache was last built.
//
// IMPORTANT: The caller MUST hold at least a read lock on systemPromptMutex.
// Go's sync.RWMutex is not reentrant, so this function must NOT acquire the
// lock itself (it would deadlock when called from BuildSystemPromptWithCache
// which already holds RLock or Lock).
func (cb *ContextBuilder) sourceFilesChangedLocked() bool {
if cb.cachedAt.IsZero() {
return true
}
// Check tracked source files (bootstrap + memory).
for _, p := range cb.sourcePaths() {
if cb.fileChangedSince(p) {
return true
}
}
// --- Skills directory (handled separately from sourcePaths) ---
//
// 1. Creation/deletion: tracked via existedAtCache, same as bootstrap files.
skillsDir := filepath.Join(cb.workspace, "skills")
if cb.fileChangedSince(skillsDir) {
return true
}
// 2. Structural changes (add/remove entries inside the dir) are reflected
// in the directory's own mtime, which fileChangedSince already checks.
//
// 3. Content-only edits to files inside skills/ do NOT update the parent
// directory mtime on most filesystems, so we recursively walk to check
// individual file mtimes at any nesting depth.
if skillFilesModifiedSince(skillsDir, cb.cachedAt) {
return true
}
return false
}
// fileChangedSince returns true if a tracked source file has been modified,
// newly created, or deleted since the cache was built.
//
// Four cases:
// - existed at cache time, exists now -> check mtime
// - existed at cache time, gone now -> changed (deleted)
// - absent at cache time, exists now -> changed (created)
// - absent at cache time, gone now -> no change
func (cb *ContextBuilder) fileChangedSince(path string) bool {
// Defensive: if existedAtCache was never initialized, treat as changed
// so the cache rebuilds rather than silently serving stale data.
if cb.existedAtCache == nil {
return true
}
existedBefore := cb.existedAtCache[path]
info, err := os.Stat(path)
existsNow := err == nil
if existedBefore != existsNow {
return true // file was created or deleted
}
if !existsNow {
return false // didn't exist before, doesn't exist now
}
return info.ModTime().After(cb.cachedAt)
}
// errWalkStop is a sentinel error used to stop filepath.WalkDir early.
// Using a dedicated error (instead of fs.SkipAll) makes the early-exit
// intent explicit and avoids the nilerr linter warning that would fire
// if the callback returned nil when its err parameter is non-nil.
var errWalkStop = errors.New("walk stop")
// skillFilesModifiedSince recursively walks the skills directory and checks
// whether any file was modified after t. This catches content-only edits at
// any nesting depth (e.g. skills/name/docs/extra.md) that don't update
// parent directory mtimes.
func skillFilesModifiedSince(skillsDir string, t time.Time) bool {
changed := false
err := filepath.WalkDir(skillsDir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr == nil && !d.IsDir() {
if info, statErr := os.Stat(path); statErr == nil && info.ModTime().After(t) {
changed = true
return errWalkStop // stop walking
}
}
return nil
})
// errWalkStop is expected (early exit on first changed file).
// os.IsNotExist means the skills dir doesn't exist yet — not an error.
// Any other error is unexpected and worth logging.
if err != nil && !errors.Is(err, errWalkStop) && !os.IsNotExist(err) {
logger.DebugCF("agent", "skills walk error", map[string]any{"error": err.Error()})
}
return changed
}
func (cb *ContextBuilder) LoadBootstrapFiles() string {
bootstrapFiles := []string{
"AGENTS.md",
@@ -159,6 +353,28 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string {
return sb.String()
}
// buildDynamicContext returns a short dynamic context string with per-request info.
// This changes every request (time, session) so it is NOT part of the cached prompt.
// LLM-side KV cache reuse is achieved by each provider adapter's native mechanism:
// - Anthropic: per-block cache_control (ephemeral) on the static SystemParts block
// - OpenAI / Codex: prompt_cache_key for prefix-based caching
//
// See: https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
// See: https://platform.openai.com/docs/guides/prompt-caching
func (cb *ContextBuilder) buildDynamicContext(channel, chatID string) string {
now := time.Now().Format("2006-01-02 15:04 (Monday)")
rt := fmt.Sprintf("%s %s, Go %s", runtime.GOOS, runtime.GOARCH, runtime.Version())
var sb strings.Builder
fmt.Fprintf(&sb, "## Current Time\n%s\n\n## Runtime\n%s", now, rt)
if channel != "" && chatID != "" {
fmt.Fprintf(&sb, "\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID)
}
return sb.String()
}
func (cb *ContextBuilder) BuildMessages(
history []providers.Message,
summary string,
@@ -168,23 +384,65 @@ func (cb *ContextBuilder) BuildMessages(
) []providers.Message {
messages := []providers.Message{}
systemPrompt := cb.BuildSystemPrompt()
// The static part (identity, bootstrap, skills, memory) is cached locally to
// avoid repeated file I/O and string building on every call (fixes issue #607).
// Dynamic parts (time, session, summary) are appended per request.
// Everything is sent as a single system message for provider compatibility:
// - Anthropic adapter extracts messages[0] (Role=="system") and maps its content
// to the top-level "system" parameter in the Messages API request. A single
// contiguous system block makes this extraction straightforward.
// - Codex maps only the first system message to its instructions field.
// - OpenAI-compat passes messages through as-is.
staticPrompt := cb.BuildSystemPromptWithCache()
// Add Current Session info if provided
if channel != "" && chatID != "" {
systemPrompt += fmt.Sprintf("\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID)
// Build short dynamic context (time, runtime, session) — changes per request
dynamicCtx := cb.buildDynamicContext(channel, chatID)
// Compose a single system message: static (cached) + dynamic + optional summary.
// Keeping all system content in one message ensures every provider adapter can
// extract it correctly (Anthropic adapter -> top-level system param,
// Codex -> instructions field).
//
// SystemParts carries the same content as structured blocks so that
// cache-aware adapters (Anthropic) can set per-block cache_control.
// The static block is marked "ephemeral" — its prefix hash is stable
// across requests, enabling LLM-side KV cache reuse.
stringParts := []string{staticPrompt, dynamicCtx}
contentBlocks := []providers.ContentBlock{
{Type: "text", Text: staticPrompt, CacheControl: &providers.CacheControl{Type: "ephemeral"}},
{Type: "text", Text: dynamicCtx},
}
// Log system prompt summary for debugging (debug mode only)
if summary != "" {
summaryText := fmt.Sprintf(
"CONTEXT_SUMMARY: The following is an approximate summary of prior conversation "+
"for reference only. It may be incomplete or outdated — always defer to explicit instructions.\n\n%s",
summary)
stringParts = append(stringParts, summaryText)
contentBlocks = append(contentBlocks, providers.ContentBlock{Type: "text", Text: summaryText})
}
fullSystemPrompt := strings.Join(stringParts, "\n\n---\n\n")
// Log system prompt summary for debugging (debug mode only).
// Read cachedSystemPrompt under lock to avoid a data race with
// concurrent InvalidateCache / BuildSystemPromptWithCache writes.
cb.systemPromptMutex.RLock()
isCached := cb.cachedSystemPrompt != ""
cb.systemPromptMutex.RUnlock()
logger.DebugCF("agent", "System prompt built",
map[string]any{
"total_chars": len(systemPrompt),
"total_lines": strings.Count(systemPrompt, "\n") + 1,
"section_count": strings.Count(systemPrompt, "\n\n---\n\n") + 1,
"static_chars": len(staticPrompt),
"dynamic_chars": len(dynamicCtx),
"total_chars": len(fullSystemPrompt),
"has_summary": summary != "",
"cached": isCached,
})
// Log preview of system prompt (avoid logging huge content)
preview := systemPrompt
preview := fullSystemPrompt
if len(preview) > 500 {
preview = preview[:500] + "... (truncated)"
}
@@ -193,19 +451,21 @@ func (cb *ContextBuilder) BuildMessages(
"preview": preview,
})
if summary != "" {
systemPrompt += "\n\n## Summary of Previous Conversation\n\n" + summary
}
history = sanitizeHistoryForProvider(history)
// Single system message containing all context — compatible with all providers.
// SystemParts enables cache-aware adapters to set per-block cache_control;
// Content is the concatenated fallback for adapters that don't read SystemParts.
messages = append(messages, providers.Message{
Role: "system",
Content: systemPrompt,
Role: "system",
Content: fullSystemPrompt,
SystemParts: contentBlocks,
})
// Add conversation history
messages = append(messages, history...)
// Add current user message
if strings.TrimSpace(currentMessage) != "" {
messages = append(messages, providers.Message{
Role: "user",
@@ -224,13 +484,32 @@ func sanitizeHistoryForProvider(history []providers.Message) []providers.Message
sanitized := make([]providers.Message, 0, len(history))
for _, msg := range history {
switch msg.Role {
case "system":
// Drop system messages from history. BuildMessages always
// constructs its own single system message (static + dynamic +
// summary); extra system messages would break providers that
// only accept one (Anthropic, Codex).
logger.DebugCF("agent", "Dropping system message from history", map[string]any{})
continue
case "tool":
if len(sanitized) == 0 {
logger.DebugCF("agent", "Dropping orphaned leading tool message", map[string]any{})
continue
}
last := sanitized[len(sanitized)-1]
if last.Role != "assistant" || len(last.ToolCalls) == 0 {
// Walk backwards to find the nearest assistant message,
// skipping over any preceding tool messages (multi-tool-call case).
foundAssistant := false
for i := len(sanitized) - 1; i >= 0; i-- {
if sanitized[i].Role == "tool" {
continue
}
if sanitized[i].Role == "assistant" && len(sanitized[i].ToolCalls) > 0 {
foundAssistant = true
}
break
}
if !foundAssistant {
logger.DebugCF("agent", "Dropping orphaned tool message", map[string]any{})
continue
}
+513
View File
@@ -0,0 +1,513 @@
package agent
import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/sipeed/picoclaw/pkg/providers"
)
// setupWorkspace creates a temporary workspace with standard directories and optional files.
// Returns the tmpDir path; caller should defer os.RemoveAll(tmpDir).
func setupWorkspace(t *testing.T, files map[string]string) string {
t.Helper()
tmpDir, err := os.MkdirTemp("", "picoclaw-test-*")
if err != nil {
t.Fatal(err)
}
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755)
os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755)
for name, content := range files {
dir := filepath.Dir(filepath.Join(tmpDir, name))
os.MkdirAll(dir, 0o755)
if err := os.WriteFile(filepath.Join(tmpDir, name), []byte(content), 0o644); err != nil {
t.Fatal(err)
}
}
return tmpDir
}
// TestSingleSystemMessage verifies that BuildMessages always produces exactly one
// system message regardless of summary/history variations.
// Fix: multiple system messages break Anthropic (top-level system param) and
// Codex (only reads last system message as instructions).
func TestSingleSystemMessage(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nTest agent.",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
tests := []struct {
name string
history []providers.Message
summary string
message string
}{
{
name: "no summary, no history",
summary: "",
message: "hello",
},
{
name: "with summary",
summary: "Previous conversation discussed X",
message: "hello",
},
{
name: "with history and summary",
history: []providers.Message{
{Role: "user", Content: "hi"},
{Role: "assistant", Content: "hello"},
},
summary: strings.Repeat("Long summary text. ", 50),
message: "new message",
},
{
name: "system message in history is filtered",
history: []providers.Message{
{Role: "system", Content: "stale system prompt from previous session"},
{Role: "user", Content: "hi"},
{Role: "assistant", Content: "hello"},
},
summary: "",
message: "new message",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
msgs := cb.BuildMessages(tt.history, tt.summary, tt.message, nil, "test", "chat1")
systemCount := 0
for _, m := range msgs {
if m.Role == "system" {
systemCount++
}
}
if systemCount != 1 {
t.Errorf("expected exactly 1 system message, got %d", systemCount)
}
if msgs[0].Role != "system" {
t.Errorf("first message should be system, got %s", msgs[0].Role)
}
if msgs[len(msgs)-1].Role != "user" {
t.Errorf("last message should be user, got %s", msgs[len(msgs)-1].Role)
}
// System message must contain identity (static) and time (dynamic)
sys := msgs[0].Content
if !strings.Contains(sys, "picoclaw") {
t.Error("system message missing identity")
}
if !strings.Contains(sys, "Current Time") {
t.Error("system message missing dynamic time context")
}
// Summary handling
if tt.summary != "" {
if !strings.Contains(sys, "CONTEXT_SUMMARY:") {
t.Error("summary present but CONTEXT_SUMMARY prefix missing")
}
if !strings.Contains(sys, tt.summary[:20]) {
t.Error("summary content not found in system message")
}
} else {
if strings.Contains(sys, "CONTEXT_SUMMARY:") {
t.Error("CONTEXT_SUMMARY should not appear without summary")
}
}
})
}
}
// TestMtimeAutoInvalidation verifies that the cache detects source file changes
// via mtime without requiring explicit InvalidateCache().
// Fix: original implementation had no auto-invalidation — edits to bootstrap files,
// memory, or skills were invisible until process restart.
func TestMtimeAutoInvalidation(t *testing.T) {
tests := []struct {
name string
file string // relative path inside workspace
contentV1 string
contentV2 string
checkField string // substring to verify in rebuilt prompt
}{
{
name: "bootstrap file change",
file: "IDENTITY.md",
contentV1: "# Original Identity",
contentV2: "# Updated Identity",
checkField: "Updated Identity",
},
{
name: "memory file change",
file: "memory/MEMORY.md",
contentV1: "# Memory\nUser likes Go.",
contentV2: "# Memory\nUser likes Rust.",
checkField: "User likes Rust",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{tt.file: tt.contentV1})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
sp1 := cb.BuildSystemPromptWithCache()
// Overwrite file and set future mtime to ensure detection.
// Use 2s offset for filesystem mtime resolution safety (some FS
// have 1s or coarser granularity, especially in CI containers).
fullPath := filepath.Join(tmpDir, tt.file)
os.WriteFile(fullPath, []byte(tt.contentV2), 0o644)
future := time.Now().Add(2 * time.Second)
os.Chtimes(fullPath, future, future)
// Verify sourceFilesChangedLocked detects the mtime change
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Fatalf("sourceFilesChangedLocked() should detect %s change", tt.file)
}
// Should auto-rebuild without explicit InvalidateCache()
sp2 := cb.BuildSystemPromptWithCache()
if sp1 == sp2 {
t.Errorf("cache not rebuilt after %s change", tt.file)
}
if !strings.Contains(sp2, tt.checkField) {
t.Errorf("rebuilt prompt missing expected content %q", tt.checkField)
}
})
}
// Skills directory mtime change
t.Run("skills dir change", func(t *testing.T) {
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
_ = cb.BuildSystemPromptWithCache() // populate cache
// Touch skills directory (simulate new skill installed)
skillsDir := filepath.Join(tmpDir, "skills")
future := time.Now().Add(2 * time.Second)
os.Chtimes(skillsDir, future, future)
// Verify sourceFilesChangedLocked detects it (cache is rebuilt)
// We confirm by checking internal state: a second call should rebuild.
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Error("sourceFilesChangedLocked() should detect skills dir mtime change")
}
})
}
// TestExplicitInvalidateCache verifies that InvalidateCache() forces a rebuild
// even when source files haven't changed (useful for tests and reload commands).
func TestExplicitInvalidateCache(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Test Identity",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
sp1 := cb.BuildSystemPromptWithCache()
cb.InvalidateCache()
sp2 := cb.BuildSystemPromptWithCache()
if sp1 != sp2 {
t.Error("prompt should be identical after invalidate+rebuild when files unchanged")
}
// Verify cachedAt was reset
cb.InvalidateCache()
cb.systemPromptMutex.RLock()
if !cb.cachedAt.IsZero() {
t.Error("cachedAt should be zero after InvalidateCache()")
}
cb.systemPromptMutex.RUnlock()
}
// TestCacheStability verifies that the static prompt is stable across repeated calls
// when no files change (regression test for issue #607).
func TestCacheStability(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nContent",
"SOUL.md": "# Soul\nContent",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
results := make([]string, 5)
for i := range results {
results[i] = cb.BuildSystemPromptWithCache()
}
for i := 1; i < len(results); i++ {
if results[i] != results[0] {
t.Errorf("cached prompt changed between call 0 and %d", i)
}
}
// Static prompt must NOT contain per-request data
if strings.Contains(results[0], "Current Time") {
t.Error("static cached prompt should not contain time (added dynamically)")
}
}
// TestNewFileCreationInvalidatesCache verifies that creating a source file that
// did not exist when the cache was built triggers a cache rebuild.
// This catches the "from nothing to something" edge case that the old
// modifiedSince (return false on stat error) would miss.
func TestNewFileCreationInvalidatesCache(t *testing.T) {
tests := []struct {
name string
file string // relative path inside workspace
content string
checkField string // substring to verify in rebuilt prompt
}{
{
name: "new bootstrap file",
file: "SOUL.md",
content: "# Soul\nBe kind and helpful.",
checkField: "Be kind and helpful",
},
{
name: "new memory file",
file: "memory/MEMORY.md",
content: "# Memory\nUser prefers dark mode.",
checkField: "User prefers dark mode",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Start with an empty workspace (no bootstrap/memory files)
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Populate cache — file does not exist yet
sp1 := cb.BuildSystemPromptWithCache()
if strings.Contains(sp1, tt.checkField) {
t.Fatalf("prompt should not contain %q before file is created", tt.checkField)
}
// Create the file after cache was built
fullPath := filepath.Join(tmpDir, tt.file)
os.MkdirAll(filepath.Dir(fullPath), 0o755)
if err := os.WriteFile(fullPath, []byte(tt.content), 0o644); err != nil {
t.Fatal(err)
}
// Set future mtime to guarantee detection
future := time.Now().Add(2 * time.Second)
os.Chtimes(fullPath, future, future)
// Cache should auto-invalidate because file went from absent -> present
sp2 := cb.BuildSystemPromptWithCache()
if !strings.Contains(sp2, tt.checkField) {
t.Errorf("cache not invalidated on new file creation: expected %q in prompt", tt.checkField)
}
})
}
}
// TestSkillFileContentChange verifies that modifying a skill file's content
// (not just the directory structure) invalidates the cache.
// This is the scenario where directory mtime alone is insufficient — on most
// filesystems, editing a file inside a directory does NOT update the parent
// directory's mtime.
func TestSkillFileContentChange(t *testing.T) {
skillMD := `---
name: test-skill
description: "A test skill"
---
# Test Skill v1
Original content.`
tmpDir := setupWorkspace(t, map[string]string{
"skills/test-skill/SKILL.md": skillMD,
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Populate cache
sp1 := cb.BuildSystemPromptWithCache()
_ = sp1 // cache is warm
// Modify the skill file content (without touching the skills/ directory)
updatedSkillMD := `---
name: test-skill
description: "An updated test skill"
---
# Test Skill v2
Updated content.`
skillPath := filepath.Join(tmpDir, "skills", "test-skill", "SKILL.md")
if err := os.WriteFile(skillPath, []byte(updatedSkillMD), 0o644); err != nil {
t.Fatal(err)
}
// Set future mtime on the skill file only (NOT the directory)
future := time.Now().Add(2 * time.Second)
os.Chtimes(skillPath, future, future)
// Verify that sourceFilesChangedLocked detects the content change
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Error("sourceFilesChangedLocked() should detect skill file content change")
}
// Verify cache is actually rebuilt with new content
sp2 := cb.BuildSystemPromptWithCache()
if sp1 == sp2 && strings.Contains(sp1, "test-skill") {
// If the skill appeared in the prompt and the prompt didn't change,
// the cache was not invalidated.
t.Error("cache should be invalidated when skill file content changes")
}
}
// TestConcurrentBuildSystemPromptWithCache verifies that multiple goroutines
// can safely call BuildSystemPromptWithCache concurrently without producing
// empty results, panics, or data races.
// Run with: go test -race ./pkg/agent/ -run TestConcurrentBuildSystemPromptWithCache
func TestConcurrentBuildSystemPromptWithCache(t *testing.T) {
tmpDir := setupWorkspace(t, map[string]string{
"IDENTITY.md": "# Identity\nConcurrency test agent.",
"SOUL.md": "# Soul\nBe helpful.",
"memory/MEMORY.md": "# Memory\nUser prefers Go.",
"skills/demo/SKILL.md": "---\nname: demo\ndescription: \"demo skill\"\n---\n# Demo",
})
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
const goroutines = 20
const iterations = 50
var wg sync.WaitGroup
errs := make(chan string, goroutines*iterations)
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for i := 0; i < iterations; i++ {
result := cb.BuildSystemPromptWithCache()
if result == "" {
errs <- "empty prompt returned"
return
}
if !strings.Contains(result, "picoclaw") {
errs <- "prompt missing identity"
return
}
// Also exercise BuildMessages concurrently
msgs := cb.BuildMessages(nil, "", "hello", nil, "test", "chat")
if len(msgs) < 2 {
errs <- "BuildMessages returned fewer than 2 messages"
return
}
if msgs[0].Role != "system" {
errs <- "first message not system"
return
}
// Occasionally invalidate to exercise the write path
if i%10 == 0 {
cb.InvalidateCache()
}
}
}(g)
}
wg.Wait()
close(errs)
for errMsg := range errs {
t.Errorf("concurrent access error: %s", errMsg)
}
}
// BenchmarkBuildMessagesWithCache measures caching performance.
// TestEmptyWorkspaceBaselineDetectsNewFiles verifies that when the cache is
// built on an empty workspace (no tracked files exist), creating a file
// afterwards still triggers cache invalidation. This validates the
// time.Unix(1, 0) fallback for maxMtime: any real file's mtime is after epoch,
// so fileChangedSince correctly detects the absent -> present transition AND
// the mtime comparison succeeds even without artificially inflated Chtimes.
func TestEmptyWorkspaceBaselineDetectsNewFiles(t *testing.T) {
// Empty workspace: no bootstrap files, no memory, no skills content.
tmpDir := setupWorkspace(t, nil)
defer os.RemoveAll(tmpDir)
cb := NewContextBuilder(tmpDir)
// Build cache — all tracked files are absent, maxMtime falls back to epoch.
sp1 := cb.BuildSystemPromptWithCache()
// Create a bootstrap file with natural mtime (no Chtimes manipulation).
// The file's mtime should be the current wall-clock time, which is
// strictly after time.Unix(1, 0).
soulPath := filepath.Join(tmpDir, "SOUL.md")
if err := os.WriteFile(soulPath, []byte("# Soul\nNewly created."), 0o644); err != nil {
t.Fatal(err)
}
// Cache should detect the new file via existedAtCache (absent -> present).
cb.systemPromptMutex.RLock()
changed := cb.sourceFilesChangedLocked()
cb.systemPromptMutex.RUnlock()
if !changed {
t.Fatal("sourceFilesChangedLocked should detect newly created file on empty workspace")
}
sp2 := cb.BuildSystemPromptWithCache()
if !strings.Contains(sp2, "Newly created") {
t.Error("rebuilt prompt should contain new file content")
}
if sp1 == sp2 {
t.Error("cache should have been invalidated after file creation")
}
}
// BenchmarkBuildMessagesWithCache measures caching performance.
func BenchmarkBuildMessagesWithCache(b *testing.B) {
tmpDir, _ := os.MkdirTemp("", "picoclaw-bench-*")
defer os.RemoveAll(tmpDir)
os.MkdirAll(filepath.Join(tmpDir, "memory"), 0o755)
os.MkdirAll(filepath.Join(tmpDir, "skills"), 0o755)
for _, name := range []string{"IDENTITY.md", "SOUL.md", "USER.md"} {
os.WriteFile(filepath.Join(tmpDir, name), []byte(strings.Repeat("Content.\n", 10)), 0o644)
}
cb := NewContextBuilder(tmpDir)
history := []providers.Message{
{Role: "user", Content: "previous message"},
{Role: "assistant", Content: "previous response"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = cb.BuildMessages(history, "summary", "new message", nil, "cli", "test")
}
}
+209
View File
@@ -0,0 +1,209 @@
package agent
import (
"testing"
"github.com/sipeed/picoclaw/pkg/providers"
)
func msg(role, content string) providers.Message {
return providers.Message{Role: role, Content: content}
}
func assistantWithTools(toolIDs ...string) providers.Message {
calls := make([]providers.ToolCall, len(toolIDs))
for i, id := range toolIDs {
calls[i] = providers.ToolCall{ID: id, Type: "function"}
}
return providers.Message{Role: "assistant", ToolCalls: calls}
}
func toolResult(id string) providers.Message {
return providers.Message{Role: "tool", Content: "result", ToolCallID: id}
}
func TestSanitizeHistoryForProvider_EmptyHistory(t *testing.T) {
result := sanitizeHistoryForProvider(nil)
if len(result) != 0 {
t.Fatalf("expected empty, got %d messages", len(result))
}
result = sanitizeHistoryForProvider([]providers.Message{})
if len(result) != 0 {
t.Fatalf("expected empty, got %d messages", len(result))
}
}
func TestSanitizeHistoryForProvider_SingleToolCall(t *testing.T) {
history := []providers.Message{
msg("user", "hello"),
assistantWithTools("A"),
toolResult("A"),
msg("assistant", "done"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 4 {
t.Fatalf("expected 4 messages, got %d", len(result))
}
assertRoles(t, result, "user", "assistant", "tool", "assistant")
}
func TestSanitizeHistoryForProvider_MultiToolCalls(t *testing.T) {
history := []providers.Message{
msg("user", "do two things"),
assistantWithTools("A", "B"),
toolResult("A"),
toolResult("B"),
msg("assistant", "both done"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 5 {
t.Fatalf("expected 5 messages, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user", "assistant", "tool", "tool", "assistant")
}
func TestSanitizeHistoryForProvider_AssistantToolCallAfterPlainAssistant(t *testing.T) {
history := []providers.Message{
msg("user", "hi"),
msg("assistant", "thinking"),
assistantWithTools("A"),
toolResult("A"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 2 {
t.Fatalf("expected 2 messages, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user", "assistant")
}
func TestSanitizeHistoryForProvider_OrphanedLeadingTool(t *testing.T) {
history := []providers.Message{
toolResult("A"),
msg("user", "hello"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 1 {
t.Fatalf("expected 1 message, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user")
}
func TestSanitizeHistoryForProvider_ToolAfterUserDropped(t *testing.T) {
history := []providers.Message{
msg("user", "hello"),
toolResult("A"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 1 {
t.Fatalf("expected 1 message, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user")
}
func TestSanitizeHistoryForProvider_ToolAfterAssistantNoToolCalls(t *testing.T) {
history := []providers.Message{
msg("user", "hello"),
msg("assistant", "hi"),
toolResult("A"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 2 {
t.Fatalf("expected 2 messages, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user", "assistant")
}
func TestSanitizeHistoryForProvider_AssistantToolCallAtStart(t *testing.T) {
history := []providers.Message{
assistantWithTools("A"),
toolResult("A"),
msg("user", "hello"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 1 {
t.Fatalf("expected 1 message, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user")
}
func TestSanitizeHistoryForProvider_MultiToolCallsThenNewRound(t *testing.T) {
history := []providers.Message{
msg("user", "do two things"),
assistantWithTools("A", "B"),
toolResult("A"),
toolResult("B"),
msg("assistant", "done"),
msg("user", "hi"),
assistantWithTools("C"),
toolResult("C"),
msg("assistant", "done again"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 9 {
t.Fatalf("expected 9 messages, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user", "assistant", "tool", "tool", "assistant", "user", "assistant", "tool", "assistant")
}
func TestSanitizeHistoryForProvider_ConsecutiveMultiToolRounds(t *testing.T) {
history := []providers.Message{
msg("user", "start"),
assistantWithTools("A", "B"),
toolResult("A"),
toolResult("B"),
assistantWithTools("C", "D"),
toolResult("C"),
toolResult("D"),
msg("assistant", "all done"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 8 {
t.Fatalf("expected 8 messages, got %d: %+v", len(result), roles(result))
}
assertRoles(t, result, "user", "assistant", "tool", "tool", "assistant", "tool", "tool", "assistant")
}
func TestSanitizeHistoryForProvider_PlainConversation(t *testing.T) {
history := []providers.Message{
msg("user", "hello"),
msg("assistant", "hi"),
msg("user", "how are you"),
msg("assistant", "fine"),
}
result := sanitizeHistoryForProvider(history)
if len(result) != 4 {
t.Fatalf("expected 4 messages, got %d", len(result))
}
assertRoles(t, result, "user", "assistant", "user", "assistant")
}
func roles(msgs []providers.Message) []string {
r := make([]string, len(msgs))
for i, m := range msgs {
r[i] = m.Role
}
return r
}
func assertRoles(t *testing.T, msgs []providers.Message, expected ...string) {
t.Helper()
if len(msgs) != len(expected) {
t.Fatalf("role count mismatch: got %v, want %v", roles(msgs), expected)
}
for i, exp := range expected {
if msgs[i].Role != exp {
t.Errorf("message[%d]: got role %q, want %q", i, msgs[i].Role, exp)
}
}
}
-1
View File
@@ -59,7 +59,6 @@ func NewAgentInstance(
sessionsManager := session.NewSessionManager(sessionsDir)
contextBuilder := NewContextBuilder(workspace)
contextBuilder.SetToolsRegistry(toolsRegistry)
agentID := routing.DefaultAgentID
agentName := ""
+13 -12
View File
@@ -149,9 +149,6 @@ func registerSharedTools(
return registry.CanSpawnSubagent(currentAgentID, targetAgentID)
})
agent.Tools.Register(spawnTool)
// Update context builder with the complete tools registry
agent.ContextBuilder.SetToolsRegistry(agent.Tools)
}
}
@@ -524,8 +521,9 @@ func (al *AgentLoop) runLLMIteration(
fbResult, fbErr := al.fallback.Execute(ctx, agent.Candidates,
func(ctx context.Context, provider, model string) (*providers.LLMResponse, error) {
return agent.Provider.Chat(ctx, messages, providerToolDefs, model, map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
})
},
)
@@ -540,8 +538,9 @@ func (al *AgentLoop) runLLMIteration(
return fbResult.Response, nil
}
return agent.Provider.Chat(ctx, messages, providerToolDefs, agent.Model, map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
})
}
@@ -800,7 +799,7 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
droppedCount := mid
keptConversation := conversation[mid:]
newHistory := make([]providers.Message, 0)
newHistory := make([]providers.Message, 0, 1+len(keptConversation)+1)
// Append compression note to the original system prompt instead of adding a new system message
// This avoids having two consecutive system messages which some APIs (like Zhipu) reject
@@ -962,8 +961,9 @@ func (al *AgentLoop) summarizeSession(agent *AgentInstance, sessionKey string) {
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err == nil {
@@ -1012,8 +1012,9 @@ func (al *AgentLoop) summarizeBatch(
nil,
agent.Model,
map[string]any{
"max_tokens": 1024,
"temperature": 0.3,
"max_tokens": 1024,
"temperature": 0.3,
"prompt_cache_key": agent.ID,
},
)
if err != nil {
+1 -1
View File
@@ -156,7 +156,7 @@ func LoginBrowser(cfg OAuthProviderConfig) (*AuthCredential, error) {
return exchangeCodeForTokens(cfg, result.code, pkce.CodeVerifier, redirectURI)
case manualInput := <-manualCh:
if manualInput == "" {
return nil, fmt.Errorf("manual input cancelled")
return nil, fmt.Errorf("manual input canceled")
}
// Extract code from URL if it's a full URL
code := manualInput
+1 -1
View File
@@ -233,7 +233,7 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
if localPath != "" {
localFiles = append(localFiles, localPath)
transcribedText := ""
var transcribedText string
if c.transcriber != nil && c.transcriber.IsAvailable() {
ctx, cancel := context.WithTimeout(c.getContext(), transcriptionTimeout)
result, err := c.transcriber.Transcribe(ctx, localPath)
+5 -3
View File
@@ -174,7 +174,10 @@ func (c *OneBotChannel) connect() error {
header["Authorization"] = []string{"Bearer " + c.config.AccessToken}
}
conn, _, err := dialer.Dial(c.config.WSUrl, header)
conn, resp, err := dialer.Dial(c.config.WSUrl, header)
if resp != nil {
resp.Body.Close()
}
if err != nil {
return err
}
@@ -310,7 +313,7 @@ func (c *OneBotChannel) sendAPIRequest(action string, params any, timeout time.D
case <-time.After(timeout):
return nil, fmt.Errorf("API request %s timed out after %v", action, timeout)
case <-c.ctx.Done():
return nil, fmt.Errorf("context cancelled")
return nil, fmt.Errorf("context canceled")
}
}
@@ -695,7 +698,6 @@ func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64)
textParts = append(textParts, "[forward message]")
default:
}
}
+1 -1
View File
@@ -439,5 +439,5 @@ func parseSlackChatID(chatID string) (channelID, threadTS string) {
if len(parts) > 1 {
threadTS = parts[1]
}
return
return channelID, threadTS
}
+1 -1
View File
@@ -265,7 +265,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
localFiles = append(localFiles, voicePath)
mediaPaths = append(mediaPaths, voicePath)
transcribedText := ""
var transcribedText string
if c.transcriber != nil && c.transcriber.IsAvailable() {
transcriberCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
+4 -1
View File
@@ -41,7 +41,10 @@ func (c *WhatsAppChannel) Start(ctx context.Context) error {
dialer := websocket.DefaultDialer
dialer.HandshakeTimeout = 10 * time.Second
conn, _, err := dialer.Dial(c.url, nil)
conn, resp, err := dialer.Dial(c.url, nil)
if resp != nil {
resp.Body.Close()
}
if err != nil {
return fmt.Errorf("failed to connect to WhatsApp bridge: %w", err)
}
+24 -24
View File
@@ -16,10 +16,10 @@ import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/fileutil"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/fileutil"
)
const (
@@ -167,7 +167,7 @@ func (hs *HeartbeatService) executeHeartbeat() {
}
if handler == nil {
hs.logError("Heartbeat handler not configured")
hs.logErrorf("Heartbeat handler not configured")
return
}
@@ -176,23 +176,23 @@ func (hs *HeartbeatService) executeHeartbeat() {
channel, chatID := hs.parseLastChannel(lastChannel)
// Debug log for channel resolution
hs.logInfo("Resolved channel: %s, chatID: %s (from lastChannel: %s)", channel, chatID, lastChannel)
hs.logInfof("Resolved channel: %s, chatID: %s (from lastChannel: %s)", channel, chatID, lastChannel)
result := handler(prompt, channel, chatID)
if result == nil {
hs.logInfo("Heartbeat handler returned nil result")
hs.logInfof("Heartbeat handler returned nil result")
return
}
// Handle different result types
if result.IsError {
hs.logError("Heartbeat error: %s", result.ForLLM)
hs.logErrorf("Heartbeat error: %s", result.ForLLM)
return
}
if result.Async {
hs.logInfo("Async task started: %s", result.ForLLM)
hs.logInfof("Async task started: %s", result.ForLLM)
logger.InfoCF("heartbeat", "Async heartbeat task started",
map[string]any{
"message": result.ForLLM,
@@ -202,7 +202,7 @@ func (hs *HeartbeatService) executeHeartbeat() {
// Check if silent
if result.Silent {
hs.logInfo("Heartbeat OK - silent")
hs.logInfof("Heartbeat OK - silent")
return
}
@@ -213,7 +213,7 @@ func (hs *HeartbeatService) executeHeartbeat() {
hs.sendResponse(result.ForLLM)
}
hs.logInfo("Heartbeat completed: %s", result.ForLLM)
hs.logInfof("Heartbeat completed: %s", result.ForLLM)
}
// buildPrompt builds the heartbeat prompt from HEARTBEAT.md
@@ -226,7 +226,7 @@ func (hs *HeartbeatService) buildPrompt() string {
hs.createDefaultHeartbeatTemplate()
return ""
}
hs.logError("Error reading HEARTBEAT.md: %v", err)
hs.logErrorf("Error reading HEARTBEAT.md: %v", err)
return ""
}
@@ -277,9 +277,9 @@ Add your heartbeat tasks below this line:
`
if err := fileutil.WriteFileAtomic(heartbeatPath, []byte(defaultContent), 0o644); err != nil {
hs.logError("Failed to create default HEARTBEAT.md: %v", err)
hs.logErrorf("Failed to create default HEARTBEAT.md: %v", err)
} else {
hs.logInfo("Created default HEARTBEAT.md template")
hs.logInfof("Created default HEARTBEAT.md template")
}
}
@@ -290,14 +290,14 @@ func (hs *HeartbeatService) sendResponse(response string) {
hs.mu.RUnlock()
if msgBus == nil {
hs.logInfo("No message bus configured, heartbeat result not sent")
hs.logInfof("No message bus configured, heartbeat result not sent")
return
}
// Get last channel from state
lastChannel := hs.state.GetLastChannel()
if lastChannel == "" {
hs.logInfo("No last channel recorded, heartbeat result not sent")
hs.logInfof("No last channel recorded, heartbeat result not sent")
return
}
@@ -314,7 +314,7 @@ func (hs *HeartbeatService) sendResponse(response string) {
Content: response,
})
hs.logInfo("Heartbeat result sent to %s", platform)
hs.logInfof("Heartbeat result sent to %s", platform)
}
// parseLastChannel parses the last channel string into platform and userID.
@@ -327,7 +327,7 @@ func (hs *HeartbeatService) parseLastChannel(lastChannel string) (platform, user
// Parse channel format: "platform:user_id" (e.g., "telegram:123456")
parts := strings.SplitN(lastChannel, ":", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
hs.logError("Invalid last channel format: %s", lastChannel)
hs.logErrorf("Invalid last channel format: %s", lastChannel)
return "", ""
}
@@ -335,25 +335,25 @@ func (hs *HeartbeatService) parseLastChannel(lastChannel string) (platform, user
// Skip internal channels
if constants.IsInternalChannel(platform) {
hs.logInfo("Skipping internal channel: %s", platform)
hs.logInfof("Skipping internal channel: %s", platform)
return "", ""
}
return platform, userID
}
// logInfo logs an informational message to the heartbeat log
func (hs *HeartbeatService) logInfo(format string, args ...any) {
hs.log("INFO", format, args...)
// logInfof logs an informational message to the heartbeat log
func (hs *HeartbeatService) logInfof(format string, args ...any) {
hs.logf("INFO", format, args...)
}
// logError logs an error message to the heartbeat log
func (hs *HeartbeatService) logError(format string, args ...any) {
hs.log("ERROR", format, args...)
// logErrorf logs an error message to the heartbeat log
func (hs *HeartbeatService) logErrorf(format string, args ...any) {
hs.logf("ERROR", format, args...)
}
// log writes a message to the heartbeat log file
func (hs *HeartbeatService) log(level, format string, args ...any) {
// logf writes a message to the heartbeat log file
func (hs *HeartbeatService) logf(level, format string, args ...any) {
logFile := filepath.Join(hs.workspace, "heartbeat.log")
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
+1 -1
View File
@@ -191,7 +191,7 @@ func TestLogPath(t *testing.T) {
hs := NewHeartbeatService(tmpDir, 30, true)
// Write a log entry
hs.log("INFO", "Test log entry")
hs.logf("INFO", "Test log entry")
// Verify log file exists at workspace root
expectedLogPath := filepath.Join(tmpDir, "heartbeat.log")
+1 -1
View File
@@ -153,7 +153,7 @@ func formatComponent(component string) string {
}
func formatFields(fields map[string]any) string {
var parts []string
parts := make([]string, 0, len(fields))
for k, v := range fields {
parts = append(parts, fmt.Sprintf("%s=%v", k, v))
}
+14 -1
View File
@@ -113,7 +113,20 @@ func buildParams(
for _, msg := range messages {
switch msg.Role {
case "system":
system = append(system, anthropic.TextBlockParam{Text: msg.Content})
// Prefer structured SystemParts for per-block cache_control.
// This enables LLM-side KV cache reuse: the static block's prefix
// hash stays stable across requests while dynamic parts change freely.
if len(msg.SystemParts) > 0 {
for _, part := range msg.SystemParts {
block := anthropic.TextBlockParam{Text: part.Text}
if part.CacheControl != nil && part.CacheControl.Type == "ephemeral" {
block.CacheControl = anthropic.NewCacheControlEphemeralParam()
}
system = append(system, block)
}
} else {
system = append(system, anthropic.TextBlockParam{Text: msg.Content})
}
case "user":
if msg.ToolCallID != "" {
anthropicMessages = append(anthropicMessages,
+10 -6
View File
@@ -43,12 +43,18 @@ func TestReadCodexCliCredentials_Valid(t *testing.T) {
}
}
// readCodexCliCredentialsErr calls ReadCodexCliCredentials and returns only the
// error, for tests that only need to assert on failure.
func readCodexCliCredentialsErr() error {
_, _, _, err := ReadCodexCliCredentials() //nolint:dogsled
return err
}
func TestReadCodexCliCredentials_MissingFile(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("CODEX_HOME", tmpDir)
_, _, _, err := ReadCodexCliCredentials()
if err == nil {
if err := readCodexCliCredentialsErr(); err == nil {
t.Fatal("expected error for missing auth.json")
}
}
@@ -64,8 +70,7 @@ func TestReadCodexCliCredentials_EmptyToken(t *testing.T) {
t.Setenv("CODEX_HOME", tmpDir)
_, _, _, err := ReadCodexCliCredentials()
if err == nil {
if err := readCodexCliCredentialsErr(); err == nil {
t.Fatal("expected error for empty access_token")
}
}
@@ -80,8 +85,7 @@ func TestReadCodexCliCredentials_InvalidJSON(t *testing.T) {
t.Setenv("CODEX_HOME", tmpDir)
_, _, _, err := ReadCodexCliCredentials()
if err == nil {
if err := readCodexCliCredentialsErr(); err == nil {
t.Fatal("expected error for invalid JSON")
}
}
+14 -2
View File
@@ -106,8 +106,8 @@ func (p *CodexProvider) Chat(
if evt.Type == "response.completed" || evt.Type == "response.failed" || evt.Type == "response.incomplete" {
evtResp := evt.Response
if evtResp.ID != "" {
copy := evtResp
resp = &copy
evtRespCopy := evtResp
resp = &evtRespCopy
}
}
}
@@ -208,6 +208,11 @@ func buildCodexParams(
for _, msg := range messages {
switch msg.Role {
case "system":
// Use the full concatenated system prompt (static + dynamic + summary)
// as instructions. This keeps behavior consistent with Anthropic and
// OpenAI-compat adapters where the complete system context lives in
// one place. Prefix caching is handled by prompt_cache_key below,
// not by splitting content across instructions vs input messages.
instructions = msg.Content
case "user":
if msg.ToolCallID != "" {
@@ -289,6 +294,13 @@ func buildCodexParams(
params.Instructions = openai.Opt(defaultCodexInstructions)
}
// Prompt caching: pass a stable cache key so OpenAI can bucket requests
// and reuse prefix KV cache across calls with the same key.
// See: https://platform.openai.com/docs/guides/prompt-caching
if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" {
params.PromptCacheKey = openai.Opt(cacheKey)
}
if len(tools) > 0 || enableWebSearch {
params.Tools = translateToolsForCodex(tools, enableWebSearch)
}
+1 -2
View File
@@ -44,7 +44,6 @@ func NewGitHubCopilotProvider(uri string, connectMode string, model string) (*Gi
Hooks: &copilot.SessionHooks{},
})
if err != nil {
client.Stop()
return nil, fmt.Errorf("create session failed: %w", err)
}
@@ -101,7 +100,7 @@ func (p *GitHubCopilotProvider) Chat(
return nil, fmt.Errorf("provider closed")
}
resp, err := session.SendAndWait(ctx, copilot.MessageOptions{
resp, _ := session.SendAndWait(ctx, copilot.MessageOptions{
Prompt: string(fullcontent),
})
+35 -1
View File
@@ -77,7 +77,7 @@ func (p *Provider) Chat(
requestBody := map[string]any{
"model": model,
"messages": messages,
"messages": stripSystemParts(messages),
}
if len(tools) > 0 {
@@ -111,6 +111,14 @@ func (p *Provider) Chat(
}
}
// Prompt caching: pass a stable cache key so OpenAI can bucket requests
// with the same key and reuse prefix KV cache across calls.
// The key is typically the agent ID — stable per agent, shared across requests.
// See: https://platform.openai.com/docs/guides/prompt-caching
if cacheKey, ok := options["prompt_cache_key"].(string); ok && cacheKey != "" {
requestBody["prompt_cache_key"] = cacheKey
}
jsonData, err := json.Marshal(requestBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
@@ -230,6 +238,32 @@ func parseResponse(body []byte) (*LLMResponse, error) {
}, nil
}
// openaiMessage is the wire-format message for OpenAI-compatible APIs.
// It mirrors protocoltypes.Message but omits SystemParts, which is an
// internal field that would be unknown to third-party endpoints.
type openaiMessage struct {
Role string `json:"role"`
Content string `json:"content"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
}
// stripSystemParts converts []Message to []openaiMessage, dropping the
// SystemParts field so it doesn't leak into the JSON payload sent to
// OpenAI-compatible APIs (some strict endpoints reject unknown fields).
func stripSystemParts(messages []Message) []openaiMessage {
out := make([]openaiMessage, len(messages))
for i, m := range messages {
out[i] = openaiMessage{
Role: m.Role,
Content: m.Content,
ToolCalls: m.ToolCalls,
ToolCallID: m.ToolCallID,
}
}
return out
}
func normalizeModel(model, apiBase string) string {
idx := strings.Index(model, "/")
if idx == -1 {
+21 -5
View File
@@ -38,12 +38,28 @@ type UsageInfo struct {
TotalTokens int `json:"total_tokens"`
}
// CacheControl marks a content block for LLM-side prefix caching.
// Currently only "ephemeral" is supported (used by Anthropic).
type CacheControl struct {
Type string `json:"type"` // "ephemeral"
}
// ContentBlock represents a structured segment of a system message.
// Adapters that understand SystemParts can use these blocks to set
// per-block cache control (e.g. Anthropic's cache_control: ephemeral).
type ContentBlock struct {
Type string `json:"type"` // "text"
Text string `json:"text"`
CacheControl *CacheControl `json:"cache_control,omitempty"`
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoning_content,omitempty"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoning_content,omitempty"`
SystemParts []ContentBlock `json:"system_parts,omitempty"` // structured system blocks for cache-aware adapters
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
}
type ToolDefinition struct {
+2
View File
@@ -17,6 +17,8 @@ type (
ToolFunctionDefinition = protocoltypes.ToolFunctionDefinition
ExtraContent = protocoltypes.ExtraContent
GoogleExtra = protocoltypes.GoogleExtra
ContentBlock = protocoltypes.ContentBlock
CacheControl = protocoltypes.CacheControl
)
type LLMProvider interface {
+27 -12
View File
@@ -3,6 +3,7 @@ package tools
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -107,13 +108,27 @@ func (r *ToolRegistry) ExecuteWithContext(
return result
}
// sortedToolNames returns tool names in sorted order for deterministic iteration.
// This is critical for KV cache stability: non-deterministic map iteration would
// produce different system prompts and tool definitions on each call, invalidating
// the LLM's prefix cache even when no tools have changed.
func (r *ToolRegistry) sortedToolNames() []string {
names := make([]string, 0, len(r.tools))
for name := range r.tools {
names = append(names, name)
}
sort.Strings(names)
return names
}
func (r *ToolRegistry) GetDefinitions() []map[string]any {
r.mu.RLock()
defer r.mu.RUnlock()
definitions := make([]map[string]any, 0, len(r.tools))
for _, tool := range r.tools {
definitions = append(definitions, ToolToSchema(tool))
sorted := r.sortedToolNames()
definitions := make([]map[string]any, 0, len(sorted))
for _, name := range sorted {
definitions = append(definitions, ToolToSchema(r.tools[name]))
}
return definitions
}
@@ -124,8 +139,10 @@ func (r *ToolRegistry) ToProviderDefs() []providers.ToolDefinition {
r.mu.RLock()
defer r.mu.RUnlock()
definitions := make([]providers.ToolDefinition, 0, len(r.tools))
for _, tool := range r.tools {
sorted := r.sortedToolNames()
definitions := make([]providers.ToolDefinition, 0, len(sorted))
for _, name := range sorted {
tool := r.tools[name]
schema := ToolToSchema(tool)
// Safely extract nested values with type checks
@@ -155,11 +172,7 @@ func (r *ToolRegistry) List() []string {
r.mu.RLock()
defer r.mu.RUnlock()
names := make([]string, 0, len(r.tools))
for name := range r.tools {
names = append(names, name)
}
return names
return r.sortedToolNames()
}
// Count returns the number of registered tools.
@@ -175,8 +188,10 @@ func (r *ToolRegistry) GetSummaries() []string {
r.mu.RLock()
defer r.mu.RUnlock()
summaries := make([]string, 0, len(r.tools))
for _, tool := range r.tools {
sorted := r.sortedToolNames()
summaries := make([]string, 0, len(sorted))
for _, name := range sorted {
tool := r.tools[name]
summaries = append(summaries, fmt.Sprintf("- `%s` - %s", tool.Name(), tool.Description()))
}
return summaries
+1 -2
View File
@@ -76,10 +76,9 @@ func NewExecTool(workingDir string, restrict bool) *ExecTool {
func NewExecToolWithConfig(workingDir string, restrict bool, config *config.Config) *ExecTool {
denyPatterns := make([]*regexp.Regexp, 0)
enableDenyPatterns := true
if config != nil {
execConfig := config.Tools.Exec
enableDenyPatterns = execConfig.EnableDenyPatterns
enableDenyPatterns := execConfig.EnableDenyPatterns
if enableDenyPatterns {
denyPatterns = append(denyPatterns, defaultDenyPatterns...)
if len(execConfig.CustomDenyPatterns) > 0 {
+3 -2
View File
@@ -3,6 +3,7 @@ package tools
import (
"context"
"fmt"
"strings"
)
type SpawnTool struct {
@@ -66,8 +67,8 @@ func (t *SpawnTool) SetAllowlistChecker(check func(targetAgentID string) bool) {
func (t *SpawnTool) Execute(ctx context.Context, args map[string]any) *ToolResult {
task, ok := args["task"].(string)
if !ok {
return ErrorResult("task is required")
if !ok || strings.TrimSpace(task) == "" {
return ErrorResult("task is required and must be a non-empty string")
}
label, _ := args["label"].(string)
+79
View File
@@ -0,0 +1,79 @@
package tools
import (
"context"
"strings"
"testing"
)
func TestSpawnTool_Execute_EmptyTask(t *testing.T) {
provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil)
tool := NewSpawnTool(manager)
ctx := context.Background()
tests := []struct {
name string
args map[string]any
}{
{"empty string", map[string]any{"task": ""}},
{"whitespace only", map[string]any{"task": " "}},
{"tabs and newlines", map[string]any{"task": "\t\n "}},
{"missing task key", map[string]any{"label": "test"}},
{"wrong type", map[string]any{"task": 123}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tool.Execute(ctx, tt.args)
if result == nil {
t.Fatal("Result should not be nil")
}
if !result.IsError {
t.Error("Expected error for invalid task parameter")
}
if !strings.Contains(result.ForLLM, "task is required") {
t.Errorf("Error message should mention 'task is required', got: %s", result.ForLLM)
}
})
}
}
func TestSpawnTool_Execute_ValidTask(t *testing.T) {
provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil)
tool := NewSpawnTool(manager)
ctx := context.Background()
args := map[string]any{
"task": "Write a haiku about coding",
"label": "haiku-task",
}
result := tool.Execute(ctx, args)
if result == nil {
t.Fatal("Result should not be nil")
}
if result.IsError {
t.Errorf("Expected success for valid task, got error: %s", result.ForLLM)
}
if !result.Async {
t.Error("SpawnTool should return async result")
}
}
func TestSpawnTool_Execute_NilManager(t *testing.T) {
tool := NewSpawnTool(nil)
ctx := context.Background()
args := map[string]any{"task": "test task"}
result := tool.Execute(ctx, args)
if !result.IsError {
t.Error("Expected error for nil manager")
}
if !strings.Contains(result.ForLLM, "Subagent manager not configured") {
t.Errorf("Error message should mention manager not configured, got: %s", result.ForLLM)
}
}
+6 -6
View File
@@ -132,12 +132,12 @@ After completing the task, provide a clear summary of what was done.`
},
}
// Check if context is already cancelled before starting
// Check if context is already canceled before starting
select {
case <-ctx.Done():
sm.mu.Lock()
task.Status = "cancelled"
task.Result = "Task cancelled before execution"
task.Status = "canceled"
task.Result = "Task canceled before execution"
sm.mu.Unlock()
return
default:
@@ -185,10 +185,10 @@ After completing the task, provide a clear summary of what was done.`
if err != nil {
task.Status = "failed"
task.Result = fmt.Sprintf("Error: %v", err)
// Check if it was cancelled
// Check if it was canceled
if ctx.Err() != nil {
task.Status = "cancelled"
task.Result = "Task cancelled during execution"
task.Status = "canceled"
task.Result = "Task canceled during execution"
}
result = &ToolResult{
ForLLM: task.Result,