Files
picoclaw/pkg/channels/manager_test.go
T
DimonB 6c0798ca3f feat(channels): make Channel.Send return delivered message IDs (#2190)
* feat(channels): Channel.Send and MediaSender.SendMedia return delivered message IDs

Change Channel.Send signature from (ctx, msg) error to (ctx, msg) ([]string, error)
and MediaSender.SendMedia similarly, so callers can capture platform message IDs
for threading, reactions, and history annotation.

Adapters that return real IDs: Telegram (per-chunk MessageID), Discord (Message.ID),
Slack Send (ts), QQ (sentMsg.ID), Matrix (EventID). Slack SendMedia returns nil
because UploadFileV2 does not expose the posted message timestamp in its response.
All other adapters return nil IDs.

preSend and sendWithRetry in manager.go updated to propagate ([]string, bool).
README examples updated for both English and Chinese docs.

* style: apply golangci-lint fixes (golines)

* docs: fix Send migration guide — restore old error-only signature in before/after example
2026-03-31 11:07:32 +08:00

1387 lines
33 KiB
Go

package channels
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"golang.org/x/time/rate"
"github.com/sipeed/picoclaw/pkg/bus"
)
// mockChannel is a test double that delegates Send to a configurable function.
type mockChannel struct {
BaseChannel
sendFn func(ctx context.Context, msg bus.OutboundMessage) error
sentMessages []bus.OutboundMessage
placeholdersSent int
editedMessages int
lastPlaceholderID string
}
func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
m.sentMessages = append(m.sentMessages, msg)
if m.sendFn == nil {
return nil, nil
}
return nil, m.sendFn(ctx, msg)
}
func (m *mockChannel) Start(ctx context.Context) error { return nil }
func (m *mockChannel) Stop(ctx context.Context) error { return nil }
func (m *mockChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
m.placeholdersSent++
m.lastPlaceholderID = "mock-ph-123"
return m.lastPlaceholderID, nil
}
func (m *mockChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
m.editedMessages++
return nil
}
type mockMediaChannel struct {
mockChannel
sendMediaFn func(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
sentMediaMessages []bus.OutboundMediaMessage
}
func (m *mockMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
m.sentMediaMessages = append(m.sentMediaMessages, msg)
if m.sendMediaFn != nil {
return m.sendMediaFn(ctx, msg)
}
return nil, nil
}
type mockDeletingMediaChannel struct {
mockMediaChannel
deleteCalls int
lastDeleted struct {
chatID string
messageID string
}
}
func (m *mockDeletingMediaChannel) DeleteMessage(
_ context.Context,
chatID string,
messageID string,
) error {
m.deleteCalls++
m.lastDeleted.chatID = chatID
m.lastDeleted.messageID = messageID
return nil
}
// newTestManager creates a minimal Manager suitable for unit tests.
func newTestManager() *Manager {
return &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
}
}
func TestSendWithRetry_Success(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
if callCount != 1 {
t.Fatalf("expected 1 Send call, got %d", callCount)
}
}
func TestSendWithRetry_TemporaryThenSuccess(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount <= 2 {
return fmt.Errorf("network error: %w", ErrTemporary)
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
if callCount != 3 {
t.Fatalf("expected 3 Send calls (2 failures + 1 success), got %d", callCount)
}
}
func TestSendWithRetry_PermanentFailure(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
return fmt.Errorf("bad chat ID: %w", ErrSendFailed)
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
if callCount != 1 {
t.Fatalf("expected 1 Send call (no retry for permanent failure), got %d", callCount)
}
}
func TestSendWithRetry_NotRunning(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
return ErrNotRunning
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
if callCount != 1 {
t.Fatalf("expected 1 Send call (no retry for ErrNotRunning), got %d", callCount)
}
}
func TestSendWithRetry_RateLimitRetry(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount == 1 {
return fmt.Errorf("429: %w", ErrRateLimit)
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
start := time.Now()
m.sendWithRetry(ctx, "test", w, msg)
elapsed := time.Since(start)
if callCount != 2 {
t.Fatalf("expected 2 Send calls (1 rate limit + 1 success), got %d", callCount)
}
// Should have waited at least rateLimitDelay (1s) but allow some slack
if elapsed < 900*time.Millisecond {
t.Fatalf("expected at least ~1s delay for rate limit retry, got %v", elapsed)
}
}
func TestSendWithRetry_MaxRetriesExhausted(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
return fmt.Errorf("timeout: %w", ErrTemporary)
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
expected := maxRetries + 1 // initial attempt + maxRetries retries
if callCount != expected {
t.Fatalf("expected %d Send calls, got %d", expected, callCount)
}
}
func TestSendMedia_Success(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
callCount++
return nil, nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
err := m.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "test",
ChatID: "chat1",
Parts: []bus.MediaPart{{Ref: "media://abc"}},
})
if err != nil {
t.Fatalf("SendMedia() error = %v", err)
}
if callCount != 1 {
t.Fatalf("expected 1 SendMedia call, got %d", callCount)
}
}
func TestSendMedia_PropagatesFailure(t *testing.T) {
m := newTestManager()
ch := &mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
return nil, fmt.Errorf("bad upload: %w", ErrSendFailed)
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
err := m.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "test",
ChatID: "chat1",
Parts: []bus.MediaPart{{Ref: "media://abc"}},
})
if err == nil {
t.Fatal("expected SendMedia to return error")
}
if !errors.Is(err, ErrSendFailed) {
t.Fatalf("expected ErrSendFailed, got %v", err)
}
}
func TestSendMedia_UnsupportedChannelReturnsError(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
err := m.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "test",
ChatID: "chat1",
Parts: []bus.MediaPart{{Ref: "media://abc"}},
})
if err == nil {
t.Fatal("expected SendMedia to return error for unsupported channel")
}
if !strings.Contains(err.Error(), "does not support media sending") {
t.Fatalf("unexpected error: %v", err)
}
}
func TestSendMedia_DeletesPlaceholderBeforeSending(t *testing.T) {
m := newTestManager()
ch := &mockDeletingMediaChannel{
mockMediaChannel: mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
return nil, nil
},
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
m.RecordPlaceholder("test", "chat1", "placeholder-1")
err := m.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "test",
ChatID: "chat1",
Parts: []bus.MediaPart{{Ref: "media://abc"}},
})
if err != nil {
t.Fatalf("SendMedia() error = %v", err)
}
if ch.deleteCalls != 1 {
t.Fatalf("expected placeholder delete to be called once, got %d", ch.deleteCalls)
}
if ch.lastDeleted.chatID != "chat1" || ch.lastDeleted.messageID != "placeholder-1" {
t.Fatalf("unexpected placeholder deletion target: %+v", ch.lastDeleted)
}
if len(ch.sentMediaMessages) != 1 {
t.Fatalf("expected media to be sent once, got %d", len(ch.sentMediaMessages))
}
}
func TestSendWithRetry_UnknownError(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount == 1 {
return errors.New("random unexpected error")
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
m.sendWithRetry(ctx, "test", w, msg)
if callCount != 2 {
t.Fatalf("expected 2 Send calls (unknown error treated as temporary), got %d", callCount)
}
}
func TestSendWithRetry_ContextCancelled(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
return fmt.Errorf("timeout: %w", ErrTemporary)
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx, cancel := context.WithCancel(context.Background())
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
// Cancel context after first Send attempt returns
ch.sendFn = func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
cancel()
return fmt.Errorf("timeout: %w", ErrTemporary)
}
m.sendWithRetry(ctx, "test", w, msg)
// Should have called Send once, then noticed ctx canceled during backoff
if callCount != 1 {
t.Fatalf("expected 1 Send call before context cancellation, got %d", callCount)
}
}
func TestWorkerRateLimiter(t *testing.T) {
m := newTestManager()
var mu sync.Mutex
var sendTimes []time.Time
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
mu.Lock()
sendTimes = append(sendTimes, time.Now())
mu.Unlock()
return nil
},
}
// Create a worker with a low rate: 2 msg/s, burst 1
w := &channelWorker{
ch: ch,
queue: make(chan bus.OutboundMessage, 10),
done: make(chan struct{}),
limiter: rate.NewLimiter(2, 1),
}
ctx := t.Context()
go m.runWorker(ctx, "test", w)
// Enqueue 4 messages
for i := range 4 {
w.queue <- bus.OutboundMessage{Channel: "test", ChatID: "1", Content: fmt.Sprintf("msg%d", i)}
}
// Wait enough time for all messages to be sent (4 msgs at 2/s = ~2s, give extra margin)
time.Sleep(3 * time.Second)
mu.Lock()
times := make([]time.Time, len(sendTimes))
copy(times, sendTimes)
mu.Unlock()
if len(times) != 4 {
t.Fatalf("expected 4 sends, got %d", len(times))
}
// Verify rate limiting: total duration should be at least 1s
// (first message immediate, then ~500ms between each subsequent one at 2/s)
totalDuration := times[len(times)-1].Sub(times[0])
if totalDuration < 1*time.Second {
t.Fatalf("expected total duration >= 1s for 4 msgs at 2/s rate, got %v", totalDuration)
}
}
func TestNewChannelWorker_DefaultRate(t *testing.T) {
ch := &mockChannel{}
w := newChannelWorker("unknown_channel", ch)
if w.limiter == nil {
t.Fatal("expected limiter to be non-nil")
}
if w.limiter.Limit() != rate.Limit(defaultRateLimit) {
t.Fatalf("expected rate limit %v, got %v", rate.Limit(defaultRateLimit), w.limiter.Limit())
}
}
func TestNewChannelWorker_ConfiguredRate(t *testing.T) {
ch := &mockChannel{}
for name, expectedRate := range channelRateConfig {
w := newChannelWorker(name, ch)
if w.limiter.Limit() != rate.Limit(expectedRate) {
t.Fatalf("channel %s: expected rate %v, got %v", name, expectedRate, w.limiter.Limit())
}
}
}
func TestRunWorker_MessageSplitting(t *testing.T) {
m := newTestManager()
var mu sync.Mutex
var received []string
ch := &mockChannelWithLength{
mockChannel: mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
mu.Lock()
received = append(received, msg.Content)
mu.Unlock()
return nil
},
},
maxLen: 5,
}
w := &channelWorker{
ch: ch,
queue: make(chan bus.OutboundMessage, 10),
done: make(chan struct{}),
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := t.Context()
go m.runWorker(ctx, "test", w)
// Send a message that should be split
w.queue <- bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello world"}
time.Sleep(100 * time.Millisecond)
mu.Lock()
count := len(received)
mu.Unlock()
if count < 2 {
t.Fatalf("expected message to be split into at least 2 chunks, got %d", count)
}
}
// mockChannelWithLength implements MessageLengthProvider.
type mockChannelWithLength struct {
mockChannel
maxLen int
}
func (m *mockChannelWithLength) MaxMessageLength() int {
return m.maxLen
}
func TestSendWithRetry_ExponentialBackoff(t *testing.T) {
m := newTestManager()
var callTimes []time.Time
var callCount atomic.Int32
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callTimes = append(callTimes, time.Now())
callCount.Add(1)
return fmt.Errorf("timeout: %w", ErrTemporary)
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
ctx := context.Background()
msg := bus.OutboundMessage{Channel: "test", ChatID: "1", Content: "hello"}
start := time.Now()
m.sendWithRetry(ctx, "test", w, msg)
totalElapsed := time.Since(start)
// With maxRetries=3: attempts at 0, ~500ms, ~1.5s, ~3.5s
// Total backoff: 500ms + 1s + 2s = 3.5s
// Allow some margin
if totalElapsed < 3*time.Second {
t.Fatalf("expected total elapsed >= 3s for exponential backoff, got %v", totalElapsed)
}
if int(callCount.Load()) != maxRetries+1 {
t.Fatalf("expected %d calls, got %d", maxRetries+1, callCount.Load())
}
}
// --- Phase 10: preSend orchestration tests ---
// mockMessageEditor is a channel that supports MessageEditor.
type mockMessageEditor struct {
mockChannel
editFn func(ctx context.Context, chatID, messageID, content string) error
}
func (m *mockMessageEditor) EditMessage(ctx context.Context, chatID, messageID, content string) error {
return m.editFn(ctx, chatID, messageID, content)
}
func TestPreSend_PlaceholderEditSuccess(t *testing.T) {
m := newTestManager()
var sendCalled bool
var editCalled bool
ch := &mockMessageEditor{
mockChannel: mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
sendCalled = true
return nil
},
},
editFn: func(_ context.Context, chatID, messageID, content string) error {
editCalled = true
if chatID != "123" {
t.Fatalf("expected chatID 123, got %s", chatID)
}
if messageID != "456" {
t.Fatalf("expected messageID 456, got %s", messageID)
}
if content != "hello" {
t.Fatalf("expected content 'hello', got %s", content)
}
return nil
},
}
// Register placeholder
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !edited {
t.Fatal("expected preSend to return true (placeholder edited)")
}
if !editCalled {
t.Fatal("expected EditMessage to be called")
}
if sendCalled {
t.Fatal("expected Send to NOT be called when placeholder edited")
}
}
func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) {
m := newTestManager()
ch := &mockMessageEditor{
mockChannel: mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
},
editFn: func(_ context.Context, _, _, _ string) error {
return fmt.Errorf("edit failed")
},
}
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
_, edited := m.preSend(context.Background(), "test", msg, ch)
if edited {
t.Fatal("expected preSend to return false when edit fails")
}
}
func TestInvokeTypingStop_CallsRegisteredStop(t *testing.T) {
m := newTestManager()
var stopCalled bool
m.RecordTypingStop("telegram", "chat123", func() {
stopCalled = true
})
m.InvokeTypingStop("telegram", "chat123")
if !stopCalled {
t.Fatal("expected typing stop func to be called")
}
}
func TestInvokeTypingStop_NoOpWhenNoEntry(t *testing.T) {
m := newTestManager()
// Should not panic
m.InvokeTypingStop("telegram", "nonexistent")
}
func TestInvokeTypingStop_Idempotent(t *testing.T) {
m := newTestManager()
var callCount int
m.RecordTypingStop("telegram", "chat123", func() {
callCount++
})
m.InvokeTypingStop("telegram", "chat123")
m.InvokeTypingStop("telegram", "chat123") // Second call: entry already removed, no-op
if callCount != 1 {
t.Fatalf("expected stop to be called once, got %d", callCount)
}
}
func TestPreSend_TypingStopCalled(t *testing.T) {
m := newTestManager()
var stopCalled bool
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
}
m.RecordTypingStop("test", "123", func() {
stopCalled = true
})
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
m.preSend(context.Background(), "test", msg, ch)
if !stopCalled {
t.Fatal("expected typing stop func to be called")
}
}
func TestPreSend_NoRegisteredState(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
}
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
_, edited := m.preSend(context.Background(), "test", msg, ch)
if edited {
t.Fatal("expected preSend to return false with no registered state")
}
}
func TestPreSend_TypingAndPlaceholder(t *testing.T) {
m := newTestManager()
var stopCalled bool
var editCalled bool
ch := &mockMessageEditor{
mockChannel: mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
},
editFn: func(_ context.Context, _, _, _ string) error {
editCalled = true
return nil
},
}
m.RecordTypingStop("test", "123", func() {
stopCalled = true
})
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !stopCalled {
t.Fatal("expected typing stop to be called")
}
if !editCalled {
t.Fatal("expected EditMessage to be called")
}
if !edited {
t.Fatal("expected preSend to return true")
}
}
func TestRecordPlaceholder_ConcurrentSafe(t *testing.T) {
m := newTestManager()
var wg sync.WaitGroup
for i := range 100 {
wg.Add(1)
go func(i int) {
defer wg.Done()
chatID := fmt.Sprintf("chat_%d", i%10)
m.RecordPlaceholder("test", chatID, fmt.Sprintf("msg_%d", i))
}(i)
}
wg.Wait()
}
func TestRecordTypingStop_ConcurrentSafe(t *testing.T) {
m := newTestManager()
var wg sync.WaitGroup
for i := range 100 {
wg.Add(1)
go func(i int) {
defer wg.Done()
chatID := fmt.Sprintf("chat_%d", i%10)
m.RecordTypingStop("test", chatID, func() {})
}(i)
}
wg.Wait()
}
func TestRecordTypingStop_ReplacesExistingStop(t *testing.T) {
m := newTestManager()
var oldStopCalls int
var newStopCalls int
m.RecordTypingStop("test", "123", func() {
oldStopCalls++
})
m.RecordTypingStop("test", "123", func() {
newStopCalls++
})
if oldStopCalls != 1 {
t.Fatalf("expected previous typing stop to be called once when replaced, got %d", oldStopCalls)
}
if newStopCalls != 0 {
t.Fatalf("expected replacement typing stop to stay active until preSend, got %d calls", newStopCalls)
}
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
m.preSend(context.Background(), "test", msg, &mockChannel{})
if newStopCalls != 1 {
t.Fatalf("expected replacement typing stop to be called by preSend, got %d", newStopCalls)
}
if oldStopCalls != 1 {
t.Fatalf("expected previous typing stop to not be called again, got %d", oldStopCalls)
}
}
func TestSendWithRetry_PreSendEditsPlaceholder(t *testing.T) {
m := newTestManager()
var sendCalled bool
ch := &mockMessageEditor{
mockChannel: mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
sendCalled = true
return nil
},
},
editFn: func(_ context.Context, _, _, _ string) error {
return nil // edit succeeds
},
}
m.RecordPlaceholder("test", "123", "456")
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
m.sendWithRetry(context.Background(), "test", w, msg)
if sendCalled {
t.Fatal("expected Send to NOT be called when placeholder was edited")
}
}
// --- Dispatcher exit tests (Step 1) ---
func TestDispatcherExitsOnCancel(t *testing.T) {
mb := bus.NewMessageBus()
defer mb.Close()
m := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
bus: mb,
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
m.dispatchOutbound(ctx)
close(done)
}()
// Cancel context and verify the dispatcher exits quickly
cancel()
select {
case <-done:
// success
case <-time.After(2 * time.Second):
t.Fatal("dispatchOutbound did not exit within 2s after context cancel")
}
}
func TestDispatcherMediaExitsOnCancel(t *testing.T) {
mb := bus.NewMessageBus()
defer mb.Close()
m := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
bus: mb,
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
m.dispatchOutboundMedia(ctx)
close(done)
}()
cancel()
select {
case <-done:
// success
case <-time.After(2 * time.Second):
t.Fatal("dispatchOutboundMedia did not exit within 2s after context cancel")
}
}
// --- TTL Janitor tests (Step 2) ---
func TestTypingStopJanitorEviction(t *testing.T) {
m := newTestManager()
var stopCalled atomic.Bool
// Store a typing entry with a creation time far in the past
m.typingStops.Store("test:123", typingEntry{
stop: func() { stopCalled.Store(true) },
createdAt: time.Now().Add(-10 * time.Minute), // well past typingStopTTL
})
// Run janitor with a short-lived context
ctx, cancel := context.WithCancel(context.Background())
// Manually trigger the janitor logic once by simulating a tick
go func() {
// Override janitor to run immediately
now := time.Now()
m.typingStops.Range(func(key, value any) bool {
if entry, ok := value.(typingEntry); ok {
if now.Sub(entry.createdAt) > typingStopTTL {
if _, loaded := m.typingStops.LoadAndDelete(key); loaded {
entry.stop()
}
}
}
return true
})
cancel()
}()
<-ctx.Done()
if !stopCalled.Load() {
t.Fatal("expected typing stop function to be called by janitor eviction")
}
// Verify entry was deleted
if _, loaded := m.typingStops.Load("test:123"); loaded {
t.Fatal("expected typing entry to be deleted after eviction")
}
}
func TestPlaceholderJanitorEviction(t *testing.T) {
m := newTestManager()
// Store a placeholder entry with a creation time far in the past
m.placeholders.Store("test:456", placeholderEntry{
id: "msg_old",
createdAt: time.Now().Add(-20 * time.Minute), // well past placeholderTTL
})
// Simulate janitor logic
now := time.Now()
m.placeholders.Range(func(key, value any) bool {
if entry, ok := value.(placeholderEntry); ok {
if now.Sub(entry.createdAt) > placeholderTTL {
m.placeholders.Delete(key)
}
}
return true
})
// Verify entry was deleted
if _, loaded := m.placeholders.Load("test:456"); loaded {
t.Fatal("expected placeholder entry to be deleted after eviction")
}
}
func TestPreSendStillWorksWithWrappedTypes(t *testing.T) {
m := newTestManager()
var stopCalled bool
var editCalled bool
ch := &mockMessageEditor{
mockChannel: mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
},
editFn: func(_ context.Context, chatID, messageID, content string) error {
editCalled = true
if messageID != "ph_id" {
t.Fatalf("expected messageID ph_id, got %s", messageID)
}
return nil
},
}
// Use the new wrapped types via the public API
m.RecordTypingStop("test", "chat1", func() {
stopCalled = true
})
m.RecordPlaceholder("test", "chat1", "ph_id")
msg := bus.OutboundMessage{Channel: "test", ChatID: "chat1", Content: "response"}
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !stopCalled {
t.Fatal("expected typing stop to be called via wrapped type")
}
if !editCalled {
t.Fatal("expected EditMessage to be called via wrapped type")
}
if !edited {
t.Fatal("expected preSend to return true")
}
}
// --- Lazy worker creation tests (Step 6) ---
func TestLazyWorkerCreation(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
return nil
},
}
// RegisterChannel should NOT create a worker
m.RegisterChannel("lazy", ch)
m.mu.RLock()
_, chExists := m.channels["lazy"]
_, wExists := m.workers["lazy"]
m.mu.RUnlock()
if !chExists {
t.Fatal("expected channel to be registered")
}
if wExists {
t.Fatal("expected worker to NOT be created by RegisterChannel (lazy creation)")
}
}
// --- FastID uniqueness test (Step 5) ---
func TestBuildMediaScope_FastIDUniqueness(t *testing.T) {
seen := make(map[string]bool)
for range 1000 {
scope := BuildMediaScope("test", "chat1", "")
if seen[scope] {
t.Fatalf("duplicate scope generated: %s", scope)
}
seen[scope] = true
}
// Verify format: "channel:chatID:id"
scope := BuildMediaScope("telegram", "42", "")
parts := 0
for _, c := range scope {
if c == ':' {
parts++
}
}
if parts != 2 {
t.Fatalf("expected scope to have 2 colons (channel:chatID:id), got: %s", scope)
}
}
func TestBuildMediaScope_WithMessageID(t *testing.T) {
scope := BuildMediaScope("discord", "chat99", "msg123")
expected := "discord:chat99:msg123"
if scope != expected {
t.Fatalf("expected %s, got %s", expected, scope)
}
}
func TestManager_PlaceholderConsumedByResponse(t *testing.T) {
mgr := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
placeholders: sync.Map{},
}
mockCh := &mockChannel{
sendFn: func(ctx context.Context, msg bus.OutboundMessage) error {
return nil
},
}
worker := newChannelWorker("mock", mockCh)
mgr.channels["mock"] = mockCh
mgr.workers["mock"] = worker
ctx := context.Background()
key := "mock:chat-1"
// Simulate a placeholder recorded by base.go HandleMessage
mgr.RecordPlaceholder("mock", "chat-1", "ph-123")
if _, ok := mgr.placeholders.Load(key); !ok {
t.Fatal("expected placeholder to be recorded")
}
// Transcription feedback arrives first — it should consume the placeholder
// and be delivered via EditMessage, not Send.
msgTranscript := bus.OutboundMessage{
Channel: "mock",
ChatID: "chat-1",
Content: "Transcript: hello",
}
mgr.sendWithRetry(ctx, "mock", worker, msgTranscript)
if mockCh.editedMessages != 1 {
t.Errorf("expected 1 edited message (placeholder consumed by transcript), got %d", mockCh.editedMessages)
}
if len(mockCh.sentMessages) != 0 {
t.Errorf("expected 0 normal messages (transcript used edit), got %d", len(mockCh.sentMessages))
}
// Placeholder should be gone now
if _, ok := mgr.placeholders.Load(key); ok {
t.Error("expected placeholder to be removed after being consumed")
}
// Final LLM response arrives — no placeholder left, so it goes through Send
msgFinal := bus.OutboundMessage{
Channel: "mock",
ChatID: "chat-1",
Content: "Final Answer",
}
mgr.sendWithRetry(ctx, "mock", worker, msgFinal)
if len(mockCh.sentMessages) != 1 {
t.Errorf("expected 1 normal message sent, got %d", len(mockCh.sentMessages))
}
}
func TestSendMessage_Synchronous(t *testing.T) {
m := newTestManager()
var received []bus.OutboundMessage
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
ReplyToMessageID: "msg-456",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
// SendMessage is synchronous — message should already be delivered
if len(received) != 1 {
t.Fatalf("expected 1 message sent, got %d", len(received))
}
if received[0].ReplyToMessageID != "msg-456" {
t.Fatalf("expected ReplyToMessageID msg-456, got %s", received[0].ReplyToMessageID)
}
if received[0].Content != "hello world" {
t.Fatalf("expected content 'hello world', got %s", received[0].Content)
}
}
func TestSendMessage_UnknownChannel(t *testing.T) {
m := newTestManager()
msg := bus.OutboundMessage{
Channel: "nonexistent",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error for unknown channel")
}
}
func TestSendMessage_NoWorker(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error { return nil },
}
m.channels["test"] = ch
// No worker registered
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error when no worker exists")
}
}
func TestSendMessage_WithRetry(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount == 1 {
return fmt.Errorf("transient: %w", ErrTemporary)
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "retry me",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if callCount != 2 {
t.Fatalf("expected 2 Send calls (1 failure + 1 success), got %d", callCount)
}
}
func TestSendMessage_WithSplitting(t *testing.T) {
m := newTestManager()
var received []string
ch := &mockChannelWithLength{
mockChannel: mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg.Content)
return nil
},
},
maxLen: 5,
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if len(received) < 2 {
t.Fatalf("expected message to be split into at least 2 chunks, got %d", len(received))
}
}
func TestSendMessage_PreservesOrdering(t *testing.T) {
m := newTestManager()
var order []string
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
order = append(order, msg.Content)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
// Send two messages sequentially — they must arrive in order
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "first",
})
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "second",
})
if len(order) != 2 {
t.Fatalf("expected 2 messages, got %d", len(order))
}
if order[0] != "first" || order[1] != "second" {
t.Fatalf("expected [first, second], got %v", order)
}
}
func TestManager_SendPlaceholder(t *testing.T) {
mgr := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
placeholders: sync.Map{},
}
mockCh := &mockChannel{
sendFn: func(ctx context.Context, msg bus.OutboundMessage) error {
return nil
},
}
mgr.channels["mock"] = mockCh
ctx := context.Background()
// SendPlaceholder should send a placeholder and record it
ok := mgr.SendPlaceholder(ctx, "mock", "chat-1")
if !ok {
t.Fatal("expected SendPlaceholder to succeed")
}
if mockCh.placeholdersSent != 1 {
t.Errorf("expected 1 placeholder sent, got %d", mockCh.placeholdersSent)
}
key := "mock:chat-1"
if _, loaded := mgr.placeholders.Load(key); !loaded {
t.Error("expected placeholder to be recorded in manager")
}
// SendPlaceholder on unknown channel should return false
ok = mgr.SendPlaceholder(ctx, "unknown", "chat-1")
if ok {
t.Error("expected SendPlaceholder to fail for unknown channel")
}
}