refactor(runtime): merge bus context and handled tool delivery

This commit is contained in:
Hoshina
2026-04-07 21:05:53 +08:00
parent 168b75ae21
commit 718a5e7c75
5 changed files with 217 additions and 2 deletions
+4 -1
View File
@@ -2682,7 +2682,10 @@ turnLoop:
allResponsesHandled = false
}
if !toolResult.Silent && toolResult.ForUser != "" && ts.opts.SendResponse {
shouldSendForUser := !toolResult.Silent &&
toolResult.ForUser != "" &&
(ts.opts.SendResponse || toolResult.ResponseHandled)
if shouldSendForUser {
al.bus.PublishOutbound(ctx, outboundMessageForTurn(ts, toolResult.ForUser))
logger.DebugCF("agent", "Sent tool result to user",
map[string]any{
+112 -1
View File
@@ -39,7 +39,13 @@ func (f *fakeChannel) ReasoningChannelID() string { return f.id
type fakeMediaChannel struct {
fakeChannel
sentMedia []bus.OutboundMediaMessage
sentMessages []bus.OutboundMessage
sentMedia []bus.OutboundMediaMessage
}
func (f *fakeMediaChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
f.sentMessages = append(f.sentMessages, msg)
return nil, nil
}
func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
@@ -740,6 +746,63 @@ func TestProcessMessage_HandledToolProcessesQueuedSteeringBeforeReturning(t *tes
}
}
func TestRunAgentLoop_ResponseHandledToolPublishesForUserWhenSendResponseDisabled(t *testing.T) {
tmpDir := t.TempDir()
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = tmpDir
cfg.Agents.Defaults.ModelName = "test-model"
cfg.Agents.Defaults.MaxTokens = 4096
cfg.Agents.Defaults.MaxToolIterations = 10
msgBus := bus.NewMessageBus()
provider := &handledUserProvider{}
al := NewAgentLoop(cfg, msgBus, provider)
store := media.NewFileMediaStore()
al.SetMediaStore(store)
telegramChannel := &fakeMediaChannel{fakeChannel: fakeChannel{id: "rid-telegram"}}
al.SetChannelManager(newStartedTestChannelManager(t, msgBus, store, "telegram", telegramChannel))
al.RegisterTool(&handledUserTool{})
defaultAgent := al.registry.GetDefaultAgent()
if defaultAgent == nil {
t.Fatal("expected default agent")
}
response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{
SessionKey: "session-1",
Channel: "telegram",
ChatID: "chat1",
UserMessage: "take a screenshot of the screen and send it to me",
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
InboundContext: &bus.InboundContext{
Channel: "telegram",
ChatID: "chat1",
ChatType: "direct",
SenderID: "user1",
},
})
if err != nil {
t.Fatalf("runAgentLoop() error = %v", err)
}
if response != "" {
t.Fatalf("expected no final response when tool already handled delivery, got %q", response)
}
deadline := time.Now().Add(2 * time.Second)
for len(telegramChannel.sentMessages) == 0 && time.Now().Before(deadline) {
time.Sleep(10 * time.Millisecond)
}
if len(telegramChannel.sentMessages) != 1 {
t.Fatalf("expected exactly 1 sent text message, got %d", len(telegramChannel.sentMessages))
}
if telegramChannel.sentMessages[0].Content != "Handled user output from tool." {
t.Fatalf("unexpected sent text message: %+v", telegramChannel.sentMessages[0])
}
}
func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) {
fields := map[string]any{}
@@ -1162,6 +1225,36 @@ func (m *handledMediaProvider) GetDefaultModel() string {
return "handled-media-model"
}
type handledUserProvider struct {
calls int
}
func (m *handledUserProvider) Chat(
ctx context.Context,
messages []providers.Message,
tools []providers.ToolDefinition,
model string,
opts map[string]any,
) (*providers.LLMResponse, error) {
m.calls++
if m.calls == 1 {
return &providers.LLMResponse{
Content: "Delivering the result now.",
ToolCalls: []providers.ToolCall{{
ID: "call_handled_user",
Type: "function",
Name: "handled_user_tool",
Arguments: map[string]any{},
}},
}, nil
}
return &providers.LLMResponse{}, nil
}
func (m *handledUserProvider) GetDefaultModel() string {
return "handled-user-model"
}
type artifactThenSendProvider struct {
calls int
}
@@ -1331,6 +1424,24 @@ func (m *handledMediaTool) Execute(ctx context.Context, args map[string]any) *to
return tools.MediaResult("Attachment delivered by tool.", []string{ref}).WithResponseHandled()
}
type handledUserTool struct{}
func (m *handledUserTool) Name() string { return "handled_user_tool" }
func (m *handledUserTool) Description() string {
return "Returns a user-visible result and marks delivery as handled"
}
func (m *handledUserTool) Parameters() map[string]any {
return map[string]any{
"type": "object",
"properties": map[string]any{},
}
}
func (m *handledUserTool) Execute(ctx context.Context, args map[string]any) *tools.ToolResult {
return tools.UserResult("Handled user output from tool.").WithResponseHandled()
}
type handledMediaWithSteeringProvider struct {
calls int
}
+28
View File
@@ -40,6 +40,8 @@ type MessageBus struct {
inbound chan InboundMessage
outbound chan OutboundMessage
outboundMedia chan OutboundMediaMessage
audioChunks chan AudioChunk
voiceControls chan VoiceControl
closeOnce sync.Once
done chan struct{}
@@ -53,6 +55,8 @@ func NewMessageBus() *MessageBus {
inbound: make(chan InboundMessage, defaultBusBufferSize),
outbound: make(chan OutboundMessage, defaultBusBufferSize),
outboundMedia: make(chan OutboundMediaMessage, defaultBusBufferSize),
audioChunks: make(chan AudioChunk, defaultBusBufferSize*4), // Audio chunks need more buffer.
voiceControls: make(chan VoiceControl, defaultBusBufferSize),
done: make(chan struct{}),
}
}
@@ -121,6 +125,22 @@ func (mb *MessageBus) OutboundMediaChan() <-chan OutboundMediaMessage {
return mb.outboundMedia
}
func (mb *MessageBus) PublishAudioChunk(ctx context.Context, chunk AudioChunk) error {
return publish(ctx, mb, mb.audioChunks, chunk)
}
func (mb *MessageBus) AudioChunksChan() <-chan AudioChunk {
return mb.audioChunks
}
func (mb *MessageBus) PublishVoiceControl(ctx context.Context, ctrl VoiceControl) error {
return publish(ctx, mb, mb.voiceControls, ctrl)
}
func (mb *MessageBus) VoiceControlsChan() <-chan VoiceControl {
return mb.voiceControls
}
// SetStreamDelegate registers a StreamDelegate (typically the channel Manager).
func (mb *MessageBus) SetStreamDelegate(d StreamDelegate) {
mb.streamDelegate.Store(d)
@@ -150,6 +170,8 @@ func (mb *MessageBus) Close() {
close(mb.inbound)
close(mb.outbound)
close(mb.outboundMedia)
close(mb.audioChunks)
close(mb.voiceControls)
// clean up any remaining messages in channels
drained := 0
@@ -162,6 +184,12 @@ func (mb *MessageBus) Close() {
for range mb.outboundMedia {
drained++
}
for range mb.audioChunks {
drained++
}
for range mb.voiceControls {
drained++
}
if drained > 0 {
logger.DebugCF("bus", "Drained buffered messages during close", map[string]any{
+51
View File
@@ -230,6 +230,57 @@ func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) {
}
}
func TestPublishAudioChunkSubscribe(t *testing.T) {
mb := NewMessageBus()
defer mb.Close()
chunk := AudioChunk{
SessionID: "voice-1",
SpeakerID: "speaker-1",
ChatID: "chat-1",
Channel: "discord",
Sequence: 7,
Format: "opus",
Data: []byte{0x01, 0x02},
}
if err := mb.PublishAudioChunk(context.Background(), chunk); err != nil {
t.Fatalf("PublishAudioChunk failed: %v", err)
}
got, ok := <-mb.AudioChunksChan()
if !ok {
t.Fatal("AudioChunksChan returned ok=false")
}
if got.SessionID != "voice-1" || got.Sequence != 7 {
t.Fatalf("unexpected audio chunk: %+v", got)
}
}
func TestPublishVoiceControlSubscribe(t *testing.T) {
mb := NewMessageBus()
defer mb.Close()
ctrl := VoiceControl{
SessionID: "voice-1",
ChatID: "chat-1",
Type: "command",
Action: "start",
}
if err := mb.PublishVoiceControl(context.Background(), ctrl); err != nil {
t.Fatalf("PublishVoiceControl failed: %v", err)
}
got, ok := <-mb.VoiceControlsChan()
if !ok {
t.Fatal("VoiceControlsChan returned ok=false")
}
if got.Type != "command" || got.Action != "start" {
t.Fatalf("unexpected voice control: %+v", got)
}
}
func TestNewOutboundContext_NormalizesReplyAddress(t *testing.T) {
ctx := NewOutboundContext(" telegram ", " chat-42 ", " msg-9 ")
if ctx.Channel != "telegram" {
+22
View File
@@ -74,3 +74,25 @@ type OutboundMediaMessage struct {
Context InboundContext `json:"context"`
Parts []MediaPart `json:"parts"`
}
// AudioChunk represents a chunk of streaming voice data.
type AudioChunk struct {
SessionID string `json:"session_id"`
SpeakerID string `json:"speaker_id"` // User ID or SSRC
ChatID string `json:"chat_id"` // Where to respond
Channel string `json:"channel"` // Source channel type (e.g. "discord")
Sequence uint64 `json:"sequence"`
Timestamp uint32 `json:"timestamp"`
SampleRate int `json:"sample_rate"`
Channels int `json:"channels"`
Format string `json:"format"` // "opus", "pcm", etc
Data []byte `json:"data"`
}
// VoiceControl represents state or commands for voice sessions.
type VoiceControl struct {
SessionID string `json:"session_id"`
ChatID string `json:"chat_id"`
Type string `json:"type"` // "state", "command"
Action string `json:"action"` // "idle", "listening", "start", "stop", "leave"
}