mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
Merge branch 'refactor/channel-system' into main
This commit is contained in:
+44
-2
@@ -55,6 +55,8 @@ type processOptions struct {
|
||||
NoHistory bool // If true, don't load session history (for heartbeat)
|
||||
}
|
||||
|
||||
const defaultResponse = "I've completed processing but have no response to give. Increase `max_tool_iterations` in config.json."
|
||||
|
||||
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider) *AgentLoop {
|
||||
registry := NewAgentRegistry(cfg, provider)
|
||||
|
||||
@@ -330,7 +332,7 @@ func (al *AgentLoop) ProcessHeartbeat(ctx context.Context, content, channel, cha
|
||||
Channel: channel,
|
||||
ChatID: chatID,
|
||||
UserMessage: content,
|
||||
DefaultResponse: "I've completed processing but have no response to give.",
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
NoHistory: true, // Don't load session history for heartbeat
|
||||
@@ -406,7 +408,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
Channel: msg.Channel,
|
||||
ChatID: msg.ChatID,
|
||||
UserMessage: msg.Content,
|
||||
DefaultResponse: "I've completed processing but have no response to give.",
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: true,
|
||||
SendResponse: false,
|
||||
})
|
||||
@@ -551,6 +553,34 @@ func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opt
|
||||
return finalContent, nil
|
||||
}
|
||||
|
||||
func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string) {
|
||||
if al.channelManager == nil {
|
||||
return ""
|
||||
}
|
||||
if ch, ok := al.channelManager.GetChannel(channelName); ok {
|
||||
return ch.ReasoningChannelID()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, channelName, channelID string) {
|
||||
if reasoningContent == "" || channelName == "" || channelID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Check context cancellation before attempting to publish,
|
||||
// since PublishOutbound's select may race between send and ctx.Done().
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
Channel: channelName,
|
||||
ChatID: channelID,
|
||||
Content: reasoningContent,
|
||||
})
|
||||
}
|
||||
|
||||
// runLLMIteration executes the LLM call loop with tool handling.
|
||||
func (al *AgentLoop) runLLMIteration(
|
||||
ctx context.Context,
|
||||
@@ -677,6 +707,18 @@ func (al *AgentLoop) runLLMIteration(
|
||||
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
|
||||
}
|
||||
|
||||
go al.handleReasoning(ctx, response.Reasoning, opts.Channel, al.targetReasoningChannelID(opts.Channel))
|
||||
|
||||
logger.DebugCF("agent", "LLM response",
|
||||
map[string]any{
|
||||
"agent_id": agent.ID,
|
||||
"iteration": iteration,
|
||||
"content_chars": len(response.Content),
|
||||
"tool_calls": len(response.ToolCalls),
|
||||
"reasoning": response.Reasoning,
|
||||
"target_channel": al.targetReasoningChannelID(opts.Channel),
|
||||
"channel": opts.Channel,
|
||||
})
|
||||
// Check if no tool calls - we're done
|
||||
if len(response.ToolCalls) == 0 {
|
||||
finalContent = response.Content
|
||||
|
||||
@@ -9,11 +9,23 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/channels"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
"github.com/sipeed/picoclaw/pkg/tools"
|
||||
)
|
||||
|
||||
type fakeChannel struct{ id string }
|
||||
|
||||
func (f *fakeChannel) Name() string { return "fake" }
|
||||
func (f *fakeChannel) Start(ctx context.Context) error { return nil }
|
||||
func (f *fakeChannel) Stop(ctx context.Context) error { return nil }
|
||||
func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { return nil }
|
||||
func (f *fakeChannel) IsRunning() bool { return true }
|
||||
func (f *fakeChannel) IsAllowed(string) bool { return true }
|
||||
func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true }
|
||||
func (f *fakeChannel) ReasoningChannelID() string { return f.id }
|
||||
|
||||
func TestRecordLastChannel(t *testing.T) {
|
||||
// Create temp workspace
|
||||
tmpDir, err := os.MkdirTemp("", "agent-test-*")
|
||||
@@ -631,3 +643,158 @@ func TestAgentLoop_ContextExhaustionRetry(t *testing.T) {
|
||||
t.Errorf("Expected history to be compressed (len < 8), got %d", len(finalHistory))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetReasoningChannelID_AllChannels(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "agent-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{})
|
||||
chManager, err := channels.NewManager(&config.Config{}, bus.NewMessageBus(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create channel manager: %v", err)
|
||||
}
|
||||
for name, id := range map[string]string{
|
||||
"whatsapp": "rid-whatsapp",
|
||||
"telegram": "rid-telegram",
|
||||
"feishu": "rid-feishu",
|
||||
"discord": "rid-discord",
|
||||
"maixcam": "rid-maixcam",
|
||||
"qq": "rid-qq",
|
||||
"dingtalk": "rid-dingtalk",
|
||||
"slack": "rid-slack",
|
||||
"line": "rid-line",
|
||||
"onebot": "rid-onebot",
|
||||
"wecom": "rid-wecom",
|
||||
"wecom_app": "rid-wecom-app",
|
||||
} {
|
||||
chManager.RegisterChannel(name, &fakeChannel{id: id})
|
||||
}
|
||||
al.SetChannelManager(chManager)
|
||||
tests := []struct {
|
||||
channel string
|
||||
wantID string
|
||||
}{
|
||||
{channel: "whatsapp", wantID: "rid-whatsapp"},
|
||||
{channel: "telegram", wantID: "rid-telegram"},
|
||||
{channel: "feishu", wantID: "rid-feishu"},
|
||||
{channel: "discord", wantID: "rid-discord"},
|
||||
{channel: "maixcam", wantID: "rid-maixcam"},
|
||||
{channel: "qq", wantID: "rid-qq"},
|
||||
{channel: "dingtalk", wantID: "rid-dingtalk"},
|
||||
{channel: "slack", wantID: "rid-slack"},
|
||||
{channel: "line", wantID: "rid-line"},
|
||||
{channel: "onebot", wantID: "rid-onebot"},
|
||||
{channel: "wecom", wantID: "rid-wecom"},
|
||||
{channel: "wecom_app", wantID: "rid-wecom-app"},
|
||||
{channel: "unknown", wantID: ""},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.channel, func(t *testing.T) {
|
||||
got := al.targetReasoningChannelID(tt.channel)
|
||||
if got != tt.wantID {
|
||||
t.Fatalf("targetReasoningChannelID(%q) = %q, want %q", tt.channel, got, tt.wantID)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleReasoning(t *testing.T) {
|
||||
newLoop := func(t *testing.T) (*AgentLoop, *bus.MessageBus) {
|
||||
t.Helper()
|
||||
tmpDir, err := os.MkdirTemp("", "agent-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { _ = os.RemoveAll(tmpDir) })
|
||||
cfg := &config.Config{
|
||||
Agents: config.AgentsConfig{
|
||||
Defaults: config.AgentDefaults{
|
||||
Workspace: tmpDir,
|
||||
Model: "test-model",
|
||||
MaxTokens: 4096,
|
||||
MaxToolIterations: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
msgBus := bus.NewMessageBus()
|
||||
return NewAgentLoop(cfg, msgBus, &mockProvider{}), msgBus
|
||||
}
|
||||
|
||||
t.Run("skips when any required field is empty", func(t *testing.T) {
|
||||
al, msgBus := newLoop(t)
|
||||
al.handleReasoning(context.Background(), "reasoning", "telegram", "")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
|
||||
defer cancel()
|
||||
if msg, ok := msgBus.SubscribeOutbound(ctx); ok {
|
||||
t.Fatalf("expected no outbound message, got %+v", msg)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("publishes one message for non telegram", func(t *testing.T) {
|
||||
al, msgBus := newLoop(t)
|
||||
al.handleReasoning(context.Background(), "hello reasoning", "slack", "channel-1")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
msg, ok := msgBus.SubscribeOutbound(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected an outbound message")
|
||||
}
|
||||
if msg.Channel != "slack" || msg.ChatID != "channel-1" || msg.Content != "hello reasoning" {
|
||||
t.Fatalf("unexpected outbound message: %+v", msg)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("publishes one message for telegram", func(t *testing.T) {
|
||||
al, msgBus := newLoop(t)
|
||||
reasoning := "hello telegram reasoning"
|
||||
al.handleReasoning(context.Background(), reasoning, "telegram", "tg-chat")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
msg, ok := msgBus.SubscribeOutbound(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outbound message")
|
||||
}
|
||||
|
||||
if msg.Channel != "telegram" {
|
||||
t.Fatalf("expected telegram channel message, got %+v", msg)
|
||||
}
|
||||
if msg.ChatID != "tg-chat" {
|
||||
t.Fatalf("expected chatID tg-chat, got %+v", msg)
|
||||
}
|
||||
if msg.Content != reasoning {
|
||||
t.Fatalf("content mismatch: got %q want %q", msg.Content, reasoning)
|
||||
}
|
||||
})
|
||||
t.Run("expired ctx", func(t *testing.T) {
|
||||
al, msgBus := newLoop(t)
|
||||
reasoning := "hello telegram reasoning"
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
al.handleReasoning(ctx, reasoning, "telegram", "tg-chat")
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
msg, ok := msgBus.SubscribeOutbound(ctx)
|
||||
if ok {
|
||||
t.Fatalf("expected no outbound message, got %+v", msg)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
+10
-3
@@ -12,6 +12,8 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/fileutil"
|
||||
)
|
||||
|
||||
// MemoryStore manages persistent memory for the agent.
|
||||
@@ -58,7 +60,9 @@ func (ms *MemoryStore) ReadLongTerm() string {
|
||||
|
||||
// WriteLongTerm writes content to the long-term memory file (MEMORY.md).
|
||||
func (ms *MemoryStore) WriteLongTerm(content string) error {
|
||||
return os.WriteFile(ms.memoryFile, []byte(content), 0o644)
|
||||
// Use unified atomic write utility with explicit sync for flash storage reliability.
|
||||
// Using 0o600 (owner read/write only) for secure default permissions.
|
||||
return fileutil.WriteFileAtomic(ms.memoryFile, []byte(content), 0o600)
|
||||
}
|
||||
|
||||
// ReadToday reads today's daily note.
|
||||
@@ -78,7 +82,9 @@ func (ms *MemoryStore) AppendToday(content string) error {
|
||||
|
||||
// Ensure month directory exists
|
||||
monthDir := filepath.Dir(todayFile)
|
||||
os.MkdirAll(monthDir, 0o755)
|
||||
if err := os.MkdirAll(monthDir, 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var existingContent string
|
||||
if data, err := os.ReadFile(todayFile); err == nil {
|
||||
@@ -95,7 +101,8 @@ func (ms *MemoryStore) AppendToday(content string) error {
|
||||
newContent = existingContent + "\n" + content
|
||||
}
|
||||
|
||||
return os.WriteFile(todayFile, []byte(newContent), 0o644)
|
||||
// Use unified atomic write utility with explicit sync for flash storage reliability.
|
||||
return fileutil.WriteFileAtomic(todayFile, []byte(newContent), 0o600)
|
||||
}
|
||||
|
||||
// GetRecentDailyNotes returns daily notes from the last N days.
|
||||
|
||||
Reference in New Issue
Block a user