Merge pull request #1214 from afjcjsbx/feat/echo-voice-audio-transcription

feat(channel): echo voice audio transcription feedback
This commit is contained in:
Mauro
2026-03-11 08:45:25 +01:00
committed by GitHub
11 changed files with 474 additions and 19 deletions
+3
View File
@@ -477,6 +477,9 @@
"enabled": false,
"monitor_usb": true
},
"voice": {
"echo_transcription": false
},
"gateway": {
"host": "127.0.0.1",
"port": 18790
+56 -5
View File
@@ -467,9 +467,10 @@ var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)
// transcribeAudioInMessage resolves audio media refs, transcribes them, and
// replaces audio annotations in msg.Content with the transcribed text.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) bus.InboundMessage {
// Returns the (possibly modified) message and true if audio was transcribed.
func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.InboundMessage) (bus.InboundMessage, bool) {
if al.transcriber == nil || al.mediaStore == nil || len(msg.Media) == 0 {
return msg
return msg, false
}
// Transcribe each audio media ref in order.
@@ -493,9 +494,11 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
}
if len(transcriptions) == 0 {
return msg
return msg, false
}
al.sendTranscriptionFeedback(ctx, msg.Channel, msg.ChatID, msg.MessageID, transcriptions)
// Replace audio annotations sequentially with transcriptions.
idx := 0
newContent := audioAnnotationRe.ReplaceAllStringFunc(msg.Content, func(match string) string {
@@ -513,7 +516,48 @@ func (al *AgentLoop) transcribeAudioInMessage(ctx context.Context, msg bus.Inbou
}
msg.Content = newContent
return msg
return msg, true
}
// sendTranscriptionFeedback sends feedback to the user with the result of
// audio transcription if the option is enabled. It uses Manager.SendMessage
// which executes synchronously (rate limiting, splitting, retry) so that
// ordering with the subsequent placeholder is guaranteed.
func (al *AgentLoop) sendTranscriptionFeedback(
ctx context.Context,
channel, chatID, messageID string,
validTexts []string,
) {
if !al.cfg.Voice.EchoTranscription {
return
}
if al.channelManager == nil {
return
}
var nonEmpty []string
for _, t := range validTexts {
if t != "" {
nonEmpty = append(nonEmpty, t)
}
}
var feedbackMsg string
if len(nonEmpty) > 0 {
feedbackMsg = "Transcript: " + strings.Join(nonEmpty, "\n")
} else {
feedbackMsg = "No voice detected in the audio"
}
err := al.channelManager.SendMessage(ctx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: feedbackMsg,
ReplyToMessageID: messageID,
})
if err != nil {
logger.WarnCF("voice", "Failed to send transcription feedback", map[string]any{"error": err.Error()})
}
}
// inferMediaType determines the media type ("image", "audio", "video", "file")
@@ -627,7 +671,14 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
},
)
msg = al.transcribeAudioInMessage(ctx, msg)
var hadAudio bool
msg, hadAudio = al.transcribeAudioInMessage(ctx, msg)
// For audio messages the placeholder was deferred by the channel.
// Now that transcription (and optional feedback) is done, send it.
if hadAudio && al.channelManager != nil {
al.channelManager.SendPlaceholder(ctx, msg.Channel, msg.ChatID)
}
// Route system messages to processSystemMessage
if msg.Channel == "system" {
+4 -3
View File
@@ -30,9 +30,10 @@ type InboundMessage struct {
}
type OutboundMessage struct {
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
}
// MediaPart describes a single media attachment to send.
+13 -4
View File
@@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"regexp"
"strconv"
"strings"
"sync/atomic"
@@ -32,6 +33,9 @@ func init() {
uniqueIDPrefix = hex.EncodeToString(b[:])
}
// audioAnnotationRe matches audio/voice annotations injected by channels (e.g. [voice], [audio: file.ogg]).
var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)
// uniqueID generates a process-unique ID using a random prefix and an atomic counter.
// This ID is intended for internal correlation (e.g. media scope keys) and is NOT
// cryptographically secure — it must not be used in contexts where unpredictability matters.
@@ -284,10 +288,15 @@ func (c *BaseChannel) HandleMessage(
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — independent pipeline
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
// Placeholder — independent pipeline.
// Skip when the message contains audio: the agent will send the
// placeholder after transcription completes, so the user sees
// "Thinking…" only once the voice has been processed.
if !audioAnnotationRe.MatchString(content) {
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
}
+18 -3
View File
@@ -134,7 +134,7 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
return nil
}
return c.sendChunk(ctx, channelID, msg.Content)
return c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID)
}
// SendMedia implements the channels.MediaSender interface.
@@ -259,14 +259,29 @@ func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (st
return msg.ID, nil
}
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) error {
// Use the passed ctx for timeout control
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
defer cancel()
done := make(chan error, 1)
go func() {
_, err := c.session.ChannelMessageSend(channelID, content)
var err error
// If we have an ID, we send the message as "Reply"
if replyToID != "" {
_, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
Content: content,
Reference: &discordgo.MessageReference{
MessageID: replyToID,
ChannelID: channelID,
},
})
} else {
// Otherwise, we send a normal message
_, err = c.session.ChannelMessageSend(channelID, content)
}
done <- err
}()
+54
View File
@@ -102,6 +102,27 @@ func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()})
}
// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID
// and records it for later editing. Returns true if a placeholder was sent.
func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool {
m.mu.RLock()
ch, ok := m.channels[channel]
m.mu.RUnlock()
if !ok {
return false
}
pc, ok := ch.(PlaceholderCapable)
if !ok {
return false
}
phID, err := pc.SendPlaceholder(ctx, chatID)
if err != nil || phID == "" {
return false
}
m.RecordPlaceholder(channel, chatID, phID)
return true
}
// RecordTypingStop registers a typing stop function for later invocation.
// Implements PlaceholderRecorder.
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
@@ -813,6 +834,39 @@ func (m *Manager) UnregisterChannel(name string) {
delete(m.channels, name)
}
// SendMessage sends an outbound message synchronously through the channel
// worker's rate limiter and retry logic. It blocks until the message is
// delivered (or all retries are exhausted), which preserves ordering when
// a subsequent operation depends on the message having been sent.
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
m.mu.RLock()
_, exists := m.channels[msg.Channel]
w, wExists := m.workers[msg.Channel]
m.mu.RUnlock()
if !exists {
return fmt.Errorf("channel %s not found", msg.Channel)
}
if !wExists || w == nil {
return fmt.Errorf("channel %s has no active worker", msg.Channel)
}
maxLen := 0
if mlp, ok := w.ch.(MessageLengthProvider); ok {
maxLen = mlp.MaxMessageLength()
}
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
for _, chunk := range SplitMessage(msg.Content, maxLen) {
chunkMsg := msg
chunkMsg.Content = chunk
m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
}
} else {
m.sendWithRetry(ctx, msg.Channel, w, msg)
}
return nil
}
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
m.mu.RLock()
_, exists := m.channels[channelName]
+300 -1
View File
@@ -17,16 +17,32 @@ import (
// mockChannel is a test double that delegates Send to a configurable function.
type mockChannel struct {
BaseChannel
sendFn func(ctx context.Context, msg bus.OutboundMessage) error
sendFn func(ctx context.Context, msg bus.OutboundMessage) error
sentMessages []bus.OutboundMessage
placeholdersSent int
editedMessages int
lastPlaceholderID string
}
func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
m.sentMessages = append(m.sentMessages, msg)
return m.sendFn(ctx, msg)
}
func (m *mockChannel) Start(ctx context.Context) error { return nil }
func (m *mockChannel) Stop(ctx context.Context) error { return nil }
func (m *mockChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
m.placeholdersSent++
m.lastPlaceholderID = "mock-ph-123"
return m.lastPlaceholderID, nil
}
func (m *mockChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
m.editedMessages++
return nil
}
// newTestManager creates a minimal Manager suitable for unit tests.
func newTestManager() *Manager {
return &Manager{
@@ -860,3 +876,286 @@ func TestBuildMediaScope_WithMessageID(t *testing.T) {
t.Fatalf("expected %s, got %s", expected, scope)
}
}
func TestManager_PlaceholderConsumedByResponse(t *testing.T) {
mgr := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
placeholders: sync.Map{},
}
mockCh := &mockChannel{
sendFn: func(ctx context.Context, msg bus.OutboundMessage) error {
return nil
},
}
worker := newChannelWorker("mock", mockCh)
mgr.channels["mock"] = mockCh
mgr.workers["mock"] = worker
ctx := context.Background()
key := "mock:chat-1"
// Simulate a placeholder recorded by base.go HandleMessage
mgr.RecordPlaceholder("mock", "chat-1", "ph-123")
if _, ok := mgr.placeholders.Load(key); !ok {
t.Fatal("expected placeholder to be recorded")
}
// Transcription feedback arrives first — it should consume the placeholder
// and be delivered via EditMessage, not Send.
msgTranscript := bus.OutboundMessage{
Channel: "mock",
ChatID: "chat-1",
Content: "Transcript: hello",
}
mgr.sendWithRetry(ctx, "mock", worker, msgTranscript)
if mockCh.editedMessages != 1 {
t.Errorf("expected 1 edited message (placeholder consumed by transcript), got %d", mockCh.editedMessages)
}
if len(mockCh.sentMessages) != 0 {
t.Errorf("expected 0 normal messages (transcript used edit), got %d", len(mockCh.sentMessages))
}
// Placeholder should be gone now
if _, ok := mgr.placeholders.Load(key); ok {
t.Error("expected placeholder to be removed after being consumed")
}
// Final LLM response arrives — no placeholder left, so it goes through Send
msgFinal := bus.OutboundMessage{
Channel: "mock",
ChatID: "chat-1",
Content: "Final Answer",
}
mgr.sendWithRetry(ctx, "mock", worker, msgFinal)
if len(mockCh.sentMessages) != 1 {
t.Errorf("expected 1 normal message sent, got %d", len(mockCh.sentMessages))
}
}
func TestSendMessage_Synchronous(t *testing.T) {
m := newTestManager()
var received []bus.OutboundMessage
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
ReplyToMessageID: "msg-456",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
// SendMessage is synchronous — message should already be delivered
if len(received) != 1 {
t.Fatalf("expected 1 message sent, got %d", len(received))
}
if received[0].ReplyToMessageID != "msg-456" {
t.Fatalf("expected ReplyToMessageID msg-456, got %s", received[0].ReplyToMessageID)
}
if received[0].Content != "hello world" {
t.Fatalf("expected content 'hello world', got %s", received[0].Content)
}
}
func TestSendMessage_UnknownChannel(t *testing.T) {
m := newTestManager()
msg := bus.OutboundMessage{
Channel: "nonexistent",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error for unknown channel")
}
}
func TestSendMessage_NoWorker(t *testing.T) {
m := newTestManager()
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error { return nil },
}
m.channels["test"] = ch
// No worker registered
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello",
}
err := m.SendMessage(context.Background(), msg)
if err == nil {
t.Fatal("expected error when no worker exists")
}
}
func TestSendMessage_WithRetry(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockChannel{
sendFn: func(_ context.Context, _ bus.OutboundMessage) error {
callCount++
if callCount == 1 {
return fmt.Errorf("transient: %w", ErrTemporary)
}
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "retry me",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if callCount != 2 {
t.Fatalf("expected 2 Send calls (1 failure + 1 success), got %d", callCount)
}
}
func TestSendMessage_WithSplitting(t *testing.T) {
m := newTestManager()
var received []string
ch := &mockChannelWithLength{
mockChannel: mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
received = append(received, msg.Content)
return nil
},
},
maxLen: 5,
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
msg := bus.OutboundMessage{
Channel: "test",
ChatID: "123",
Content: "hello world",
}
err := m.SendMessage(context.Background(), msg)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if len(received) < 2 {
t.Fatalf("expected message to be split into at least 2 chunks, got %d", len(received))
}
}
func TestSendMessage_PreservesOrdering(t *testing.T) {
m := newTestManager()
var order []string
ch := &mockChannel{
sendFn: func(_ context.Context, msg bus.OutboundMessage) error {
order = append(order, msg.Content)
return nil
},
}
w := &channelWorker{
ch: ch,
limiter: rate.NewLimiter(rate.Inf, 1),
}
m.channels["test"] = ch
m.workers["test"] = w
// Send two messages sequentially — they must arrive in order
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "first",
})
_ = m.SendMessage(context.Background(), bus.OutboundMessage{
Channel: "test", ChatID: "1", Content: "second",
})
if len(order) != 2 {
t.Fatalf("expected 2 messages, got %d", len(order))
}
if order[0] != "first" || order[1] != "second" {
t.Fatalf("expected [first, second], got %v", order)
}
}
func TestManager_SendPlaceholder(t *testing.T) {
mgr := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
placeholders: sync.Map{},
}
mockCh := &mockChannel{
sendFn: func(ctx context.Context, msg bus.OutboundMessage) error {
return nil
},
}
mgr.channels["mock"] = mockCh
ctx := context.Background()
// SendPlaceholder should send a placeholder and record it
ok := mgr.SendPlaceholder(ctx, "mock", "chat-1")
if !ok {
t.Fatal("expected SendPlaceholder to succeed")
}
if mockCh.placeholdersSent != 1 {
t.Errorf("expected 1 placeholder sent, got %d", mockCh.placeholdersSent)
}
key := "mock:chat-1"
if _, loaded := mgr.placeholders.Load(key); !loaded {
t.Error("expected placeholder to be recorded in manager")
}
// SendPlaceholder on unknown channel should return false
ok = mgr.SendPlaceholder(ctx, "unknown", "chat-1")
if ok {
t.Error("expected SendPlaceholder to fail for unknown channel")
}
}
+5 -1
View File
@@ -122,7 +122,11 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
slack.MsgOptionText(msg.Content, false),
}
if threadTS != "" {
if msg.ReplyToMessageID != "" && threadTS == "" {
// Answer to the message by creating a Thread under it
opts = append(opts, slack.MsgOptionTS(msg.ReplyToMessageID))
} else if threadTS != "" {
// If we are already in a thread, continue in the thread
opts = append(opts, slack.MsgOptionTS(threadTS))
}
+13 -2
View File
@@ -180,6 +180,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
// The Manager already splits messages to ≤4000 chars (WithMaxMessageLength),
// so msg.Content is guaranteed to be within that limit. We still need to
// check if HTML expansion pushes it beyond Telegram's 4096-char API limit.
replyToID := msg.ReplyToMessageID
queue := []string{msg.Content}
for len(queue) > 0 {
chunk := queue[0]
@@ -200,9 +201,11 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
continue
}
if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil {
if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk, replyToID); err != nil {
return err
}
// Only the first chunk should be a reply; subsequent chunks are normal messages.
replyToID = ""
}
return nil
@@ -211,12 +214,20 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
// sendHTMLChunk sends a single HTML message, falling back to the original
// markdown as plain text on parse failure so users never see raw HTML tags.
func (c *TelegramChannel) sendHTMLChunk(
ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string,
ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string, replyToID string,
) error {
tgMsg := tu.Message(tu.ID(chatID), htmlContent)
tgMsg.ParseMode = telego.ModeHTML
tgMsg.MessageThreadID = threadID
if replyToID != "" {
if mid, parseErr := strconv.Atoi(replyToID); parseErr == nil {
tgMsg.ReplyParameters = &telego.ReplyParameters{
MessageID: mid,
}
}
}
if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil {
logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{
"error": err.Error(),
+5
View File
@@ -59,6 +59,7 @@ type Config struct {
Tools ToolsConfig `json:"tools"`
Heartbeat HeartbeatConfig `json:"heartbeat"`
Devices DevicesConfig `json:"devices"`
Voice VoiceConfig `json:"voice"`
// BuildInfo contains build-time version information
BuildInfo BuildInfo `json:"build_info,omitempty"`
}
@@ -472,6 +473,10 @@ type DevicesConfig struct {
MonitorUSB bool `json:"monitor_usb" env:"PICOCLAW_DEVICES_MONITOR_USB"`
}
type VoiceConfig struct {
EchoTranscription bool `json:"echo_transcription" env:"PICOCLAW_VOICE_ECHO_TRANSCRIPTION"`
}
type ProvidersConfig struct {
Anthropic ProviderConfig `json:"anthropic"`
OpenAI OpenAIProviderConfig `json:"openai"`
+3
View File
@@ -510,6 +510,9 @@ func DefaultConfig() *Config {
Enabled: false,
MonitorUSB: true,
},
Voice: VoiceConfig{
EchoTranscription: false,
},
BuildInfo: BuildInfo{
Version: Version,
GitCommit: GitCommit,