From 9f23ec22d6820a73643c5c68a21eb0affa4559c9 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Tue, 7 Apr 2026 22:12:23 +0800 Subject: [PATCH] refactor(agent): normalize dispatch and outbound turn metadata --- pkg/agent/dispatch_request.go | 134 ++++++++++++++++++++++ pkg/agent/dispatch_request_test.go | 110 ++++++++++++++++++ pkg/agent/loop.go | 176 +++++++++++++++++++++-------- pkg/agent/loop_test.go | 48 ++++++-- pkg/agent/steering.go | 15 ++- pkg/agent/subturn.go | 17 +-- pkg/agent/turn.go | 12 +- pkg/bus/bus_test.go | 36 ++++++ pkg/bus/outbound_context.go | 19 ++++ pkg/bus/types.go | 25 +++- 10 files changed, 511 insertions(+), 81 deletions(-) create mode 100644 pkg/agent/dispatch_request.go create mode 100644 pkg/agent/dispatch_request_test.go diff --git a/pkg/agent/dispatch_request.go b/pkg/agent/dispatch_request.go new file mode 100644 index 000000000..40548c41a --- /dev/null +++ b/pkg/agent/dispatch_request.go @@ -0,0 +1,134 @@ +package agent + +import ( + "strings" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" +) + +// DispatchRequest is the normalized runtime input passed into the agent loop +// after routing and session allocation have completed. +type DispatchRequest struct { + SessionKey string + SessionAliases []string + InboundContext *bus.InboundContext + RouteResult *routing.ResolvedRoute + SessionScope *session.SessionScope + UserMessage string + Media []string +} + +func (r DispatchRequest) Channel() string { + if r.InboundContext == nil { + return "" + } + return r.InboundContext.Channel +} + +func (r DispatchRequest) ChatID() string { + if r.InboundContext == nil { + return "" + } + return r.InboundContext.ChatID +} + +func (r DispatchRequest) MessageID() string { + if r.InboundContext == nil { + return "" + } + return r.InboundContext.MessageID +} + +func (r DispatchRequest) ReplyToMessageID() string { + if r.InboundContext == nil { + return "" + } + return r.InboundContext.ReplyToMessageID +} + +func (r DispatchRequest) SenderID() string { + if r.InboundContext == nil { + return "" + } + return r.InboundContext.SenderID +} + +func normalizeProcessOptionsInPlace(opts *processOptions) { + if opts == nil { + return + } + *opts = normalizeProcessOptions(*opts) +} + +func normalizeProcessOptions(opts processOptions) processOptions { + if opts.Dispatch.SessionKey == "" { + opts.Dispatch.SessionKey = strings.TrimSpace(opts.SessionKey) + } + if len(opts.Dispatch.SessionAliases) == 0 && len(opts.SessionAliases) > 0 { + opts.Dispatch.SessionAliases = append([]string(nil), opts.SessionAliases...) + } + if opts.Dispatch.UserMessage == "" { + opts.Dispatch.UserMessage = opts.UserMessage + } + if len(opts.Dispatch.Media) == 0 && len(opts.Media) > 0 { + opts.Dispatch.Media = append([]string(nil), opts.Media...) + } + if opts.Dispatch.RouteResult == nil { + opts.Dispatch.RouteResult = cloneResolvedRoute(opts.RouteResult) + } + if opts.Dispatch.SessionScope == nil { + opts.Dispatch.SessionScope = session.CloneScope(opts.SessionScope) + } + if opts.Dispatch.InboundContext == nil { + if opts.InboundContext != nil { + opts.Dispatch.InboundContext = cloneInboundContext(opts.InboundContext) + } else if opts.Channel != "" || opts.ChatID != "" || opts.SenderID != "" || + opts.MessageID != "" || opts.ReplyToMessageID != "" { + inbound := bus.InboundContext{ + Channel: strings.TrimSpace(opts.Channel), + ChatID: strings.TrimSpace(opts.ChatID), + SenderID: strings.TrimSpace(opts.SenderID), + MessageID: strings.TrimSpace(opts.MessageID), + ReplyToMessageID: strings.TrimSpace(opts.ReplyToMessageID), + } + if inbound.Channel != "" && inbound.ChatID != "" { + inbound.ChatType = "direct" + } + if inbound.Channel != "" || inbound.ChatID != "" || inbound.SenderID != "" || + inbound.MessageID != "" || inbound.ReplyToMessageID != "" { + inbound = bus.NormalizeInboundMessage(bus.InboundMessage{Context: inbound}).Context + opts.Dispatch.InboundContext = &inbound + } + } + } + + // Keep legacy mirrors populated while the rest of the runtime migrates. + opts.SessionKey = opts.Dispatch.SessionKey + opts.SessionAliases = append([]string(nil), opts.Dispatch.SessionAliases...) + opts.UserMessage = opts.Dispatch.UserMessage + opts.Media = append([]string(nil), opts.Dispatch.Media...) + opts.InboundContext = cloneInboundContext(opts.Dispatch.InboundContext) + opts.RouteResult = cloneResolvedRoute(opts.Dispatch.RouteResult) + opts.SessionScope = session.CloneScope(opts.Dispatch.SessionScope) + if opts.InboundContext != nil { + if opts.Channel == "" { + opts.Channel = opts.InboundContext.Channel + } + if opts.ChatID == "" { + opts.ChatID = opts.InboundContext.ChatID + } + if opts.MessageID == "" { + opts.MessageID = opts.InboundContext.MessageID + } + if opts.ReplyToMessageID == "" { + opts.ReplyToMessageID = opts.InboundContext.ReplyToMessageID + } + if opts.SenderID == "" { + opts.SenderID = opts.InboundContext.SenderID + } + } + + return opts +} diff --git a/pkg/agent/dispatch_request_test.go b/pkg/agent/dispatch_request_test.go new file mode 100644 index 000000000..89fc01a3b --- /dev/null +++ b/pkg/agent/dispatch_request_test.go @@ -0,0 +1,110 @@ +package agent + +import ( + "testing" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" +) + +func TestNormalizeProcessOptions_PopulatesDispatchFromLegacyFields(t *testing.T) { + opts := normalizeProcessOptions(processOptions{ + SessionKey: "session-1", + SessionAliases: []string{"legacy:one"}, + Channel: "telegram", + ChatID: "chat-1", + MessageID: "msg-1", + ReplyToMessageID: "reply-1", + SenderID: "user-1", + UserMessage: "hello", + Media: []string{"media://one"}, + }) + + if opts.Dispatch.SessionKey != "session-1" { + t.Fatalf("Dispatch.SessionKey = %q, want session-1", opts.Dispatch.SessionKey) + } + if len(opts.Dispatch.SessionAliases) != 1 || opts.Dispatch.SessionAliases[0] != "legacy:one" { + t.Fatalf("Dispatch.SessionAliases = %v, want [legacy:one]", opts.Dispatch.SessionAliases) + } + if opts.Dispatch.Channel() != "telegram" || opts.Dispatch.ChatID() != "chat-1" { + t.Fatalf( + "dispatch addressing = (%q,%q), want (telegram,chat-1)", + opts.Dispatch.Channel(), + opts.Dispatch.ChatID(), + ) + } + if opts.Dispatch.SenderID() != "user-1" || opts.Dispatch.MessageID() != "msg-1" { + t.Fatalf("dispatch sender/message = (%q,%q)", opts.Dispatch.SenderID(), opts.Dispatch.MessageID()) + } + if opts.Dispatch.ReplyToMessageID() != "reply-1" { + t.Fatalf("Dispatch.ReplyToMessageID() = %q, want reply-1", opts.Dispatch.ReplyToMessageID()) + } + if opts.Dispatch.UserMessage != "hello" { + t.Fatalf("Dispatch.UserMessage = %q, want hello", opts.Dispatch.UserMessage) + } + if len(opts.Dispatch.Media) != 1 || opts.Dispatch.Media[0] != "media://one" { + t.Fatalf("Dispatch.Media = %v, want [media://one]", opts.Dispatch.Media) + } +} + +func TestNormalizeProcessOptions_UsesDispatchAsSourceOfTruth(t *testing.T) { + inbound := &bus.InboundContext{ + Channel: "slack", + ChatID: "C123", + ChatType: "channel", + SenderID: "U123", + MessageID: "m-1", + ReplyToMessageID: "parent-1", + } + route := &routing.ResolvedRoute{ + AgentID: "support", + Channel: "slack", + AccountID: "workspace-a", + MatchedBy: "dispatch.rule:test", + SessionPolicy: routing.SessionPolicy{ + Dimensions: []string{"chat", "sender"}, + }, + } + scope := &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "support", + Channel: "slack", + Account: "workspace-a", + Dimensions: []string{"chat"}, + Values: map[string]string{ + "chat": "channel:c123", + }, + } + + opts := normalizeProcessOptions(processOptions{ + Dispatch: DispatchRequest{ + SessionKey: "sk_v1_example", + SessionAliases: []string{"agent:support:slack:channel:c123"}, + InboundContext: inbound, + RouteResult: route, + SessionScope: scope, + UserMessage: "hello", + Media: []string{"media://one"}, + }, + }) + + if opts.SessionKey != "sk_v1_example" { + t.Fatalf("SessionKey = %q, want sk_v1_example", opts.SessionKey) + } + if opts.Channel != "slack" || opts.ChatID != "C123" { + t.Fatalf("legacy mirrors = (%q,%q), want (slack,C123)", opts.Channel, opts.ChatID) + } + if opts.SenderID != "U123" || opts.MessageID != "m-1" { + t.Fatalf("legacy sender/message = (%q,%q)", opts.SenderID, opts.MessageID) + } + if opts.ReplyToMessageID != "parent-1" { + t.Fatalf("ReplyToMessageID = %q, want parent-1", opts.ReplyToMessageID) + } + if opts.RouteResult == nil || opts.RouteResult.AgentID != "support" { + t.Fatalf("RouteResult = %#v, want support route", opts.RouteResult) + } + if opts.SessionScope == nil || opts.SessionScope.AgentID != "support" { + t.Fatalf("SessionScope = %#v, want support scope", opts.SessionScope) + } +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 4b75f6e1b..39cd4ccf9 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -74,6 +74,7 @@ type AgentLoop struct { // processOptions configures how a message is processed type processOptions struct { + Dispatch DispatchRequest // Normalized routed request boundary for this turn SessionKey string // Session identifier for history/context SessionAliases []string // Compatibility aliases for the session key Channel string // Target channel for tool execution @@ -761,15 +762,48 @@ func outboundContextFromInbound( return outboundCtx } +func outboundScopeFromSessionScope(scope *session.SessionScope) *bus.OutboundScope { + if scope == nil { + return nil + } + outboundScope := &bus.OutboundScope{ + Version: scope.Version, + AgentID: scope.AgentID, + Channel: scope.Channel, + Account: scope.Account, + } + if len(scope.Dimensions) > 0 { + outboundScope.Dimensions = append([]string(nil), scope.Dimensions...) + } + if len(scope.Values) > 0 { + outboundScope.Values = make(map[string]string, len(scope.Values)) + for key, value := range scope.Values { + outboundScope.Values[key] = value + } + } + return outboundScope +} + +func outboundTurnMetadata( + agentID, sessionKey string, + scope *session.SessionScope, +) (string, string, *bus.OutboundScope) { + return agentID, sessionKey, outboundScopeFromSessionScope(scope) +} + func outboundMessageForTurn(ts *turnState, content string) bus.OutboundMessage { + agentID, sessionKey, scope := outboundTurnMetadata(ts.agent.ID, ts.sessionKey, ts.opts.Dispatch.SessionScope) return bus.OutboundMessage{ Context: outboundContextFromInbound( - ts.opts.InboundContext, + ts.opts.Dispatch.InboundContext, ts.channel, ts.chatID, - ts.opts.ReplyToMessageID, + ts.opts.Dispatch.ReplyToMessageID(), ), - Content: content, + AgentID: agentID, + SessionKey: sessionKey, + Scope: scope, + Content: content, } } @@ -1442,11 +1476,20 @@ func (al *AgentLoop) ProcessHeartbeat( if agent == nil { return "", fmt.Errorf("no default agent for heartbeat") } + dispatch := DispatchRequest{ + SessionKey: "heartbeat", + UserMessage: content, + } + if channel != "" || chatID != "" { + dispatch.InboundContext = &bus.InboundContext{ + Channel: channel, + ChatID: chatID, + ChatType: "direct", + SenderID: "heartbeat", + } + } return al.runAgentLoop(ctx, agent, processOptions{ - SessionKey: "heartbeat", - Channel: channel, - ChatID: chatID, - UserMessage: content, + Dispatch: dispatch, DefaultResponse: defaultResponse, EnableSummary: false, SendResponse: false, @@ -1521,22 +1564,19 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }) opts := processOptions{ - SessionKey: sessionKey, - SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...), - Channel: msg.Channel, - ChatID: msg.ChatID, - MessageID: msg.MessageID, - ReplyToMessageID: msg.Context.ReplyToMessageID, - SenderID: msg.SenderID, + Dispatch: DispatchRequest{ + SessionKey: sessionKey, + SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...), + InboundContext: cloneInboundContext(&msg.Context), + RouteResult: cloneResolvedRoute(&route), + SessionScope: session.CloneScope(&allocation.Scope), + UserMessage: msg.Content, + Media: append([]string(nil), msg.Media...), + }, SenderDisplayName: msg.Sender.DisplayName, - UserMessage: msg.Content, - Media: msg.Media, DefaultResponse: defaultResponse, EnableSummary: true, SendResponse: false, - InboundContext: cloneInboundContext(&msg.Context), - RouteResult: cloneResolvedRoute(&route), - SessionScope: session.CloneScope(&allocation.Scope), } // context-dependent commands check their own Runtime fields and report @@ -1545,11 +1585,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) return response, nil } - if pending := al.takePendingSkills(opts.SessionKey); len(pending) > 0 { + if pending := al.takePendingSkills(opts.Dispatch.SessionKey); len(pending) > 0 { opts.ForcedSkills = append(opts.ForcedSkills, pending...) logger.InfoCF("agent", "Applying pending skill override", map[string]any{ - "session_key": opts.SessionKey, + "session_key": opts.Dispatch.SessionKey, "skills": strings.Join(pending, ","), }) } @@ -1712,12 +1752,21 @@ func (al *AgentLoop) processSystemMessage( // Use the origin session for context sessionKey := session.BuildMainSessionKey(agent.ID) + dispatch := DispatchRequest{ + SessionKey: sessionKey, + UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content), + } + if originChannel != "" || originChatID != "" { + dispatch.InboundContext = &bus.InboundContext{ + Channel: originChannel, + ChatID: originChatID, + ChatType: "direct", + SenderID: msg.SenderID, + } + } return al.runAgentLoop(ctx, agent, processOptions{ - SessionKey: sessionKey, - Channel: originChannel, - ChatID: originChatID, - UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content), + Dispatch: dispatch, DefaultResponse: "Background task completed.", EnableSummary: false, SendResponse: true, @@ -1731,9 +1780,13 @@ func (al *AgentLoop) runAgentLoop( agent *AgentInstance, opts processOptions, ) (string, error) { + opts = normalizeProcessOptions(opts) + // Record last channel for heartbeat notifications (skip internal channels and cli) - if opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) { - channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID) + if opts.Dispatch.Channel() != "" && + opts.Dispatch.ChatID() != "" && + !constants.IsInternalChannel(opts.Dispatch.Channel()) { + channelKey := fmt.Sprintf("%s:%s", opts.Dispatch.Channel(), opts.Dispatch.ChatID()) if err := al.RecordLastChannel(channelKey); err != nil { logger.WarnCF( "agent", @@ -1743,12 +1796,17 @@ func (al *AgentLoop) runAgentLoop( } } - ensureSessionMetadata(agent.Sessions, opts.SessionKey, opts.SessionScope, opts.SessionAliases) + ensureSessionMetadata( + agent.Sessions, + opts.Dispatch.SessionKey, + opts.Dispatch.SessionScope, + opts.Dispatch.SessionAliases, + ) turnScope := al.newTurnEventScope( agent.ID, - opts.SessionKey, - newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope), + opts.Dispatch.SessionKey, + newTurnContext(opts.Dispatch.InboundContext, opts.Dispatch.RouteResult, opts.Dispatch.SessionScope), ) ts := newTurnState(agent, opts, turnScope) result, err := al.runTurn(ctx, ts) @@ -1770,14 +1828,22 @@ func (al *AgentLoop) runAgentLoop( } if opts.SendResponse && result.finalContent != "" { + agentID, sessionKey, scope := outboundTurnMetadata( + agent.ID, + opts.Dispatch.SessionKey, + opts.Dispatch.SessionScope, + ) al.bus.PublishOutbound(ctx, bus.OutboundMessage{ Context: outboundContextFromInbound( - opts.InboundContext, - opts.Channel, - opts.ChatID, - opts.ReplyToMessageID, + opts.Dispatch.InboundContext, + opts.Dispatch.Channel(), + opts.Dispatch.ChatID(), + opts.Dispatch.ReplyToMessageID(), ), - Content: result.finalContent, + AgentID: agentID, + SessionKey: sessionKey, + Scope: scope, + Content: result.finalContent, }) } @@ -1786,7 +1852,7 @@ func (al *AgentLoop) runAgentLoop( logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview), map[string]any{ "agent_id": agent.ID, - "session_key": opts.SessionKey, + "session_key": opts.Dispatch.SessionKey, "iterations": ts.currentIteration(), "final_length": len(result.finalContent), }) @@ -1907,7 +1973,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er ts.media, ts.channel, ts.chatID, - ts.opts.SenderID, + ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName, activeSkillNames(ts.agent, ts.opts)..., ) @@ -1944,7 +2010,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er messages = ts.agent.ContextBuilder.BuildMessages( history, summary, ts.userMessage, ts.media, ts.channel, ts.chatID, - ts.opts.SenderID, ts.opts.SenderDisplayName, + ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName, activeSkillNames(ts.agent, ts.opts)..., ) messages = resolveMediaRefs(messages, al.mediaStore, maxMediaSize) @@ -2333,7 +2399,7 @@ turnLoop: } messages = ts.agent.ContextBuilder.BuildMessages( history, summary, "", - nil, ts.channel, ts.chatID, ts.opts.SenderID, ts.opts.SenderDisplayName, + nil, ts.channel, ts.chatID, ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName, activeSkillNames(ts.agent, ts.opts)..., ) callMessages = messages @@ -2679,8 +2745,8 @@ turnLoop: turnCtx, ts.channel, ts.chatID, - ts.opts.MessageID, - ts.opts.ReplyToMessageID, + ts.opts.Dispatch.MessageID(), + ts.opts.Dispatch.ReplyToMessageID(), ) toolResult := ts.agent.Tools.ExecuteWithContext( execCtx, @@ -2745,12 +2811,15 @@ turnLoop: } outboundMedia := bus.OutboundMediaMessage{ Context: outboundContextFromInbound( - ts.opts.InboundContext, + ts.opts.Dispatch.InboundContext, ts.channel, ts.chatID, - ts.opts.ReplyToMessageID, + ts.opts.Dispatch.ReplyToMessageID(), ), - Parts: parts, + AgentID: ts.agent.ID, + SessionKey: ts.sessionKey, + Scope: outboundScopeFromSessionScope(ts.opts.Dispatch.SessionScope), + Parts: parts, } if al.channelManager != nil && ts.channel != "" && !constants.IsInternalChannel(ts.channel) { if err := al.channelManager.SendMedia(ctx, outboundMedia); err != nil { @@ -3226,6 +3295,8 @@ func (al *AgentLoop) handleCommand( agent *AgentInstance, opts *processOptions, ) (string, bool) { + normalizeProcessOptionsInPlace(opts) + if !commands.HasCommandPrefix(msg.Content) { return "", false } @@ -3307,6 +3378,8 @@ func (al *AgentLoop) applyExplicitSkillCommand( agent *AgentInstance, opts *processOptions, ) (matched bool, handled bool, reply string) { + normalizeProcessOptionsInPlace(opts) + cmdName, ok := commands.CommandName(raw) if !ok || cmdName != "use" { return false, false, "" @@ -3324,7 +3397,7 @@ func (al *AgentLoop) applyExplicitSkillCommand( arg := strings.TrimSpace(parts[1]) if strings.EqualFold(arg, "clear") || strings.EqualFold(arg, "off") { if opts != nil { - al.clearPendingSkills(opts.SessionKey) + al.clearPendingSkills(opts.Dispatch.SessionKey) } return true, true, "Cleared pending skill override." } @@ -3335,10 +3408,10 @@ func (al *AgentLoop) applyExplicitSkillCommand( } if len(parts) < 3 { - if opts == nil || strings.TrimSpace(opts.SessionKey) == "" { + if opts == nil || strings.TrimSpace(opts.Dispatch.SessionKey) == "" { return true, true, commandsUnavailableSkillMessage() } - al.setPendingSkills(opts.SessionKey, []string{skillName}) + al.setPendingSkills(opts.Dispatch.SessionKey, []string{skillName}) return true, true, fmt.Sprintf( "Skill %q is armed for your next message. Send your next prompt normally, or use /use clear to cancel.", skillName, @@ -3352,6 +3425,7 @@ func (al *AgentLoop) applyExplicitSkillCommand( if opts != nil { opts.ForcedSkills = append(opts.ForcedSkills, skillName) + opts.Dispatch.UserMessage = message opts.UserMessage = message } @@ -3359,6 +3433,8 @@ func (al *AgentLoop) applyExplicitSkillCommand( } func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime { + normalizeProcessOptionsInPlace(opts) + registry := al.GetRegistry() cfg := al.GetConfig() rt := &commands.Runtime{ @@ -3444,9 +3520,9 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt return fmt.Errorf("sessions not initialized for agent") } - agent.Sessions.SetHistory(opts.SessionKey, make([]providers.Message, 0)) - agent.Sessions.SetSummary(opts.SessionKey, "") - agent.Sessions.Save(opts.SessionKey) + agent.Sessions.SetHistory(opts.Dispatch.SessionKey, make([]providers.Message, 0)) + agent.Sessions.SetSummary(opts.Dispatch.SessionKey, "") + agent.Sessions.Save(opts.Dispatch.SessionKey) return nil } } diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 127ff64b3..64ea7a943 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -770,19 +770,28 @@ func TestRunAgentLoop_ResponseHandledToolPublishesForUserWhenSendResponseDisable } response, err := al.runAgentLoop(context.Background(), defaultAgent, processOptions{ - SessionKey: "session-1", - Channel: "telegram", - ChatID: "chat1", - UserMessage: "take a screenshot of the screen and send it to me", + Dispatch: DispatchRequest{ + SessionKey: "session-1", + UserMessage: "take a screenshot of the screen and send it to me", + SessionScope: &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: defaultAgent.ID, + Channel: "telegram", + Dimensions: []string{"chat"}, + Values: map[string]string{ + "chat": "direct:chat1", + }, + }, + InboundContext: &bus.InboundContext{ + Channel: "telegram", + ChatID: "chat1", + ChatType: "direct", + SenderID: "user1", + }, + }, DefaultResponse: defaultResponse, EnableSummary: false, SendResponse: false, - InboundContext: &bus.InboundContext{ - Channel: "telegram", - ChatID: "chat1", - ChatType: "direct", - SenderID: "user1", - }, }) if err != nil { t.Fatalf("runAgentLoop() error = %v", err) @@ -801,6 +810,16 @@ func TestRunAgentLoop_ResponseHandledToolPublishesForUserWhenSendResponseDisable if telegramChannel.sentMessages[0].Content != "Handled user output from tool." { t.Fatalf("unexpected sent text message: %+v", telegramChannel.sentMessages[0]) } + if telegramChannel.sentMessages[0].AgentID != defaultAgent.ID { + t.Fatalf("sent text agent_id = %q, want %q", telegramChannel.sentMessages[0].AgentID, defaultAgent.ID) + } + if telegramChannel.sentMessages[0].SessionKey != "session-1" { + t.Fatalf("sent text session_key = %q, want session-1", telegramChannel.sentMessages[0].SessionKey) + } + if telegramChannel.sentMessages[0].Scope == nil || + telegramChannel.sentMessages[0].Scope.Values["chat"] != "direct:chat1" { + t.Fatalf("unexpected sent text scope: %+v", telegramChannel.sentMessages[0].Scope) + } } func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) { @@ -3025,6 +3044,15 @@ func TestProcessMessage_PublishesToolFeedbackWhenEnabled(t *testing.T) { if !strings.Contains(outbound.Content, "`read_file`") { t.Fatalf("tool feedback content = %q, want read_file preview", outbound.Content) } + if outbound.AgentID != "main" { + t.Fatalf("tool feedback agent_id = %q, want main", outbound.AgentID) + } + if outbound.SessionKey == "" { + t.Fatal("expected tool feedback to carry session_key") + } + if outbound.Scope == nil || outbound.Scope.AgentID != "main" || outbound.Scope.Channel != "telegram" { + t.Fatalf("expected tool feedback scope, got %+v", outbound.Scope) + } case <-time.After(2 * time.Second): t.Fatal("expected outbound tool feedback for regular messages") } diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index f72e761f4..6c9ef19c5 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -6,6 +6,7 @@ import ( "strings" "sync" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/session" @@ -292,10 +293,18 @@ func (al *AgentLoop) continueWithSteeringMessages( sessionKey, channel, chatID string, steeringMsgs []providers.Message, ) (string, error) { + dispatch := DispatchRequest{ + SessionKey: sessionKey, + } + if channel != "" || chatID != "" { + dispatch.InboundContext = &bus.InboundContext{ + Channel: channel, + ChatID: chatID, + ChatType: "direct", + } + } return al.runAgentLoop(ctx, agent, processOptions{ - SessionKey: sessionKey, - Channel: channel, - ChatID: chatID, + Dispatch: dispatch, DefaultResponse: defaultResponse, EnableSummary: true, SendResponse: false, diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index c5eeb3a49..cd193017b 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -351,29 +351,30 @@ func spawnSubTurn( } // Create processOptions for the child turn + dispatch := DispatchRequest{ + SessionKey: childID, + UserMessage: cfg.SystemPrompt, + Media: nil, + InboundContext: cloneInboundContext(parentTS.opts.Dispatch.InboundContext), + } opts := processOptions{ - SessionKey: childID, - Channel: parentTS.channel, - ChatID: parentTS.chatID, - SenderID: parentTS.opts.SenderID, + Dispatch: dispatch, + SenderID: parentTS.opts.Dispatch.SenderID(), SenderDisplayName: parentTS.opts.SenderDisplayName, - UserMessage: cfg.SystemPrompt, // Task description becomes the first user message SystemPromptOverride: cfg.ActualSystemPrompt, - Media: nil, InitialSteeringMessages: cfg.InitialMessages, DefaultResponse: "", EnableSummary: false, SendResponse: false, NoHistory: true, // SubTurns don't use session history SkipInitialSteeringPoll: true, - InboundContext: cloneInboundContext(parentTS.opts.InboundContext), } // Create event scope for the child turn scope := al.newTurnEventScope( agent.ID, childID, - newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope), + newTurnContext(opts.Dispatch.InboundContext, opts.Dispatch.RouteResult, opts.Dispatch.SessionScope), ) // Create child turnState using the new API diff --git a/pkg/agent/turn.go b/pkg/agent/turn.go index b30fa186d..a061742e3 100644 --- a/pkg/agent/turn.go +++ b/pkg/agent/turn.go @@ -116,12 +116,12 @@ func newTurnState(agent *AgentInstance, opts processOptions, scope turnEventScop scope: scope, turnID: scope.turnID, agentID: agent.ID, - sessionKey: opts.SessionKey, + sessionKey: opts.Dispatch.SessionKey, turnCtx: cloneTurnContext(scope.context), - channel: opts.Channel, - chatID: opts.ChatID, - userMessage: opts.UserMessage, - media: append([]string(nil), opts.Media...), + channel: opts.Dispatch.Channel(), + chatID: opts.Dispatch.ChatID(), + userMessage: opts.Dispatch.UserMessage, + media: append([]string(nil), opts.Dispatch.Media...), phase: TurnPhaseSetup, startedAt: time.Now(), } @@ -129,7 +129,7 @@ func newTurnState(agent *AgentInstance, opts processOptions, scope turnEventScop // Bind session store and capture initial history length for rollback logic if agent != nil && agent.Sessions != nil { ts.session = agent.Sessions - ts.initialHistoryLength = len(agent.Sessions.GetHistory(opts.SessionKey)) + ts.initialHistoryLength = len(agent.Sessions.GetHistory(opts.Dispatch.SessionKey)) } return ts diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index b67d847d1..b261a2df3 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -180,6 +180,19 @@ func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) { ChatID: "chat-42", ReplyToMessageID: "msg-9", }, + AgentID: "main", + SessionKey: "sk_v1_123", + Scope: &OutboundScope{ + Version: 1, + AgentID: "main", + Channel: "telegram", + Account: "bot-a", + Dimensions: []string{"chat", "sender"}, + Values: map[string]string{ + "chat": "direct:chat-42", + "sender": "user-1", + }, + }, Content: "reply", } @@ -197,6 +210,12 @@ func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) { if got.ReplyToMessageID != "msg-9" { t.Fatalf("expected mirrored reply_to_message_id msg-9, got %q", got.ReplyToMessageID) } + if got.AgentID != "main" || got.SessionKey != "sk_v1_123" { + t.Fatalf("unexpected outbound turn metadata: agent=%q session=%q", got.AgentID, got.SessionKey) + } + if got.Scope == nil || got.Scope.AgentID != "main" || got.Scope.Values["chat"] != "direct:chat-42" { + t.Fatalf("unexpected outbound scope: %+v", got.Scope) + } if got.Context.Channel != "telegram" || got.Context.ChatID != "chat-42" { t.Fatalf("unexpected outbound context: %+v", got.Context) } @@ -211,6 +230,17 @@ func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) { Channel: "slack", ChatID: "C001", }, + AgentID: "support", + SessionKey: "sk_v1_media", + Scope: &OutboundScope{ + Version: 1, + AgentID: "support", + Channel: "slack", + Dimensions: []string{"chat"}, + Values: map[string]string{ + "chat": "channel:c001", + }, + }, Parts: []MediaPart{{Type: "image", Ref: "media://1"}}, } @@ -225,6 +255,12 @@ func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) { if got.ChatID != "C001" { t.Fatalf("expected legacy chat ID C001, got %q", got.ChatID) } + if got.AgentID != "support" || got.SessionKey != "sk_v1_media" { + t.Fatalf("unexpected outbound media turn metadata: agent=%q session=%q", got.AgentID, got.SessionKey) + } + if got.Scope == nil || got.Scope.Values["chat"] != "channel:c001" { + t.Fatalf("unexpected outbound media scope: %+v", got.Scope) + } if got.Context.Channel != "slack" || got.Context.ChatID != "C001" { t.Fatalf("unexpected outbound media context: %+v", got.Context) } diff --git a/pkg/bus/outbound_context.go b/pkg/bus/outbound_context.go index b3f58f736..416a26861 100644 --- a/pkg/bus/outbound_context.go +++ b/pkg/bus/outbound_context.go @@ -18,6 +18,7 @@ func NormalizeOutboundMessage(msg OutboundMessage) OutboundMessage { msg.Context = normalizeInboundContext(msg.Context) msg.Channel = msg.Context.Channel msg.ChatID = msg.Context.ChatID + msg.Scope = cloneOutboundScope(msg.Scope) if msg.Context.ReplyToMessageID == "" { msg.Context.ReplyToMessageID = strings.TrimSpace(msg.ReplyToMessageID) } @@ -31,5 +32,23 @@ func NormalizeOutboundMediaMessage(msg OutboundMediaMessage) OutboundMediaMessag msg.Context = normalizeInboundContext(msg.Context) msg.Channel = msg.Context.Channel msg.ChatID = msg.Context.ChatID + msg.Scope = cloneOutboundScope(msg.Scope) return msg } + +func cloneOutboundScope(scope *OutboundScope) *OutboundScope { + if scope == nil { + return nil + } + cloned := *scope + if len(scope.Dimensions) > 0 { + cloned.Dimensions = append([]string(nil), scope.Dimensions...) + } + if len(scope.Values) > 0 { + cloned.Values = make(map[string]string, len(scope.Values)) + for key, value := range scope.Values { + cloned.Values[key] = value + } + } + return &cloned +} diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 0b2c1c92a..aa06ca173 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -50,10 +50,24 @@ type InboundMessage struct { MessageID string `json:"message_id,omitempty"` // platform message ID } +// OutboundScope captures the structured session scope associated with an +// outbound turn result without depending on the session package. +type OutboundScope struct { + Version int `json:"version,omitempty"` + AgentID string `json:"agent_id,omitempty"` + Channel string `json:"channel,omitempty"` + Account string `json:"account,omitempty"` + Dimensions []string `json:"dimensions,omitempty"` + Values map[string]string `json:"values,omitempty"` +} + type OutboundMessage struct { Channel string `json:"channel"` ChatID string `json:"chat_id"` Context InboundContext `json:"context"` + AgentID string `json:"agent_id,omitempty"` + SessionKey string `json:"session_key,omitempty"` + Scope *OutboundScope `json:"scope,omitempty"` Content string `json:"content"` ReplyToMessageID string `json:"reply_to_message_id,omitempty"` } @@ -69,10 +83,13 @@ type MediaPart struct { // OutboundMediaMessage carries media attachments from Agent to channels via the bus. type OutboundMediaMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Context InboundContext `json:"context"` - Parts []MediaPart `json:"parts"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Context InboundContext `json:"context"` + AgentID string `json:"agent_id,omitempty"` + SessionKey string `json:"session_key,omitempty"` + Scope *OutboundScope `json:"scope,omitempty"` + Parts []MediaPart `json:"parts"` } // AudioChunk represents a chunk of streaming voice data.