diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index fa521def2..d98191f33 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -413,9 +413,10 @@ var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`) // transcribeAudioInMessage resolves audio media refs, transcribes them, and // replaces audio annotations in msg.Content with the transcribed text. -func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) bus.InboundMessage { +// Returns the (possibly modified) message and true if audio was transcribed. +func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) (bus.InboundMessage, bool) { if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 { - return msg + return msg, false } // Transcribe each audio media ref in order. @@ -439,10 +440,10 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou } if len(transcriptions) == 0 { - return msg + return msg, false } - al.sendTranscriptionFeedback(msg.Channel, msg.ChatID, msg.MessageID, transcriptions) + al.sendTranscriptionFeedback(ctx, msg.Channel, msg.ChatID, msg.MessageID, transcriptions) // Replace audio annotations sequentially with transcriptions. idx := 0 @@ -461,45 +462,56 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou } msg.Content = newContent - return msg + return msg, true } -// sendTranscriptionFeedback Asynchronously sends feedback to the user -// with the result of audio transcription if the option is enabled. -func (al *AgentLoop) sendTranscriptionFeedback(channel, chatID string, messageID string, validTexts []string) { +// sendTranscriptionFeedback sends feedback to the user with the result of +// audio transcription if the option is enabled. It sends the message directly +// through the channel (bypassing the bus queue) so that ordering with the +// subsequent placeholder is guaranteed. +func (al *AgentLoop) sendTranscriptionFeedback( + ctx context.Context, + channel, chatID, messageID string, + validTexts []string, +) { if !al.cfg.Voice.EchoTranscription { return } + if al.channelManager == nil { + return + } - go func() { - pubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - var nonEmpty []string - for _, t := range validTexts { - if t != "" { - nonEmpty = append(nonEmpty, t) - } + var nonEmpty []string + for _, t := range validTexts { + if t != "" { + nonEmpty = append(nonEmpty, t) } + } - var feedbackMsg string - if len(nonEmpty) > 0 { - feedbackMsg = "Transcript: " + strings.Join(nonEmpty, "\n") - } else { - feedbackMsg = "No voice detected in the audio" - } + var feedbackMsg string + if len(nonEmpty) > 0 { + feedbackMsg = "Transcript: " + strings.Join(nonEmpty, "\n") + } else { + feedbackMsg = "No voice detected in the audio" + } - err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ - Channel: channel, - ChatID: chatID, - Content: feedbackMsg, - ReplyToMessageID: messageID, - SkipPlaceholder: true, // It serves to avoid consuming the message "Thinking..." - }) - if err != nil { - logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": err.Error()}) - } - }() + ch, ok := al.channelManager.GetChannel(channel) + if !ok { + return + } + + sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err := ch.Send(sendCtx, bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: feedbackMsg, + ReplyToMessageID: messageID, + }) + if err != nil { + logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": err.Error()}) + } } // inferMediaType determines the media type ("image", "audio", "video", "file") @@ -613,7 +625,14 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }, ) - msg = al.transcribeAudioInMessage(ctx, msg) + var hadAudio bool + msg, hadAudio = al.transcribeAudioInMessage(ctx, msg) + + // For audio messages the placeholder was deferred by the channel. + // Now that transcription (and optional feedback) is done, send it. + if hadAudio && al.channelManager != nil { + al.channelManager.SendPlaceholder(ctx, msg.Channel, msg.ChatID) + } // Route system messages to processSystemMessage if msg.Channel == "system" { @@ -803,15 +822,6 @@ func (al *AgentLoop) runAgentLoop( // 2. Save user message to session agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage) - // thinking message only for channels, not for background tasks - if opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) && !opts.NoHistory { - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ - Channel: opts.Channel, - ChatID: opts.ChatID, - TriggerPlaceholder: true, - }) - } - // 3. Run LLM iteration loop finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts) if err != nil { diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 7b7335327..12da3f1dd 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -30,12 +30,10 @@ type InboundMessage struct { } type OutboundMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Content string `json:"content"` - ReplyToMessageID string `json:"reply_to_message_id,omitempty"` - SkipPlaceholder bool `json:"skip_placeholder,omitempty"` // Tells Manager not to use Thinking - TriggerPlaceholder bool `json:"trigger_placeholder,omitempty"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Content string `json:"content"` + ReplyToMessageID string `json:"reply_to_message_id,omitempty"` } // MediaPart describes a single media attachment to send. diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 334dc9254..edb5b6f08 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/binary" "encoding/hex" + "regexp" "strconv" "strings" "sync/atomic" @@ -32,6 +33,9 @@ func init() { uniqueIDPrefix = hex.EncodeToString(b[:]) } +// audioAnnotationRe matches audio/voice annotations injected by channels (e.g. [voice], [audio: file.ogg]). +var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`) + // uniqueID generates a process-unique ID using a random prefix and an atomic counter. // This ID is intended for internal correlation (e.g. media scope keys) and is NOT // cryptographically secure — it must not be used in contexts where unpredictability matters. @@ -284,6 +288,17 @@ func (c *BaseChannel) HandleMessage( c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo) } } + // Placeholder — independent pipeline. + // Skip when the message contains audio: the agent will send the + // placeholder after transcription completes, so the user sees + // "Thinking…" only once the voice has been processed. + if !audioAnnotationRe.MatchString(content) { + if pc, ok := c.owner.(PlaceholderCapable); ok { + if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" { + c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID) + } + } + } } if err := c.bus.PublishInbound(ctx, msg); err != nil { diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index d3ed02919..9e5ceeca1 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -100,6 +100,27 @@ func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) { m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()}) } +// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID +// and records it for later editing. Returns true if a placeholder was sent. +func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool { + m.mu.RLock() + ch, ok := m.channels[channel] + m.mu.RUnlock() + if !ok { + return false + } + pc, ok := ch.(PlaceholderCapable) + if !ok { + return false + } + phID, err := pc.SendPlaceholder(ctx, chatID) + if err != nil || phID == "" { + return false + } + m.RecordPlaceholder(channel, chatID, phID) + return true +} + // RecordTypingStop registers a typing stop function for later invocation. // Implements PlaceholderRecorder. func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) { @@ -134,15 +155,13 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess } // 3. Try editing placeholder - if !msg.SkipPlaceholder { - if v, loaded := m.placeholders.LoadAndDelete(key); loaded { - if entry, ok := v.(placeholderEntry); ok && entry.id != "" { - if editor, ok := ch.(MessageEditor); ok { - if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil { - return true // edited successfully, skip Send - } - // edit failed → fall through to normal Send + if v, loaded := m.placeholders.LoadAndDelete(key); loaded { + if entry, ok := v.(placeholderEntry); ok && entry.id != "" { + if editor, ok := ch.(MessageEditor); ok { + if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil { + return true // edited successfully, skip Send } + // edit failed → fall through to normal Send } } } @@ -500,15 +519,6 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork return } - if msg.TriggerPlaceholder { - if pc, ok := w.ch.(PlaceholderCapable); ok { - if phID, err := pc.SendPlaceholder(ctx, msg.ChatID); err == nil && phID != "" { - m.RecordPlaceholder(name, msg.ChatID, phID) - } - } - return - } - // Pre-send: stop typing and try to edit placeholder if m.preSend(ctx, name, msg, w.ch) { return // placeholder was edited successfully, skip Send diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index 223c4f4de..9199285f7 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -877,7 +877,7 @@ func TestBuildMediaScope_WithMessageID(t *testing.T) { } } -func TestManager_PlaceholderLogic(t *testing.T) { +func TestManager_PlaceholderConsumedByResponse(t *testing.T) { mgr := &Manager{ channels: make(map[string]Channel), workers: make(map[string]*channelWorker), @@ -894,50 +894,37 @@ func TestManager_PlaceholderLogic(t *testing.T) { mgr.workers["mock"] = worker ctx := context.Background() + key := "mock:chat-1" - // Scenario 1: TriggerPlaceholder creates a placeholder but does NOT send text messages - msgTrigger := bus.OutboundMessage{ - Channel: "mock", - ChatID: "chat-1", - TriggerPlaceholder: true, + // Simulate a placeholder recorded by base.go HandleMessage + mgr.RecordPlaceholder("mock", "chat-1", "ph-123") + + if _, ok := mgr.placeholders.Load(key); !ok { + t.Fatal("expected placeholder to be recorded") } - mgr.sendWithRetry(ctx, "mock", worker, msgTrigger) - if mockCh.placeholdersSent != 1 { - t.Errorf("expected 1 placeholder sent, got %d", mockCh.placeholdersSent) + // Transcription feedback arrives first — it should consume the placeholder + // and be delivered via EditMessage, not Send. + msgTranscript := bus.OutboundMessage{ + Channel: "mock", + ChatID: "chat-1", + Content: "Transcript: hello", + } + mgr.sendWithRetry(ctx, "mock", worker, msgTranscript) + + if mockCh.editedMessages != 1 { + t.Errorf("expected 1 edited message (placeholder consumed by transcript), got %d", mockCh.editedMessages) } if len(mockCh.sentMessages) != 0 { - t.Errorf("expected 0 normal messages sent, got %d", len(mockCh.sentMessages)) + t.Errorf("expected 0 normal messages (transcript used edit), got %d", len(mockCh.sentMessages)) } - // Verify that the placeholder has been registered in the manager - key := "mock:chat-1" - if _, ok := mgr.placeholders.Load(key); !ok { - t.Errorf("expected placeholder to be recorded in manager") + // Placeholder should be gone now + if _, ok := mgr.placeholders.Load(key); ok { + t.Error("expected placeholder to be removed after being consumed") } - // Scenario 2: SkipPlaceholder (simulates transcription). Must send normally, ignoring Edit. - msgSkip := bus.OutboundMessage{ - Channel: "mock", - ChatID: "chat-1", - Content: "Transcript: hello", - SkipPlaceholder: true, - } - mgr.sendWithRetry(ctx, "mock", worker, msgSkip) - - if mockCh.editedMessages != 0 { - t.Errorf("expected 0 edited messages due to SkipPlaceholder, got %d", mockCh.editedMessages) - } - if len(mockCh.sentMessages) != 1 { - t.Errorf("expected 1 normal message sent, got %d", len(mockCh.sentMessages)) - } - - // The placeholder must still exist for the next response - if _, ok := mgr.placeholders.Load(key); !ok { - t.Errorf("expected placeholder to STILL be in manager after SkipPlaceholder") - } - - // Scenario 3: Normal Message (simulates the final LLM response). Must consume the placeholder. + // Final LLM response arrives — no placeholder left, so it goes through Send msgFinal := bus.OutboundMessage{ Channel: "mock", ChatID: "chat-1", @@ -945,11 +932,44 @@ func TestManager_PlaceholderLogic(t *testing.T) { } mgr.sendWithRetry(ctx, "mock", worker, msgFinal) - if mockCh.editedMessages != 1 { - t.Errorf("expected 1 edited message (consuming placeholder), got %d", mockCh.editedMessages) - } - // The placeholder must have been removed - if _, ok := mgr.placeholders.Load(key); ok { - t.Errorf("expected placeholder to be removed after being consumed") + if len(mockCh.sentMessages) != 1 { + t.Errorf("expected 1 normal message sent, got %d", len(mockCh.sentMessages)) + } +} + +func TestManager_SendPlaceholder(t *testing.T) { + mgr := &Manager{ + channels: make(map[string]Channel), + workers: make(map[string]*channelWorker), + placeholders: sync.Map{}, + } + + mockCh := &mockChannel{ + sendFn: func(ctx context.Context, msg bus.OutboundMessage) error { + return nil + }, + } + mgr.channels["mock"] = mockCh + + ctx := context.Background() + + // SendPlaceholder should send a placeholder and record it + ok := mgr.SendPlaceholder(ctx, "mock", "chat-1") + if !ok { + t.Fatal("expected SendPlaceholder to succeed") + } + if mockCh.placeholdersSent != 1 { + t.Errorf("expected 1 placeholder sent, got %d", mockCh.placeholdersSent) + } + + key := "mock:chat-1" + if _, loaded := mgr.placeholders.Load(key); !loaded { + t.Error("expected placeholder to be recorded in manager") + } + + // SendPlaceholder on unknown channel should return false + ok = mgr.SendPlaceholder(ctx, "unknown", "chat-1") + if ok { + t.Error("expected SendPlaceholder to fail for unknown channel") } }