package agent

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"slices"
	"strings"
	"testing"
	"time"

	"github.com/sipeed/picoclaw/pkg/bus"
	"github.com/sipeed/picoclaw/pkg/channels"
	"github.com/sipeed/picoclaw/pkg/config"
	"github.com/sipeed/picoclaw/pkg/media"
	"github.com/sipeed/picoclaw/pkg/providers"
	"github.com/sipeed/picoclaw/pkg/routing"
	"github.com/sipeed/picoclaw/pkg/tools"
)

// fakeChannel is a stub channel used to populate the channel manager in tests.
// Every method is a no-op that reports success; the only configurable state is
// the id returned by ReasoningChannelID.
type fakeChannel struct{ id string }

func (f *fakeChannel) Name() string                                            { return "fake" }
func (f *fakeChannel) Start(ctx context.Context) error                         { return nil }
func (f *fakeChannel) Stop(ctx context.Context) error                          { return nil }
func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { return nil }
func (f *fakeChannel) IsRunning() bool                                         { return true }
func (f *fakeChannel) IsAllowed(string) bool                                   { return true }
func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool              { return true }
func (f *fakeChannel) ReasoningChannelID() string                              { return f.id }

// newTestAgentLoop builds an AgentLoop backed by a fresh temporary workspace,
// a new message bus and a mockProvider. It returns every collaborator so tests
// can construct a second loop over the same workspace, plus a cleanup func
// that removes the temporary directory.
func newTestAgentLoop(
	t *testing.T,
) (al *AgentLoop, cfg *config.Config, msgBus *bus.MessageBus, provider *mockProvider, cleanup func()) {
	t.Helper()
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	cfg = &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}
	msgBus = bus.NewMessageBus()
	provider = &mockProvider{}
	al = NewAgentLoop(cfg, msgBus, provider)
	// Removal is best-effort: a leftover temp directory must not fail a test.
	return al, cfg, msgBus, provider, func() { _ = os.RemoveAll(tmpDir) }
}

// TestRecordLastChannel verifies that RecordLastChannel updates the state and
// that a second AgentLoop created over the same workspace sees the same value.
func TestRecordLastChannel(t *testing.T) {
	al, cfg, msgBus, provider, cleanup := newTestAgentLoop(t)
	defer cleanup()

	testChannel := "test-channel"
	if err := al.RecordLastChannel(testChannel); err != nil {
		t.Fatalf("RecordLastChannel failed: %v", err)
	}
	if got := al.state.GetLastChannel(); got != testChannel {
		t.Errorf("Expected channel '%s', got '%s'", testChannel, got)
	}

	// A second loop over the same config must read the recorded value back.
	al2 := NewAgentLoop(cfg, msgBus, provider)
	if got := al2.state.GetLastChannel(); got != testChannel {
		t.Errorf("Expected persistent channel '%s', got '%s'", testChannel, got)
	}
}

// TestRecordLastChatID verifies that RecordLastChatID updates the state and
// that a second AgentLoop created over the same workspace sees the same value.
func TestRecordLastChatID(t *testing.T) {
	al, cfg, msgBus, provider, cleanup := newTestAgentLoop(t)
	defer cleanup()

	testChatID := "test-chat-id-123"
	if err := al.RecordLastChatID(testChatID); err != nil {
		t.Fatalf("RecordLastChatID failed: %v", err)
	}
	if got := al.state.GetLastChatID(); got != testChatID {
		t.Errorf("Expected chat ID '%s', got '%s'", testChatID, got)
	}

	// A second loop over the same config must read the recorded value back.
	al2 := NewAgentLoop(cfg, msgBus, provider)
	if got := al2.state.GetLastChatID(); got != testChatID {
		t.Errorf("Expected persistent chat ID '%s', got '%s'", testChatID, got)
	}
}

// TestNewAgentLoop_StateInitialized verifies that NewAgentLoop sets the state
// manager and creates the "state" directory inside the workspace.
func TestNewAgentLoop_StateInitialized(t *testing.T) {
	// Create temp workspace, test config and agent loop
	al, cfg, _, _, cleanup := newTestAgentLoop(t)
	defer cleanup()

	// Verify state manager is initialized
	if al.state == nil {
		t.Error("Expected state manager to be initialized")
	}

	// Verify state directory was created. Any Stat error (not only
	// "does not exist") means the directory is unusable, so report it.
	stateDir := filepath.Join(cfg.Agents.Defaults.Workspace, "state")
	if _, err := os.Stat(stateDir); err != nil {
		t.Errorf("Expected state directory to exist: %v", err)
	}
}

// TestToolRegistry_ToolRegistration verifies tools can be registered and retrieved
func TestToolRegistry_ToolRegistration(t *testing.T) {
	al, _, _, _, cleanup := newTestAgentLoop(t)
	defer cleanup()

	// Register a custom tool
	customTool := &mockCustomTool{}
	al.RegisterTool(customTool)

	// Verify tool is registered by checking it shows up in GetStartupInfo
	// (actual tool retrieval is tested in tools package tests)
	info := al.GetStartupInfo()
	toolsInfo, ok := info["tools"].(map[string]any)
	if !ok {
		t.Fatalf("Expected 'tools' to be a map, got %T", info["tools"])
	}
	toolsList, ok := toolsInfo["names"].([]string)
	if !ok {
		t.Fatalf("Expected 'names' to be a []string, got %T", toolsInfo["names"])
	}

	// Check that our custom tool name is in the list
	if !slices.Contains(toolsList, "mock_custom") {
		t.Error("Expected custom tool to be registered")
	}
}

// TestToolContext_Updates verifies tool context helpers work correctly
func TestToolContext_Updates(t *testing.T) {
	ctx := tools.WithToolContext(context.Background(), "telegram", "chat-42")

	if got := tools.ToolChannel(ctx); got != "telegram" {
		t.Errorf("expected channel 'telegram', got %q", got)
	}
	if got := tools.ToolChatID(ctx); got != "chat-42" {
		t.Errorf("expected chatID 'chat-42', got %q", got)
	}

	// Empty context returns empty strings
	if got := tools.ToolChannel(context.Background()); got != "" {
		t.Errorf("expected empty channel from bare context, got %q", got)
	}
}

// TestToolRegistry_GetDefinitions verifies tool definitions can be retrieved
func TestToolRegistry_GetDefinitions(t *testing.T) {
	al, _, _, _, cleanup := newTestAgentLoop(t)
	defer cleanup()

	// Register a test tool and verify it shows up in startup info
	testTool := &mockCustomTool{}
	al.RegisterTool(testTool)

	info := al.GetStartupInfo()
	toolsInfo, ok := info["tools"].(map[string]any)
	if !ok {
		t.Fatalf("Expected 'tools' to be a map, got %T", info["tools"])
	}
	toolsList, ok := toolsInfo["names"].([]string)
	if !ok {
		t.Fatalf("Expected 'names' to be a []string, got %T", toolsInfo["names"])
	}

	// Check that our custom tool name is in the list
	if !slices.Contains(toolsList, "mock_custom") {
		t.Error("Expected custom tool to be registered")
	}
}

// TestAgentLoop_GetStartupInfo verifies startup info contains tools
func TestAgentLoop_GetStartupInfo(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	// Start from the default config and override only the agent defaults.
	cfg := config.DefaultConfig()
	cfg.Agents.Defaults.Workspace = tmpDir
	cfg.Agents.Defaults.Model = "test-model"
	cfg.Agents.Defaults.MaxTokens = 4096
	cfg.Agents.Defaults.MaxToolIterations = 10

	msgBus := bus.NewMessageBus()
	provider := &mockProvider{}
	al := NewAgentLoop(cfg, msgBus, provider)

	info := al.GetStartupInfo()

	// Verify tools info exists
	toolsInfo, ok := info["tools"]
	if !ok {
		t.Fatal("Expected 'tools' key in startup info")
	}
	toolsMap, ok := toolsInfo.(map[string]any)
	if !ok {
		t.Fatal("Expected 'tools' to be a map")
	}
	count, ok := toolsMap["count"]
	if !ok {
		t.Fatal("Expected 'count' in tools info")
	}
	n, ok := count.(int)
	if !ok {
		t.Fatalf("Expected 'count' to be an int, got %T", count)
	}

	// Should have default tools registered
	if n == 0 {
		t.Error("Expected at least some tools to be registered")
	}
}

// TestAgentLoop_Stop verifies Stop() sets running to false
func TestAgentLoop_Stop(t *testing.T) {
	al, _, _, _, cleanup := newTestAgentLoop(t)
	defer cleanup()

	// Note: running is only set to true when Run() is called
	// We can't test that without starting the event loop
	// Instead, verify the Stop method can be called safely
	al.Stop()

	// Verify running is false (initial state or after Stop)
	if al.running.Load() {
		t.Error("Expected agent to be stopped (or never started)")
	}
}

// Mock implementations for testing

// simpleMockProvider is a provider whose Chat always succeeds with the
// configured response and no tool calls.
type simpleMockProvider struct {
	response string
}

// Chat ignores all of its arguments and returns the canned response.
func (m *simpleMockProvider) Chat(
	ctx context.Context,
	messages []providers.Message,
	tools []providers.ToolDefinition,
	model string,
	opts map[string]any,
) (*providers.LLMResponse, error) {
	return &providers.LLMResponse{
		Content:   m.response,
		ToolCalls: []providers.ToolCall{},
	}, nil
}

// GetDefaultModel returns a fixed model name.
func (m *simpleMockProvider) GetDefaultModel() string {
	return "mock-model"
}

// countingMockProvider behaves like simpleMockProvider but also counts how
// many times Chat was invoked, so tests can assert whether the LLM was called.
type countingMockProvider struct {
	response string
	calls    int
}

// Chat increments the call counter and returns the canned response.
func (m *countingMockProvider) Chat(
	ctx context.Context,
	messages []providers.Message,
	tools []providers.ToolDefinition,
	model string,
	opts map[string]any,
) (*providers.LLMResponse, error) {
	m.calls++
	return &providers.LLMResponse{
		Content:   m.response,
		ToolCalls: []providers.ToolCall{},
	}, nil
}

// GetDefaultModel returns a fixed model name.
func (m *countingMockProvider) GetDefaultModel() string {
	return "counting-mock-model"
}

// mockCustomTool is a simple mock tool for registration testing
type mockCustomTool struct{}

// Name returns the identifier the registration tests look for.
func (m *mockCustomTool) Name() string {
	return "mock_custom"
}

// Description returns a fixed human-readable description.
func (m *mockCustomTool) Description() string {
	return "Mock custom tool for testing"
}

// Parameters returns an object schema with no properties.
func (m *mockCustomTool) Parameters() map[string]any {
	return map[string]any{
		"type":       "object",
		"properties": map[string]any{},
	}
}

// Execute ignores its arguments and returns a silent result.
func (m *mockCustomTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult {
	return tools.SilentResult("Custom tool executed")
}

// testHelper executes a message and returns the response
type testHelper struct {
	al *AgentLoop
}

// executeAndGetResponse runs msg through processMessage under a
// responseTimeout deadline and returns the response. Any error from
// processMessage fails the test immediately via tb.Fatalf.
func (h testHelper) executeAndGetResponse(tb testing.TB, ctx context.Context, msg bus.InboundMessage) string {
	// Attribute failures to the calling test rather than to this helper.
	tb.Helper()

	// Use a short timeout to avoid hanging
	timeoutCtx, cancel := context.WithTimeout(ctx, responseTimeout)
	defer cancel()

	response, err := h.al.processMessage(timeoutCtx, msg)
	if err != nil {
		tb.Fatalf("processMessage failed: %v", err)
	}
	return response
}

// responseTimeout bounds how long executeAndGetResponse waits for a reply.
const responseTimeout = 3 * time.Second

// TestProcessMessage_UsesRouteSessionKey verifies that processing a message
// stores the conversation under the session key resolved by the routing
// registry for that channel and peer.
func TestProcessMessage_UsesRouteSessionKey(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &simpleMockProvider{response: "ok"}
	al := NewAgentLoop(cfg, msgBus, provider)

	msg := bus.InboundMessage{
		Channel:  "telegram",
		SenderID: "user1",
		ChatID:   "chat1",
		Content:  "hello",
		Peer: bus.Peer{
			Kind: "direct",
			ID:   "user1",
		},
	}

	// Resolve the expected session key the same way the loop is expected to.
	route := al.registry.ResolveRoute(routing.RouteInput{
		Channel: msg.Channel,
		Peer:    extractPeer(msg),
	})
	sessionKey := route.SessionKey

	defaultAgent := al.registry.GetDefaultAgent()
	if defaultAgent == nil {
		t.Fatal("No default agent found")
	}

	helper := testHelper{al: al}
	_ = helper.executeAndGetResponse(t, context.Background(), msg)

	// One user message plus one assistant reply.
	history := defaultAgent.Sessions.GetHistory(sessionKey)
	if len(history) != 2 {
		t.Fatalf("expected session history len=2, got %d", len(history))
	}
	if history[0].Role != "user" || history[0].Content != "hello" {
		t.Fatalf("unexpected first message in session: %+v", history[0])
	}
}

// TestProcessMessage_CommandOutcomes verifies that "/show channel" is answered
// without calling the LLM, while "/foo" and "/new" are passed through to it.
func TestProcessMessage_CommandOutcomes(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
		Session: config.SessionConfig{
			DMScope: "per-channel-peer",
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &countingMockProvider{response: "LLM reply"}
	al := NewAgentLoop(cfg, msgBus, provider)
	helper := testHelper{al: al}

	baseMsg := bus.InboundMessage{
		Channel:  "whatsapp",
		SenderID: "user1",
		ChatID:   "chat1",
		Peer: bus.Peer{
			Kind: "direct",
			ID:   "user1",
		},
	}

	// send processes a copy of baseMsg carrying the given content.
	send := func(content string) string {
		t.Helper()
		msg := baseMsg
		msg.Content = content
		return helper.executeAndGetResponse(t, context.Background(), msg)
	}

	showResp := send("/show channel")
	if showResp != "Current Channel: whatsapp" {
		t.Fatalf("unexpected /show reply: %q", showResp)
	}
	if provider.calls != 0 {
		t.Fatalf("LLM should not be called for handled command, calls=%d", provider.calls)
	}

	fooResp := send("/foo")
	if fooResp != "LLM reply" {
		t.Fatalf("unexpected /foo reply: %q", fooResp)
	}
	if provider.calls != 1 {
		t.Fatalf("LLM should be called exactly once after /foo passthrough, calls=%d", provider.calls)
	}

	newResp := send("/new")
	if newResp != "LLM reply" {
		t.Fatalf("unexpected /new reply: %q", newResp)
	}
	if provider.calls != 2 {
		t.Fatalf("LLM should be called for passthrough /new command, calls=%d", provider.calls)
	}
}

// TestProcessMessage_SwitchModelShowModelConsistency verifies that the model
// reported by "/show model" matches the one selected by a preceding
// "/switch model to ..." command, and that neither command calls the LLM.
func TestProcessMessage_SwitchModelShowModelConsistency(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Provider:          "openai",
				Model:             "before-switch",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &countingMockProvider{response: "LLM reply"}
	al := NewAgentLoop(cfg, msgBus, provider)
	helper := testHelper{al: al}

	switchResp := helper.executeAndGetResponse(t, context.Background(), bus.InboundMessage{
		Channel:  "telegram",
		SenderID: "user1",
		ChatID:   "chat1",
		Content:  "/switch model to after-switch",
		Peer: bus.Peer{
			Kind: "direct",
			ID:   "user1",
		},
	})
	if !strings.Contains(switchResp, "Switched model from before-switch to after-switch") {
		t.Fatalf("unexpected /switch reply: %q", switchResp)
	}

	showResp := helper.executeAndGetResponse(t, context.Background(), bus.InboundMessage{
		Channel:  "telegram",
		SenderID: "user1",
		ChatID:   "chat1",
		Content:  "/show model",
		Peer: bus.Peer{
			Kind: "direct",
			ID:   "user1",
		},
	})
	if !strings.Contains(showResp, "Current Model: after-switch (Provider: openai)") {
		t.Fatalf("unexpected /show model reply after switch: %q", showResp)
	}

	if provider.calls != 0 {
		t.Fatalf("LLM should not be called for /switch and /show, calls=%d", provider.calls)
	}
}

// TestToolResult_SilentToolDoesNotSendUserMessage verifies silent tools don't trigger outbound
func TestToolResult_SilentToolDoesNotSendUserMessage(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &simpleMockProvider{response: "File operation complete"}
	al := NewAgentLoop(cfg, msgBus, provider)
	helper := testHelper{al: al}

	// ReadFileTool returns SilentResult, which should not send user message
	ctx := context.Background()
	msg := bus.InboundMessage{
		Channel:    "test",
		SenderID:   "user1",
		ChatID:     "chat1",
		Content:    "read test.txt",
		SessionKey: "test-session",
	}

	response := helper.executeAndGetResponse(t, ctx, msg)

	// Silent tool should return the LLM's response directly
	if response != "File operation complete" {
		t.Errorf("Expected 'File operation complete', got: %s", response)
	}
}

// TestToolResult_UserFacingToolDoesSendMessage verifies user-facing tools trigger outbound
func TestToolResult_UserFacingToolDoesSendMessage(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &simpleMockProvider{response: "Command output: hello world"}
	al := NewAgentLoop(cfg, msgBus, provider)
	helper := testHelper{al: al}

	// ExecTool returns UserResult, which should send user message
	ctx := context.Background()
	msg := bus.InboundMessage{
		Channel:    "test",
		SenderID:   "user1",
		ChatID:     "chat1",
		Content:    "run hello",
		SessionKey: "test-session",
	}

	response := helper.executeAndGetResponse(t, ctx, msg)

	// User-facing tool should include the output in final response
	if response != "Command output: hello world" {
		t.Errorf("Expected 'Command output: hello world', got: %s", response)
	}
}

// failFirstMockProvider fails on the first N calls with a specific error
type failFirstMockProvider struct {
	failures    int
	currentCall int
	failError   error
	successResp string
}

// Chat returns failError for the first `failures` calls and successResp with
// no tool calls afterwards. currentCall counts every invocation.
func (m *failFirstMockProvider) Chat(
	ctx context.Context,
	messages []providers.Message,
	tools []providers.ToolDefinition,
	model string,
	opts map[string]any,
) (*providers.LLMResponse, error) {
	m.currentCall++
	if m.currentCall <= m.failures {
		return nil, m.failError
	}
	return &providers.LLMResponse{
		Content:   m.successResp,
		ToolCalls: []providers.ToolCall{},
	}, nil
}

// GetDefaultModel returns a fixed model name.
func (m *failFirstMockProvider) GetDefaultModel() string {
	return "mock-fail-model"
}

// TestAgentLoop_ContextExhaustionRetry verifies that the agent retries on context errors
func TestAgentLoop_ContextExhaustionRetry(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
	}

	msgBus := bus.NewMessageBus()

	// Create a provider that fails once with a context error
	contextErr := fmt.Errorf("InvalidParameter: Total tokens of image and text exceed max message tokens")
	provider := &failFirstMockProvider{
		failures:    1,
		failError:   contextErr,
		successResp: "Recovered from context error",
	}

	al := NewAgentLoop(cfg, msgBus, provider)

	// Inject some history to simulate a full context
	sessionKey := "test-session-context"
	// Create dummy history
	history := []providers.Message{
		{Role: "system", Content: "System prompt"},
		{Role: "user", Content: "Old message 1"},
		{Role: "assistant", Content: "Old response 1"},
		{Role: "user", Content: "Old message 2"},
		{Role: "assistant", Content: "Old response 2"},
		{Role: "user", Content: "Trigger message"},
	}
	defaultAgent := al.registry.GetDefaultAgent()
	if defaultAgent == nil {
		t.Fatal("No default agent found")
	}
	defaultAgent.Sessions.SetHistory(sessionKey, history)

	// Call ProcessDirectWithChannel
	// Note: ProcessDirectWithChannel calls processMessage which will execute runLLMIteration
	response, err := al.ProcessDirectWithChannel(
		context.Background(),
		"Trigger message",
		sessionKey,
		"test",
		"test-chat",
	)
	if err != nil {
		t.Fatalf("Expected success after retry, got error: %v", err)
	}

	if response != "Recovered from context error" {
		t.Errorf("Expected 'Recovered from context error', got '%s'", response)
	}

	// We expect 2 calls: 1st failed, 2nd succeeded
	if provider.currentCall != 2 {
		t.Errorf("Expected 2 calls (1 fail + 1 success), got %d", provider.currentCall)
	}

	// Check final history length
	finalHistory := defaultAgent.Sessions.GetHistory(sessionKey)
	// We verify that the history has been modified (compressed)
	// Original length: 6
	// Expected behavior: compression drops ~50% of history (mid slice)
	// We can assert that the length is NOT what it would be without compression.
	// Without compression: 6 + 1 (new user msg) + 1 (assistant msg) = 8
	if len(finalHistory) >= 8 {
		t.Errorf("Expected history to be compressed (len < 8), got %d", len(finalHistory))
	}
}

// TestProcessDirectWithChannel_TriggersMCPInitialization verifies that
// ProcessDirectWithChannel triggers MCP initialization when MCP is enabled.
// Note: Manager is only initialized when at least one MCP server is configured
// and successfully connected.
func TestProcessDirectWithChannel_TriggersMCPInitialization(t *testing.T) {
	tmpDir, err := os.MkdirTemp("", "agent-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	// Test with MCP enabled but no servers - should not initialize manager
	cfg := &config.Config{
		Agents: config.AgentsConfig{
			Defaults: config.AgentDefaults{
				Workspace:         tmpDir,
				Model:             "test-model",
				MaxTokens:         4096,
				MaxToolIterations: 10,
			},
		},
		Tools: config.ToolsConfig{
			MCP: config.MCPConfig{
				ToolConfig: config.ToolConfig{
					Enabled: true,
				},
				// No servers configured - manager should not be initialized
			},
		},
	}

	msgBus := bus.NewMessageBus()
	provider := &mockProvider{}
	al := NewAgentLoop(cfg, msgBus, provider)
	defer al.Close()

	if al.mcp.hasManager() {
		t.Fatal("expected MCP manager to be nil before first direct processing")
	}

	_, err = al.ProcessDirectWithChannel(
		context.Background(),
		"hello",
		"session-1",
		"cli",
		"direct",
	)
	if err != nil {
		t.Fatalf("ProcessDirectWithChannel failed: %v", err)
	}

	// Manager should not be initialized when no servers are configured
	if al.mcp.hasManager() {
		t.Fatal("expected MCP manager to be nil when no servers are configured")
	}
}

// TestTargetReasoningChannelID_AllChannels verifies that
// targetReasoningChannelID returns the ReasoningChannelID of the channel
// registered under each name, and an empty string for an unregistered name.
func TestTargetReasoningChannelID_AllChannels(t *testing.T) {
	al, _, _, _, cleanup := newTestAgentLoop(t)
	defer cleanup()

	chManager, err := channels.NewManager(&config.Config{}, bus.NewMessageBus(), nil)
	if err != nil {
		t.Fatalf("Failed to create channel manager: %v", err)
	}
	for name, id := range map[string]string{
		"whatsapp":  "rid-whatsapp",
		"telegram":  "rid-telegram",
		"feishu":    "rid-feishu",
		"discord":   "rid-discord",
		"maixcam":   "rid-maixcam",
		"qq":        "rid-qq",
		"dingtalk":  "rid-dingtalk",
		"slack":     "rid-slack",
		"line":      "rid-line",
		"onebot":    "rid-onebot",
		"wecom":     "rid-wecom",
		"wecom_app": "rid-wecom-app",
	} {
		chManager.RegisterChannel(name, &fakeChannel{id: id})
	}
	al.SetChannelManager(chManager)

	tests := []struct {
		channel string
		wantID  string
	}{
		{channel: "whatsapp", wantID: "rid-whatsapp"},
		{channel: "telegram", wantID: "rid-telegram"},
		{channel: "feishu", wantID: "rid-feishu"},
		{channel: "discord", wantID: "rid-discord"},
		{channel: "maixcam", wantID: "rid-maixcam"},
		{channel: "qq", wantID: "rid-qq"},
		{channel: "dingtalk", wantID: "rid-dingtalk"},
		{channel: "slack", wantID: "rid-slack"},
		{channel: "line", wantID: "rid-line"},
		{channel: "onebot", wantID: "rid-onebot"},
		{channel: "wecom", wantID: "rid-wecom"},
		{channel: "wecom_app", wantID: "rid-wecom-app"},
		{channel: "unknown", wantID: ""},
	}

	for _, tt := range tests {
		t.Run(tt.channel, func(t *testing.T) {
			got := al.targetReasoningChannelID(tt.channel)
			if got != tt.wantID {
				t.Fatalf("targetReasoningChannelID(%q) = %q, want %q", tt.channel, got, tt.wantID)
			}
		})
	}
}

// TestHandleReasoning exercises handleReasoning against the outbound bus:
// skipped input, normal publication, a cancelled context, and a full bus.
func TestHandleReasoning(t *testing.T) {
	// newLoop returns a fresh AgentLoop and its bus so subtests stay isolated.
	newLoop := func(t *testing.T) (*AgentLoop, *bus.MessageBus) {
		t.Helper()
		tmpDir, err := os.MkdirTemp("", "agent-test-*")
		if err != nil {
			t.Fatalf("Failed to create temp dir: %v", err)
		}
		t.Cleanup(func() { _ = os.RemoveAll(tmpDir) })
		cfg := &config.Config{
			Agents: config.AgentsConfig{
				Defaults: config.AgentDefaults{
					Workspace:         tmpDir,
					Model:             "test-model",
					MaxTokens:         4096,
					MaxToolIterations: 10,
				},
			},
		}
		msgBus := bus.NewMessageBus()
		return NewAgentLoop(cfg, msgBus, &mockProvider{}), msgBus
	}

	t.Run("skips when any required field is empty", func(t *testing.T) {
		al, msgBus := newLoop(t)
		al.handleReasoning(context.Background(), "reasoning", "telegram", "")

		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
		defer cancel()
		if msg, ok := msgBus.SubscribeOutbound(ctx); ok {
			t.Fatalf("expected no outbound message, got %+v", msg)
		}
	})

	t.Run("publishes one message for non telegram", func(t *testing.T) {
		al, msgBus := newLoop(t)
		al.handleReasoning(context.Background(), "hello reasoning", "slack", "channel-1")

		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		msg, ok := msgBus.SubscribeOutbound(ctx)
		if !ok {
			t.Fatal("expected an outbound message")
		}
		if msg.Channel != "slack" || msg.ChatID != "channel-1" || msg.Content != "hello reasoning" {
			t.Fatalf("unexpected outbound message: %+v", msg)
		}
	})

	t.Run("publishes one message for telegram", func(t *testing.T) {
		al, msgBus := newLoop(t)
		reasoning := "hello telegram reasoning"
		al.handleReasoning(context.Background(), reasoning, "telegram", "tg-chat")

		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		msg, ok := msgBus.SubscribeOutbound(ctx)
		if !ok {
			t.Fatal("expected outbound message")
		}
		if msg.Channel != "telegram" {
			t.Fatalf("expected telegram channel message, got %+v", msg)
		}
		if msg.ChatID != "tg-chat" {
			t.Fatalf("expected chatID tg-chat, got %+v", msg)
		}
		if msg.Content != reasoning {
			t.Fatalf("content mismatch: got %q want %q", msg.Content, reasoning)
		}
	})

	t.Run("expired ctx", func(t *testing.T) {
		al, msgBus := newLoop(t)
		reasoning := "hello telegram reasoning"

		// Call with an already-cancelled context.
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		al.handleReasoning(ctx, reasoning, "telegram", "tg-chat")

		ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		msg, ok := msgBus.SubscribeOutbound(ctx)
		if ok {
			t.Fatalf("expected no outbound message, got %+v", msg)
		}
	})

	t.Run("returns promptly when bus is full", func(t *testing.T) {
		al, msgBus := newLoop(t)

		// Fill the outbound bus buffer until a publish would block.
		// Use a short timeout to detect when the buffer is full,
		// rather than hardcoding the buffer size.
		for i := 0; ; i++ {
			fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
			err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
				Channel: "filler",
				ChatID:  "filler",
				Content: fmt.Sprintf("filler-%d", i),
			})
			fillCancel()
			if err != nil {
				// Buffer is full (timed out trying to send).
				break
			}
		}

		// Use a short-deadline parent context to bound the test.
		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
		defer cancel()

		start := time.Now()
		al.handleReasoning(ctx, "should timeout", "slack", "channel-full")
		elapsed := time.Since(start)

		// handleReasoning uses a 5s internal timeout, but the parent ctx
		// expires in 500ms. It should return within ~500ms, not 5s.
		if elapsed > 2*time.Second {
			t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed)
		}

		// Drain the bus and verify the reasoning message was NOT published
		// (it should have been dropped due to timeout).
		drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer drainCancel()
		foundReasoning := false
		for {
			msg, ok := msgBus.SubscribeOutbound(drainCtx)
			if !ok {
				break
			}
			if msg.Content == "should timeout" {
				foundReasoning = true
			}
		}
		if foundReasoning {
			t.Fatal("expected reasoning message to be dropped when bus is full, but it was published")
		}
	})
}

// TestResolveMediaRefs_ResolvesToBase64 verifies that a stored PNG reference
// is replaced by a "data:image/png;base64," URL.
func TestResolveMediaRefs_ResolvesToBase64(t *testing.T) {
	store := media.NewFileMediaStore()
	dir := t.TempDir()

	// Create a minimal valid PNG (8-byte header is enough for filetype detection)
	pngPath := filepath.Join(dir, "test.png")
	// PNG magic: 0x89 P N G \r \n 0x1A \n + minimal IHDR
	pngHeader := []byte{
		0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, // PNG signature
		0x00, 0x00, 0x00, 0x0D, // IHDR length
		0x49, 0x48, 0x44, 0x52, // "IHDR"
		0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x02, // 1x1 RGB
		0x00, 0x00, 0x00, // no interlace
		0x90, 0x77, 0x53, 0xDE, // CRC
	}
	if err := os.WriteFile(pngPath, pngHeader, 0o644); err != nil {
		t.Fatal(err)
	}
	ref, err := store.Store(pngPath, media.MediaMeta{}, "test")
	if err != nil {
		t.Fatal(err)
	}

	messages := []providers.Message{
		{Role: "user", Content: "describe this", Media: []string{ref}},
	}
	result := resolveMediaRefs(messages, store, config.DefaultMaxMediaSize)

	if len(result[0].Media) != 1 {
		t.Fatalf("expected 1 resolved media, got %d", len(result[0].Media))
	}
	got := result[0].Media[0]
	if !strings.HasPrefix(got, "data:image/png;base64,") {
		// Bound the slice so a short value cannot panic while reporting.
		t.Fatalf("expected data:image/png;base64, prefix, got %q", got[:min(len(got), 40)])
	}
}

// TestResolveMediaRefs_SkipsOversizedFile verifies that a file larger than the
// size limit passed to resolveMediaRefs is dropped from the message's media.
func TestResolveMediaRefs_SkipsOversizedFile(t *testing.T) {
	store := media.NewFileMediaStore()
	dir := t.TempDir()

	bigPath := filepath.Join(dir, "big.png")
	// Write PNG header + padding to exceed limit
	data := make([]byte, 1024+1) // 1KB + 1 byte
	copy(data, []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A})
	if err := os.WriteFile(bigPath, data, 0o644); err != nil {
		t.Fatal(err)
	}
	ref, err := store.Store(bigPath, media.MediaMeta{}, "test")
	if err != nil {
		t.Fatal(err)
	}

	messages := []providers.Message{
		{Role: "user", Content: "hi", Media: []string{ref}},
	}
	// Use a tiny limit (1KB) so the file is oversized
	result := resolveMediaRefs(messages, store, 1024)

	if len(result[0].Media) != 0 {
		t.Fatalf("expected 0 media (oversized), got %d", len(result[0].Media))
	}
}

// TestResolveMediaRefs_SkipsUnknownType verifies that a stored plain-text file
// is dropped from the message's media.
func TestResolveMediaRefs_SkipsUnknownType(t *testing.T) {
	store := media.NewFileMediaStore()
	dir := t.TempDir()

	txtPath := filepath.Join(dir, "readme.txt")
	if err := os.WriteFile(txtPath, []byte("hello world"), 0o644); err != nil {
		t.Fatal(err)
	}
	ref, err := store.Store(txtPath, media.MediaMeta{}, "test")
	if err != nil {
		t.Fatal(err)
	}

	messages := []providers.Message{
		{Role: "user", Content: "hi", Media: []string{ref}},
	}
	result := resolveMediaRefs(messages, store, config.DefaultMaxMediaSize)

	if len(result[0].Media) != 0 {
		t.Fatalf("expected 0 media (unknown type), got %d", len(result[0].Media))
	}
}

// TestResolveMediaRefs_PassesThroughNonMediaRefs verifies that an https URL is
// returned unchanged, even when no media store is supplied.
func TestResolveMediaRefs_PassesThroughNonMediaRefs(t *testing.T) {
	messages := []providers.Message{
		{Role: "user", Content: "hi", Media: []string{"https://example.com/img.png"}},
	}
	result := resolveMediaRefs(messages, nil, config.DefaultMaxMediaSize)

	if len(result[0].Media) != 1 || result[0].Media[0] != "https://example.com/img.png" {
		t.Fatalf("expected passthrough of non-media:// URL, got %v", result[0].Media)
	}
}

// TestResolveMediaRefs_DoesNotMutateOriginal verifies that resolveMediaRefs
// leaves the Media entries of its input slice untouched.
func TestResolveMediaRefs_DoesNotMutateOriginal(t *testing.T) {
	store := media.NewFileMediaStore()
	dir := t.TempDir()
	pngPath := filepath.Join(dir, "test.png")
	pngHeader := []byte{
		0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A,
		0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, 0x52,
		0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
		0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xDE,
	}
	if err := os.WriteFile(pngPath, pngHeader, 0o644); err != nil {
		t.Fatal(err)
	}
	ref, err := store.Store(pngPath, media.MediaMeta{}, "test")
	if err != nil {
		t.Fatal(err)
	}

	original := []providers.Message{
		{Role: "user", Content: "hi", Media: []string{ref}},
	}
	originalRef := original[0].Media[0]

	resolveMediaRefs(original, store, config.DefaultMaxMediaSize)

	if original[0].Media[0] != originalRef {
		t.Fatal("resolveMediaRefs mutated original message slice")
	}
}

// TestResolveMediaRefs_UsesMetaContentType verifies that a file stored with an
// explicit "image/jpeg" content type resolves to a "data:image/jpeg;base64,"
// URL.
func TestResolveMediaRefs_UsesMetaContentType(t *testing.T) {
	store := media.NewFileMediaStore()
	dir := t.TempDir()

	// File with JPEG content but stored with explicit content type
	jpegPath := filepath.Join(dir, "photo")
	jpegHeader := []byte{0xFF, 0xD8, 0xFF, 0xE0} // JPEG magic bytes
	if err := os.WriteFile(jpegPath, jpegHeader, 0o644); err != nil {
		t.Fatal(err)
	}
	ref, err := store.Store(jpegPath, media.MediaMeta{ContentType: "image/jpeg"}, "test")
	if err != nil {
		t.Fatal(err)
	}

	messages := []providers.Message{
		{Role: "user", Content: "hi", Media: []string{ref}},
	}
	result := resolveMediaRefs(messages, store, config.DefaultMaxMediaSize)

	if len(result[0].Media) != 1 {
		t.Fatalf("expected 1 media, got %d", len(result[0].Media))
	}
	got := result[0].Media[0]
	if !strings.HasPrefix(got, "data:image/jpeg;base64,") {
		// Bound the slice so a short value cannot panic while reporting.
		t.Fatalf("expected jpeg prefix, got %q", got[:min(len(got), 30)])
	}
}