sync sendmessage function

This commit is contained in:
afjcjsbx
2026-03-09 11:38:23 +01:00
parent 536e26aff1
commit f89c9673cb
3 changed files with 223 additions and 12 deletions
+4 -12
View File
@@ -466,9 +466,9 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
}
// sendTranscriptionFeedback sends feedback to the user with the result of
// audio transcription if the option is enabled. It sends the message directly
// through the channel (bypassing the bus queue) so that ordering with the
// subsequent placeholder is guaranteed.
// audio transcription if the option is enabled. It uses Manager.SendMessage
// which executes synchronously (rate limiting, splitting, retry) so that
// ordering with the subsequent placeholder is guaranteed.
func (al *AgentLoop) sendTranscriptionFeedback(
ctx context.Context,
channel, chatID, messageID string,
@@ -495,15 +495,7 @@ func (al *AgentLoop) sendTranscriptionFeedback(
feedbackMsg = "No voice detected in the audio"
}
ch, ok := al.channelManager.GetChannel(channel)
if !ok {
return
}
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := ch.Send(sendCtx, bus.OutboundMessage{
err := al.channelManager.SendMessage(ctx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: feedbackMsg,
+33
View File
@@ -825,6 +825,39 @@ func (m *Manager) UnregisterChannel(name string) {
delete(m.channels, name)
}
// SendMessage sends an outbound message synchronously through the channel
// worker's rate limiter and retry logic. It blocks until the message is
// delivered (or all retries are exhausted), which preserves ordering when
// a subsequent operation depends on the message having been sent.
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
m.mu.RLock()
_, exists := m.channels[msg.Channel]
w, wExists := m.workers[msg.Channel]
m.mu.RUnlock()
if !exists {
return fmt.Errorf("channel %s not found", msg.Channel)
}
if !wExists || w == nil {
return fmt.Errorf("channel %s has no active worker", msg.Channel)
}
maxLen := 0
if mlp, ok := w.ch.(MessageLengthProvider); ok {
maxLen = mlp.MaxMessageLength()
}
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
for _, chunk := range SplitMessage(msg.Content, maxLen) {
chunkMsg := msg
chunkMsg.Content = chunk
m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
}
} else {
m.sendWithRetry(ctx, msg.Channel, w, msg)
}
return nil
}
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
m.mu.RLock()
_, exists := m.channels[channelName]
+186
View File
@@ -937,6 +937,192 @@ func TestManager_PlaceholderConsumedByResponse(t *testing.T) {
}
}
func TestSendMessage_Synchronous(t *testing.T) {
m := newTestManager()
var received []bus.OutboundMessage
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
ReplyToMessageID: "msg-456",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
// SendMessage is synchronous — message should already be delivered
if len(received) != 1 {
t.Fatalf("expected 1 message sent, got %d", len(received))
}
if received[0].ReplyToMessageID != "msg-456" {
t.Fatalf("expected ReplyToMessageID msg-456, got %s", received[0].ReplyToMessageID)
}
if received[0].Content != "hello world" {
t.Fatalf("expected content 'hello world', got %s", received[0].Content)
}
}
func TestSendMessage_UnknownChannel(t *testing.T) {
m := newTestManager()
msg := bus.OutboundMessage{
Channel: "nonexistent",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error for unknown channel")
}
}
func TestSendMessage_NoWorker(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error { return nil },
}
m.channels["test"] = ch
// No worker registered
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error when no worker exists")
}
}
func TestSendMessage_WithRetry(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount == 1 {
return fmt.Errorf("transient: %w", ErrTemporary)
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "retry me",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if callCount != 2 {
t.Fatalf("expected 2 Send calls (1 failure + 1 success), got %d", callCount)
}
}
func TestSendMessage_WithSplitting(t *testing.T) {
m := newTestManager()
var received []string
ch := &mockChannelWithLength{
mockChannel: mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg.Content)
return nil
},
},
maxLen: 5,
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if len(received) < 2 {
t.Fatalf("expected message to be split into at least 2 chunks, got %d", len(received))
}
}
func TestSendMessage_PreservesOrdering(t *testing.T) {
m := newTestManager()
var order []string
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
order = append(order, msg.Content)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
// Send two messages sequentially — they must arrive in order
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "first",
})
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "second",
})
if len(order) != 2 {
t.Fatalf("expected 2 messages, got %d", len(order))
}
if order[0] != "first" || order[1] != "second" {
t.Fatalf("expected [first, second], got %v", order)
}
}
func TestManager_SendPlaceholder(t *testing.T) {
mgr := &Manager{
channels: make(map[string]Channel),