From f89c9673cbe0f14c42a420f2924ef9ad4ded2e46 Mon Sep 17 00:00:00 2001 From: afjcjsbx Date: Mon, 9 Mar 2026 11:38:23 +0100 Subject: [PATCH] sync sendmessage function --- pkg/agent/loop.go | 16 +-- pkg/channels/manager.go | 33 +++++++ pkg/channels/manager_test.go | 186 +++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+), 12 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d98191f33..a6a41a2ab 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -466,9 +466,9 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou } // sendTranscriptionFeedback sends feedback to the user with the result of -// audio transcription if the option is enabled. It sends the message directly -// through the channel (bypassing the bus queue) so that ordering with the -// subsequent placeholder is guaranteed. +// audio transcription if the option is enabled. It uses Manager.SendMessage +// which executes synchronously (rate limiting, splitting, retry) so that +// ordering with the subsequent placeholder is guaranteed. func (al *AgentLoop) sendTranscriptionFeedback( ctx context.Context, channel, chatID, messageID string, @@ -495,15 +495,7 @@ func (al *AgentLoop) sendTranscriptionFeedback( feedbackMsg = "No voice detected in the audio" } - ch, ok := al.channelManager.GetChannel(channel) - if !ok { - return - } - - sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - err := ch.Send(sendCtx, bus.OutboundMessage{ + err := al.channelManager.SendMessage(ctx, bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: feedbackMsg, diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 9e5ceeca1..2f646a077 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -825,6 +825,39 @@ func (m *Manager) UnregisterChannel(name string) { delete(m.channels, name) } +// SendMessage sends an outbound message synchronously through the channel +// worker's rate limiter and retry logic. It blocks until the message is +// delivered (or all retries are exhausted), which preserves ordering when +// a subsequent operation depends on the message having been sent. +func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error { + m.mu.RLock() + _, exists := m.channels[msg.Channel] + w, wExists := m.workers[msg.Channel] + m.mu.RUnlock() + + if !exists { + return fmt.Errorf("channel %s not found", msg.Channel) + } + if !wExists || w == nil { + return fmt.Errorf("channel %s has no active worker", msg.Channel) + } + + maxLen := 0 + if mlp, ok := w.ch.(MessageLengthProvider); ok { + maxLen = mlp.MaxMessageLength() + } + if maxLen > 0 && len([]rune(msg.Content)) > maxLen { + for _, chunk := range SplitMessage(msg.Content, maxLen) { + chunkMsg := msg + chunkMsg.Content = chunk + m.sendWithRetry(ctx, msg.Channel, w, chunkMsg) + } + } else { + m.sendWithRetry(ctx, msg.Channel, w, msg) + } + return nil +} + func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error { m.mu.RLock() _, exists := m.channels[channelName] diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index 9199285f7..1f3a628c2 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -937,6 +937,192 @@ func TestManager_PlaceholderConsumedByResponse(t *testing.T) { } } +func TestSendMessage_Synchronous(t *testing.T) { + m := newTestManager() + + var received []bus.OutboundMessage + ch := &mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + received = append(received, msg) + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello world", + ReplyToMessageID: "msg-456", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + // SendMessage is synchronous — message should already be delivered + if len(received) != 1 { + t.Fatalf("expected 1 message sent, got %d", len(received)) + } + if received[0].ReplyToMessageID != "msg-456" { + t.Fatalf("expected ReplyToMessageID msg-456, got %s", received[0].ReplyToMessageID) + } + if received[0].Content != "hello world" { + t.Fatalf("expected content 'hello world', got %s", received[0].Content) + } +} + +func TestSendMessage_UnknownChannel(t *testing.T) { + m := newTestManager() + + msg := bus.OutboundMessage{ + Channel: "nonexistent", + ChatID: "123", + Content: "hello", + } + + err := m.SendMessage(context.Background(), msg) + if err == nil { + t.Fatal("expected error for unknown channel") + } +} + +func TestSendMessage_NoWorker(t *testing.T) { + m := newTestManager() + + ch := &mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { return nil }, + } + m.channels["test"] = ch + // No worker registered + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello", + } + + err := m.SendMessage(context.Background(), msg) + if err == nil { + t.Fatal("expected error when no worker exists") + } +} + +func TestSendMessage_WithRetry(t *testing.T) { + m := newTestManager() + + var callCount int + ch := &mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { + callCount++ + if callCount == 1 { + return fmt.Errorf("transient: %w", ErrTemporary) + } + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "retry me", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if callCount != 2 { + t.Fatalf("expected 2 Send calls (1 failure + 1 success), got %d", callCount) + } +} + +func TestSendMessage_WithSplitting(t *testing.T) { + m := newTestManager() + + var received []string + ch := &mockChannelWithLength{ + mockChannel: mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + received = append(received, msg.Content) + return nil + }, + }, + maxLen: 5, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + msg := bus.OutboundMessage{ + Channel: "test", + ChatID: "123", + Content: "hello world", + } + + err := m.SendMessage(context.Background(), msg) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if len(received) < 2 { + t.Fatalf("expected message to be split into at least 2 chunks, got %d", len(received)) + } +} + +func TestSendMessage_PreservesOrdering(t *testing.T) { + m := newTestManager() + + var order []string + ch := &mockChannel{ + sendFn: func(_ context.Context, msg bus.OutboundMessage) error { + order = append(order, msg.Content) + return nil + }, + } + + w := &channelWorker{ + ch: ch, + limiter: rate.NewLimiter(rate.Inf, 1), + } + m.channels["test"] = ch + m.workers["test"] = w + + // Send two messages sequentially — they must arrive in order + _ = m.SendMessage(context.Background(), bus.OutboundMessage{ + Channel: "test", ChatID: "1", Content: "first", + }) + _ = m.SendMessage(context.Background(), bus.OutboundMessage{ + Channel: "test", ChatID: "1", Content: "second", + }) + + if len(order) != 2 { + t.Fatalf("expected 2 messages, got %d", len(order)) + } + if order[0] != "first" || order[1] != "second" { + t.Fatalf("expected [first, second], got %v", order) + } +} + func TestManager_SendPlaceholder(t *testing.T) { mgr := &Manager{ channels: make(map[string]Channel),