mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
9a25fad20a
* feat(wecom): add WebSocket long-connection support for WeCom AI Bot - Introduced WeComAIBotWSChannel to handle WebSocket connections. - Updated NewWeComAIBotChannel to prioritize WebSocket mode when BotID and Secret are provided. - Enhanced WeComAIBotConfig to include BotID and Secret for WebSocket mode. - Implemented message handling for text, image, voice, and mixed messages in WebSocket mode. - Added tests for WebSocket mode functionality and ensured backward compatibility with webhook mode. - Refactored existing code to improve clarity and maintainability. * feat(wecom): implement periodic processing hints and enforce WeCom stream deadline * feat(wecom): update WeCom AI Bot setup instructions and configuration parameters * feat(wecom): enhance WeCom AI Bot with image handling and media support * feat(wecom): refactor WeCom AI Bot task management to use req_id for concurrent message handling * feat(wecom): refactor WeCom AI Bot to manage request states and late replies * feat(wecom): add response timeout handling and improve WebSocket command acknowledgment * fix(wecom): improve error handling for late reply proactive push delivery * refactor(wecom): reorganize WeCom AI Bot configuration fields for improved readability * fix(wecom): update error message for websocket delivery failure in late reply proactive push * feat(wecom): implement shared HTTP clients for WeCom image handling and response URL posting * refactor(wecom): simplify image download and storage process in storeWSImage * fix(wecom): improve error logging for WebSocket message handling and proactive push delivery * fix(wecom): enhance WebSocket connection stability and task cancellation handling * fix(wecom): improve WS image message handling by ensuring proper error response and initializing mediaRefs * feat(wecom): enhance WeCom AIBot WebSocket handling with message deduplication and support for file and video messages * refactor(wecom): rename image handling functions to media handling and enhance media type support * feat(wecom): implement byte-aware content splitting for WeCom AI Bot stream messages * refactor(wecom): remove max message length constraint from WeCom AIBot WS channel
456 lines
13 KiB
Go
456 lines
13 KiB
Go
package wecom
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/channels"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
)
|
|
|
|
// ---- Webhook mode tests ----
|
|
|
|
func TestNewWeComAIBotChannel_WebhookMode(t *testing.T) {
|
|
t.Run("success with valid config", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
WebhookPath: "/webhook/test",
|
|
}
|
|
|
|
messageBus := bus.NewMessageBus()
|
|
ch, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
if ch == nil {
|
|
t.Fatal("Expected channel to be created")
|
|
}
|
|
if ch.Name() != "wecom_aibot" {
|
|
t.Errorf("Expected name 'wecom_aibot', got '%s'", ch.Name())
|
|
}
|
|
// Webhook mode must implement WebhookHandler.
|
|
if _, ok := ch.(channels.WebhookHandler); !ok {
|
|
t.Error("Webhook mode channel should implement WebhookHandler")
|
|
}
|
|
})
|
|
|
|
t.Run("error with missing token", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
_, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err == nil {
|
|
t.Fatal("Expected error for missing token, got nil")
|
|
}
|
|
})
|
|
|
|
t.Run("error with missing encoding key", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
_, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err == nil {
|
|
t.Fatal("Expected error for missing encoding key, got nil")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestWeComAIBotWebhookChannelStartStop(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
}
|
|
|
|
messageBus := bus.NewMessageBus()
|
|
ch, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create channel: %v", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
if err := ch.Start(ctx); err != nil {
|
|
t.Fatalf("Failed to start channel: %v", err)
|
|
}
|
|
if !ch.IsRunning() {
|
|
t.Error("Expected channel to be running after Start")
|
|
}
|
|
|
|
if err := ch.Stop(ctx); err != nil {
|
|
t.Fatalf("Failed to stop channel: %v", err)
|
|
}
|
|
if ch.IsRunning() {
|
|
t.Error("Expected channel to be stopped after Stop")
|
|
}
|
|
}
|
|
|
|
func TestWeComAIBotChannelWebhookPath(t *testing.T) {
|
|
t.Run("default path", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, _ := NewWeComAIBotChannel(cfg, messageBus)
|
|
|
|
wh, ok := ch.(channels.WebhookHandler)
|
|
if !ok {
|
|
t.Fatal("Expected channel to implement WebhookHandler")
|
|
}
|
|
expectedPath := "/webhook/wecom-aibot"
|
|
if wh.WebhookPath() != expectedPath {
|
|
t.Errorf("Expected webhook path '%s', got '%s'", expectedPath, wh.WebhookPath())
|
|
}
|
|
})
|
|
|
|
t.Run("custom path", func(t *testing.T) {
|
|
customPath := "/custom/webhook"
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
WebhookPath: customPath,
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, _ := NewWeComAIBotChannel(cfg, messageBus)
|
|
|
|
wh, ok := ch.(channels.WebhookHandler)
|
|
if !ok {
|
|
t.Fatal("Expected channel to implement WebhookHandler")
|
|
}
|
|
if wh.WebhookPath() != customPath {
|
|
t.Errorf("Expected webhook path '%s', got '%s'", customPath, wh.WebhookPath())
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestGenerateStreamID(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, _ := NewWeComAIBotChannel(cfg, messageBus)
|
|
webhookCh, ok := ch.(*WeComAIBotChannel)
|
|
if !ok {
|
|
t.Fatal("Expected webhook mode channel")
|
|
}
|
|
|
|
ids := make(map[string]bool)
|
|
for i := 0; i < 100; i++ {
|
|
id := webhookCh.generateStreamID()
|
|
if len(id) != 10 {
|
|
t.Errorf("Expected stream ID length 10, got %d", len(id))
|
|
}
|
|
if ids[id] {
|
|
t.Errorf("Duplicate stream ID generated: %s", id)
|
|
}
|
|
ids[id] = true
|
|
}
|
|
}
|
|
|
|
func TestEncryptDecrypt(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG", // 43 characters
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, _ := NewWeComAIBotChannel(cfg, messageBus)
|
|
webhookCh, ok := ch.(*WeComAIBotChannel)
|
|
if !ok {
|
|
t.Fatal("Expected webhook mode channel")
|
|
}
|
|
|
|
plaintext := "Hello, World!"
|
|
receiveid := ""
|
|
|
|
encrypted, err := webhookCh.encryptMessage(plaintext, receiveid)
|
|
if err != nil {
|
|
t.Fatalf("Failed to encrypt message: %v", err)
|
|
}
|
|
if encrypted == "" {
|
|
t.Fatal("Encrypted message is empty")
|
|
}
|
|
|
|
decrypted, err := decryptMessageWithVerify(encrypted, cfg.EncodingAESKey, receiveid)
|
|
if err != nil {
|
|
t.Fatalf("Failed to decrypt message: %v", err)
|
|
}
|
|
if decrypted != plaintext {
|
|
t.Errorf("Expected decrypted message '%s', got '%s'", plaintext, decrypted)
|
|
}
|
|
}
|
|
|
|
func TestGenerateSignature(t *testing.T) {
|
|
token := "test_token"
|
|
timestamp := "1234567890"
|
|
nonce := "test_nonce"
|
|
encrypt := "encrypted_msg"
|
|
|
|
signature := computeSignature(token, timestamp, nonce, encrypt)
|
|
if signature == "" {
|
|
t.Error("Generated signature is empty")
|
|
}
|
|
if !verifySignature(token, signature, timestamp, nonce, encrypt) {
|
|
t.Error("Generated signature does not verify correctly")
|
|
}
|
|
}
|
|
|
|
// ---- WebSocket long-connection mode tests ----
|
|
|
|
func TestNewWeComAIBotChannel_WSMode(t *testing.T) {
|
|
t.Run("success with bot_id and secret", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
BotID: "test_bot_id",
|
|
Secret: "test_secret",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
if ch == nil {
|
|
t.Fatal("Expected channel to be created")
|
|
}
|
|
if ch.Name() != "wecom_aibot" {
|
|
t.Errorf("Expected name 'wecom_aibot', got '%s'", ch.Name())
|
|
}
|
|
// WebSocket mode must NOT implement WebhookHandler.
|
|
if _, ok := ch.(channels.WebhookHandler); ok {
|
|
t.Error("WebSocket mode channel should NOT implement WebhookHandler")
|
|
}
|
|
})
|
|
|
|
t.Run("ws mode takes priority over webhook fields", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
BotID: "test_bot_id",
|
|
Secret: "test_secret",
|
|
Token: "also_set",
|
|
EncodingAESKey: "testkey1234567890123456789012345678901234567",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
if _, ok := ch.(*WeComAIBotWSChannel); !ok {
|
|
t.Error("Expected WebSocket mode channel when both BotID+Secret and Token+Key are set")
|
|
}
|
|
})
|
|
|
|
t.Run("error with missing bot_id", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Secret: "test_secret",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
_, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
// Missing bot_id alone means neither WS mode nor webhook mode is fully configured.
|
|
if err == nil {
|
|
t.Fatal("Expected error for missing bot_id, got nil")
|
|
}
|
|
})
|
|
|
|
t.Run("error with missing secret", func(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
BotID: "test_bot_id",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
_, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err == nil {
|
|
t.Fatal("Expected error for missing secret, got nil")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestWeComAIBotWSChannelStartStop(t *testing.T) {
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
BotID: "test_bot_id",
|
|
Secret: "test_secret",
|
|
}
|
|
messageBus := bus.NewMessageBus()
|
|
ch, err := NewWeComAIBotChannel(cfg, messageBus)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create channel: %v", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
// Start launches a background goroutine; it should not block or return an error.
|
|
if err := ch.Start(ctx); err != nil {
|
|
t.Fatalf("Failed to start channel: %v", err)
|
|
}
|
|
if !ch.IsRunning() {
|
|
t.Error("Expected channel to be running after Start")
|
|
}
|
|
|
|
// Stop should work regardless of whether the WebSocket actually connected.
|
|
if err := ch.Stop(ctx); err != nil {
|
|
t.Fatalf("Failed to stop channel: %v", err)
|
|
}
|
|
if ch.IsRunning() {
|
|
t.Error("Expected channel to be stopped after Stop")
|
|
}
|
|
}
|
|
|
|
func TestGenerateRandomID(t *testing.T) {
|
|
ids := make(map[string]bool)
|
|
for i := 0; i < 200; i++ {
|
|
id := generateRandomID(10)
|
|
if len(id) != 10 {
|
|
t.Errorf("Expected ID length 10, got %d", len(id))
|
|
}
|
|
if ids[id] {
|
|
t.Errorf("Duplicate ID generated: %s", id)
|
|
}
|
|
ids[id] = true
|
|
}
|
|
}
|
|
|
|
func TestWSGenerateID(t *testing.T) {
|
|
ids := make(map[string]bool)
|
|
for i := 0; i < 200; i++ {
|
|
id := wsGenerateID()
|
|
if len(id) != 10 {
|
|
t.Errorf("Expected ID length 10, got %d", len(id))
|
|
}
|
|
if ids[id] {
|
|
t.Errorf("Duplicate wsGenerateID result: %s", id)
|
|
}
|
|
ids[id] = true
|
|
}
|
|
}
|
|
|
|
// ---- Webhook streaming fallback tests ----
|
|
|
|
// makeWebhookChannel creates a started WeComAIBotChannel for testing.
|
|
func makeWebhookChannel(t *testing.T) *WeComAIBotChannel {
|
|
t.Helper()
|
|
cfg := config.WeComAIBotConfig{
|
|
Enabled: true,
|
|
Token: "test_token",
|
|
EncodingAESKey: "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG",
|
|
}
|
|
ch, err := NewWeComAIBotChannel(cfg, bus.NewMessageBus())
|
|
if err != nil {
|
|
t.Fatalf("create channel: %v", err)
|
|
}
|
|
wc := ch.(*WeComAIBotChannel)
|
|
wc.ctx, wc.cancel = context.WithCancel(context.Background())
|
|
return wc
|
|
}
|
|
|
|
// makeStreamTask creates and registers a streamTask for testing.
|
|
func makeStreamTask(t *testing.T, ch *WeComAIBotChannel, streamID, chatID string, deadline time.Time) *streamTask {
|
|
t.Helper()
|
|
task := &streamTask{
|
|
StreamID: streamID,
|
|
ChatID: chatID,
|
|
Deadline: deadline,
|
|
answerCh: make(chan string, 1),
|
|
}
|
|
task.ctx, task.cancel = context.WithCancel(ch.ctx)
|
|
ch.taskMu.Lock()
|
|
ch.streamTasks[streamID] = task
|
|
ch.chatTasks[chatID] = append(ch.chatTasks[chatID], task)
|
|
ch.taskMu.Unlock()
|
|
return task
|
|
}
|
|
|
|
// TestGetStreamResponse_ImmediateAnswer verifies that when the agent has already
|
|
// placed its answer in answerCh, getStreamResponse returns a finish=true response
|
|
// and fully removes the task.
|
|
func TestGetStreamResponse_ImmediateAnswer(t *testing.T) {
|
|
ch := makeWebhookChannel(t)
|
|
defer ch.cancel()
|
|
|
|
task := makeStreamTask(t, ch, "stream-1", "chat-1", time.Now().Add(30*time.Second))
|
|
task.answerCh <- "hello from agent"
|
|
|
|
result := ch.getStreamResponse(task, "ts123", "nonce123")
|
|
if result == "" {
|
|
t.Fatal("expected non-empty encrypted response")
|
|
}
|
|
|
|
ch.taskMu.RLock()
|
|
_, exists := ch.streamTasks["stream-1"]
|
|
ch.taskMu.RUnlock()
|
|
if exists {
|
|
t.Error("task should have been removed from streamTasks after normal finish")
|
|
}
|
|
if !task.Finished {
|
|
t.Error("task.Finished should be true after normal finish")
|
|
}
|
|
}
|
|
|
|
// TestGetStreamResponse_DeadlinePassed verifies that when the stream deadline has
|
|
// elapsed (no agent reply yet), getStreamResponse closes the stream but keeps the
|
|
// task alive so the response_url fallback can still deliver the answer.
|
|
func TestGetStreamResponse_DeadlinePassed(t *testing.T) {
|
|
ch := makeWebhookChannel(t)
|
|
defer ch.cancel()
|
|
|
|
task := makeStreamTask(t, ch, "stream-2", "chat-2", time.Now().Add(-time.Millisecond))
|
|
|
|
result := ch.getStreamResponse(task, "ts456", "nonce456")
|
|
if result == "" {
|
|
t.Fatal("expected non-empty encrypted response")
|
|
}
|
|
|
|
ch.taskMu.RLock()
|
|
_, stillStreaming := ch.streamTasks["stream-2"]
|
|
ch.taskMu.RUnlock()
|
|
if stillStreaming {
|
|
t.Error("task should have been removed from streamTasks after deadline")
|
|
}
|
|
if !task.StreamClosed {
|
|
t.Error("task.StreamClosed should be true after deadline")
|
|
}
|
|
if task.Finished {
|
|
t.Error("task.Finished must remain false: agent reply still expected via response_url")
|
|
}
|
|
}
|
|
|
|
// TestGetStreamResponse_StillPending verifies that when neither the agent has
|
|
// replied nor the deadline has passed, getStreamResponse returns without altering
|
|
// task state (client should poll again).
|
|
func TestGetStreamResponse_StillPending(t *testing.T) {
|
|
ch := makeWebhookChannel(t)
|
|
defer ch.cancel()
|
|
|
|
task := makeStreamTask(t, ch, "stream-3", "chat-3", time.Now().Add(30*time.Second))
|
|
|
|
result := ch.getStreamResponse(task, "ts789", "nonce789")
|
|
if result == "" {
|
|
t.Fatal("expected non-empty encrypted response")
|
|
}
|
|
|
|
ch.taskMu.RLock()
|
|
_, exists := ch.streamTasks["stream-3"]
|
|
ch.taskMu.RUnlock()
|
|
if !exists {
|
|
t.Error("pending task should still be in streamTasks")
|
|
}
|
|
if task.Finished || task.StreamClosed {
|
|
t.Error("pending task should not be finished or stream-closed")
|
|
}
|
|
// Cleanup.
|
|
ch.removeTask(task)
|
|
}
|