diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index b12ad5b1d..a7dcb0b9f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -2682,7 +2682,10 @@ turnLoop: allResponsesHandled = false } - if !toolResult.Silent && toolResult.ForUser != "" && ts.opts.SendResponse { + shouldSendForUser := !toolResult.Silent && + toolResult.ForUser != "" && + (ts.opts.SendResponse || toolResult.ResponseHandled) + if shouldSendForUser { al.bus.PublishOutbound(ctx, outboundMessageForTurn(ts, toolResult.ForUser)) logger.DebugCF("agent", "Sent tool result to user", map[string]any{ diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 6d6ee4a6d..b544ffb4f 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -39,7 +39,13 @@ func (f *fakeChannel) ReasoningChannelID() string { return f.id type fakeMediaChannel struct { fakeChannel - sentMedia []bus.OutboundMediaMessage + sentMessages []bus.OutboundMessage + sentMedia []bus.OutboundMediaMessage +} + +func (f *fakeMediaChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { + f.sentMessages = append(f.sentMessages, msg) + return nil, nil } func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { @@ -740,6 +746,63 @@ func TestProcessMessage_HandledToolProcessesQueuedSteeringBeforeReturning(t *tes } } +func TestRunAgentLoop_ResponseHandledToolPublishesForUserWhenSendResponseDisabled(t *testing.T) { + tmpDir := t.TempDir() + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = tmpDir + cfg.Agents.Defaults.ModelName = "test-model" + cfg.Agents.Defaults.MaxTokens = 4096 + cfg.Agents.Defaults.MaxToolIterations = 10 + + msgBus := bus.NewMessageBus() + provider := &handledUserProvider{} + al := NewAgentLoop(cfg, msgBus, provider) + + store := media.NewFileMediaStore() + al.SetMediaStore(store) + telegramChannel := &fakeMediaChannel{fakeChannel: fakeChannel{id: "rid-telegram"}} + al.SetChannelManager(newStartedTestChannelManager(t, msgBus, store, "telegram", telegramChannel)) + al.RegisterTool(&handledUserTool{}) + + defaultAgent := al.registry.GetDefaultAgent() + if defaultAgent == nil { + t.Fatal("expected default agent") + } + + response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ + SessionKey: "session-1", + Channel: "telegram", + ChatID: "chat1", + UserMessage: "take a screenshot of the screen and send it to me", + DefaultResponse: defaultResponse, + EnableSummary: false, + SendResponse: false, + InboundContext: &bus.InboundContext{ + Channel: "telegram", + ChatID: "chat1", + ChatType: "direct", + SenderID: "user1", + }, + }) + if err != nil { + t.Fatalf("runAgentLoop() error = %v", err) + } + if response != "" { + t.Fatalf("expected no final response when tool already handled delivery, got %q", response) + } + + deadline := time.Now().Add(2 * time.Second) + for len(telegramChannel.sentMessages) == 0 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if len(telegramChannel.sentMessages) != 1 { + t.Fatalf("expected exactly 1 sent text message, got %d", len(telegramChannel.sentMessages)) + } + if telegramChannel.sentMessages[0].Content != "Handled user output from tool." { + t.Fatalf("unexpected sent text message: %+v", telegramChannel.sentMessages[0]) + } +} + func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) { fields := map[string]any{} @@ -1162,6 +1225,36 @@ func (m *handledMediaProvider) GetDefaultModel() string { return "handled-media-model" } +type handledUserProvider struct { + calls int +} + +func (m *handledUserProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + m.calls++ + if m.calls == 1 { + return &providers.LLMResponse{ + Content: "Delivering the result now.", + ToolCalls: []providers.ToolCall{{ + ID: "call_handled_user", + Type: "function", + Name: "handled_user_tool", + Arguments: map[string]any{}, + }}, + }, nil + } + return &providers.LLMResponse{}, nil +} + +func (m *handledUserProvider) GetDefaultModel() string { + return "handled-user-model" +} + type artifactThenSendProvider struct { calls int } @@ -1331,6 +1424,24 @@ func (m *handledMediaTool) Execute(ctx context.Context, args map[string]any) *to return tools.MediaResult("Attachment delivered by tool.", []string{ref}).WithResponseHandled() } +type handledUserTool struct{} + +func (m *handledUserTool) Name() string { return "handled_user_tool" } +func (m *handledUserTool) Description() string { + return "Returns a user-visible result and marks delivery as handled" +} + +func (m *handledUserTool) Parameters() map[string]any { + return map[string]any{ + "type": "object", + "properties": map[string]any{}, + } +} + +func (m *handledUserTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult { + return tools.UserResult("Handled user output from tool.").WithResponseHandled() +} + type handledMediaWithSteeringProvider struct { calls int } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 45e755673..03ef3123f 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -40,6 +40,8 @@ type MessageBus struct { inbound chan InboundMessage outbound chan OutboundMessage outboundMedia chan OutboundMediaMessage + audioChunks chan AudioChunk + voiceControls chan VoiceControl closeOnce sync.Once done chan struct{} @@ -53,6 +55,8 @@ func NewMessageBus() *MessageBus { inbound: make(chan InboundMessage, defaultBusBufferSize), outbound: make(chan OutboundMessage, defaultBusBufferSize), outboundMedia: make(chan OutboundMediaMessage, defaultBusBufferSize), + audioChunks: make(chan AudioChunk, defaultBusBufferSize*4), // Audio chunks need more buffer. + voiceControls: make(chan VoiceControl, defaultBusBufferSize), done: make(chan struct{}), } } @@ -121,6 +125,22 @@ func (mb *MessageBus) OutboundMediaChan() <-chan OutboundMediaMessage { return mb.outboundMedia } +func (mb *MessageBus) PublishAudioChunk(ctx context.Context, chunk AudioChunk) error { + return publish(ctx, mb, mb.audioChunks, chunk) +} + +func (mb *MessageBus) AudioChunksChan() <-chan AudioChunk { + return mb.audioChunks +} + +func (mb *MessageBus) PublishVoiceControl(ctx context.Context, ctrl VoiceControl) error { + return publish(ctx, mb, mb.voiceControls, ctrl) +} + +func (mb *MessageBus) VoiceControlsChan() <-chan VoiceControl { + return mb.voiceControls +} + // SetStreamDelegate registers a StreamDelegate (typically the channel Manager). func (mb *MessageBus) SetStreamDelegate(d StreamDelegate) { mb.streamDelegate.Store(d) @@ -150,6 +170,8 @@ func (mb *MessageBus) Close() { close(mb.inbound) close(mb.outbound) close(mb.outboundMedia) + close(mb.audioChunks) + close(mb.voiceControls) // clean up any remaining messages in channels drained := 0 @@ -162,6 +184,12 @@ func (mb *MessageBus) Close() { for range mb.outboundMedia { drained++ } + for range mb.audioChunks { + drained++ + } + for range mb.voiceControls { + drained++ + } if drained > 0 { logger.DebugCF("bus", "Drained buffered messages during close", map[string]any{ diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 18d1d1df8..b67d847d1 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -230,6 +230,57 @@ func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) { } } +func TestPublishAudioChunkSubscribe(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + chunk := AudioChunk{ + SessionID: "voice-1", + SpeakerID: "speaker-1", + ChatID: "chat-1", + Channel: "discord", + Sequence: 7, + Format: "opus", + Data: []byte{0x01, 0x02}, + } + + if err := mb.PublishAudioChunk(context.Background(), chunk); err != nil { + t.Fatalf("PublishAudioChunk failed: %v", err) + } + + got, ok := <-mb.AudioChunksChan() + if !ok { + t.Fatal("AudioChunksChan returned ok=false") + } + if got.SessionID != "voice-1" || got.Sequence != 7 { + t.Fatalf("unexpected audio chunk: %+v", got) + } +} + +func TestPublishVoiceControlSubscribe(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + ctrl := VoiceControl{ + SessionID: "voice-1", + ChatID: "chat-1", + Type: "command", + Action: "start", + } + + if err := mb.PublishVoiceControl(context.Background(), ctrl); err != nil { + t.Fatalf("PublishVoiceControl failed: %v", err) + } + + got, ok := <-mb.VoiceControlsChan() + if !ok { + t.Fatal("VoiceControlsChan returned ok=false") + } + if got.Type != "command" || got.Action != "start" { + t.Fatalf("unexpected voice control: %+v", got) + } +} + func TestNewOutboundContext_NormalizesReplyAddress(t *testing.T) { ctx := NewOutboundContext(" telegram ", " chat-42 ", " msg-9 ") if ctx.Channel != "telegram" { diff --git a/pkg/bus/types.go b/pkg/bus/types.go index cccfc8baf..0b2c1c92a 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -74,3 +74,25 @@ type OutboundMediaMessage struct { Context InboundContext `json:"context"` Parts []MediaPart `json:"parts"` } + +// AudioChunk represents a chunk of streaming voice data. +type AudioChunk struct { + SessionID string `json:"session_id"` + SpeakerID string `json:"speaker_id"` // User ID or SSRC + ChatID string `json:"chat_id"` // Where to respond + Channel string `json:"channel"` // Source channel type (e.g. "discord") + Sequence uint64 `json:"sequence"` + Timestamp uint32 `json:"timestamp"` + SampleRate int `json:"sample_rate"` + Channels int `json:"channels"` + Format string `json:"format"` // "opus", "pcm", etc + Data []byte `json:"data"` +} + +// VoiceControl represents state or commands for voice sessions. +type VoiceControl struct { + SessionID string `json:"session_id"` + ChatID string `json:"chat_id"` + Type string `json:"type"` // "state", "command" + Action string `json:"action"` // "idle", "listening", "start", "stop", "leave" +}