refactor(agent): normalize dispatch and outbound turn metadata

This commit is contained in:
Hoshina
2026-04-07 22:12:23 +08:00
parent e32a209683
commit 9f23ec22d6
10 changed files with 511 additions and 81 deletions
+126 -50
View File
@@ -74,6 +74,7 @@ type AgentLoop struct {
// processOptions configures how a message is processed
type processOptions struct {
Dispatch DispatchRequest // Normalized routed request boundary for this turn
SessionKey string // Session identifier for history/context
SessionAliases []string // Compatibility aliases for the session key
Channel string // Target channel for tool execution
@@ -761,15 +762,48 @@ func outboundContextFromInbound(
return outboundCtx
}
func outboundScopeFromSessionScope(scope *session.SessionScope) *bus.OutboundScope {
if scope == nil {
return nil
}
outboundScope := &bus.OutboundScope{
Version: scope.Version,
AgentID: scope.AgentID,
Channel: scope.Channel,
Account: scope.Account,
}
if len(scope.Dimensions) > 0 {
outboundScope.Dimensions = append([]string(nil), scope.Dimensions...)
}
if len(scope.Values) > 0 {
outboundScope.Values = make(map[string]string, len(scope.Values))
for key, value := range scope.Values {
outboundScope.Values[key] = value
}
}
return outboundScope
}
func outboundTurnMetadata(
agentID, sessionKey string,
scope *session.SessionScope,
) (string, string, *bus.OutboundScope) {
return agentID, sessionKey, outboundScopeFromSessionScope(scope)
}
func outboundMessageForTurn(ts *turnState, content string) bus.OutboundMessage {
agentID, sessionKey, scope := outboundTurnMetadata(ts.agent.ID, ts.sessionKey, ts.opts.Dispatch.SessionScope)
return bus.OutboundMessage{
Context: outboundContextFromInbound(
ts.opts.InboundContext,
ts.opts.Dispatch.InboundContext,
ts.channel,
ts.chatID,
ts.opts.ReplyToMessageID,
ts.opts.Dispatch.ReplyToMessageID(),
),
Content: content,
AgentID: agentID,
SessionKey: sessionKey,
Scope: scope,
Content: content,
}
}
@@ -1442,11 +1476,20 @@ func (al *AgentLoop) ProcessHeartbeat(
if agent == nil {
return "", fmt.Errorf("no default agent for heartbeat")
}
dispatch := DispatchRequest{
SessionKey: "heartbeat",
UserMessage: content,
}
if channel != "" || chatID != "" {
dispatch.InboundContext = &bus.InboundContext{
Channel: channel,
ChatID: chatID,
ChatType: "direct",
SenderID: "heartbeat",
}
}
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: "heartbeat",
Channel: channel,
ChatID: chatID,
UserMessage: content,
Dispatch: dispatch,
DefaultResponse: defaultResponse,
EnableSummary: false,
SendResponse: false,
@@ -1521,22 +1564,19 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
})
opts := processOptions{
SessionKey: sessionKey,
SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...),
Channel: msg.Channel,
ChatID: msg.ChatID,
MessageID: msg.MessageID,
ReplyToMessageID: msg.Context.ReplyToMessageID,
SenderID: msg.SenderID,
Dispatch: DispatchRequest{
SessionKey: sessionKey,
SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...),
InboundContext: cloneInboundContext(&msg.Context),
RouteResult: cloneResolvedRoute(&route),
SessionScope: session.CloneScope(&allocation.Scope),
UserMessage: msg.Content,
Media: append([]string(nil), msg.Media...),
},
SenderDisplayName: msg.Sender.DisplayName,
UserMessage: msg.Content,
Media: msg.Media,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
InboundContext: cloneInboundContext(&msg.Context),
RouteResult: cloneResolvedRoute(&route),
SessionScope: session.CloneScope(&allocation.Scope),
}
// context-dependent commands check their own Runtime fields and report
@@ -1545,11 +1585,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
return response, nil
}
if pending := al.takePendingSkills(opts.SessionKey); len(pending) > 0 {
if pending := al.takePendingSkills(opts.Dispatch.SessionKey); len(pending) > 0 {
opts.ForcedSkills = append(opts.ForcedSkills, pending...)
logger.InfoCF("agent", "Applying pending skill override",
map[string]any{
"session_key": opts.SessionKey,
"session_key": opts.Dispatch.SessionKey,
"skills": strings.Join(pending, ","),
})
}
@@ -1712,12 +1752,21 @@ func (al *AgentLoop) processSystemMessage(
// Use the origin session for context
sessionKey := session.BuildMainSessionKey(agent.ID)
dispatch := DispatchRequest{
SessionKey: sessionKey,
UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content),
}
if originChannel != "" || originChatID != "" {
dispatch.InboundContext = &bus.InboundContext{
Channel: originChannel,
ChatID: originChatID,
ChatType: "direct",
SenderID: msg.SenderID,
}
}
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: originChannel,
ChatID: originChatID,
UserMessage: fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content),
Dispatch: dispatch,
DefaultResponse: "Background task completed.",
EnableSummary: false,
SendResponse: true,
@@ -1731,9 +1780,13 @@ func (al *AgentLoop) runAgentLoop(
agent *AgentInstance,
opts processOptions,
) (string, error) {
opts = normalizeProcessOptions(opts)
// Record last channel for heartbeat notifications (skip internal channels and cli)
if opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) {
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
if opts.Dispatch.Channel() != "" &&
opts.Dispatch.ChatID() != "" &&
!constants.IsInternalChannel(opts.Dispatch.Channel()) {
channelKey := fmt.Sprintf("%s:%s", opts.Dispatch.Channel(), opts.Dispatch.ChatID())
if err := al.RecordLastChannel(channelKey); err != nil {
logger.WarnCF(
"agent",
@@ -1743,12 +1796,17 @@ func (al *AgentLoop) runAgentLoop(
}
}
ensureSessionMetadata(agent.Sessions, opts.SessionKey, opts.SessionScope, opts.SessionAliases)
ensureSessionMetadata(
agent.Sessions,
opts.Dispatch.SessionKey,
opts.Dispatch.SessionScope,
opts.Dispatch.SessionAliases,
)
turnScope := al.newTurnEventScope(
agent.ID,
opts.SessionKey,
newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope),
opts.Dispatch.SessionKey,
newTurnContext(opts.Dispatch.InboundContext, opts.Dispatch.RouteResult, opts.Dispatch.SessionScope),
)
ts := newTurnState(agent, opts, turnScope)
result, err := al.runTurn(ctx, ts)
@@ -1770,14 +1828,22 @@ func (al *AgentLoop) runAgentLoop(
}
if opts.SendResponse && result.finalContent != "" {
agentID, sessionKey, scope := outboundTurnMetadata(
agent.ID,
opts.Dispatch.SessionKey,
opts.Dispatch.SessionScope,
)
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Context: outboundContextFromInbound(
opts.InboundContext,
opts.Channel,
opts.ChatID,
opts.ReplyToMessageID,
opts.Dispatch.InboundContext,
opts.Dispatch.Channel(),
opts.Dispatch.ChatID(),
opts.Dispatch.ReplyToMessageID(),
),
Content: result.finalContent,
AgentID: agentID,
SessionKey: sessionKey,
Scope: scope,
Content: result.finalContent,
})
}
@@ -1786,7 +1852,7 @@ func (al *AgentLoop) runAgentLoop(
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
map[string]any{
"agent_id": agent.ID,
"session_key": opts.SessionKey,
"session_key": opts.Dispatch.SessionKey,
"iterations": ts.currentIteration(),
"final_length": len(result.finalContent),
})
@@ -1907,7 +1973,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er
ts.media,
ts.channel,
ts.chatID,
ts.opts.SenderID,
ts.opts.Dispatch.SenderID(),
ts.opts.SenderDisplayName,
activeSkillNames(ts.agent, ts.opts)...,
)
@@ -1944,7 +2010,7 @@ func (al *AgentLoop) runTurn(ctx context.Context, ts *turnState) (turnResult, er
messages = ts.agent.ContextBuilder.BuildMessages(
history, summary, ts.userMessage,
ts.media, ts.channel, ts.chatID,
ts.opts.SenderID, ts.opts.SenderDisplayName,
ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName,
activeSkillNames(ts.agent, ts.opts)...,
)
messages = resolveMediaRefs(messages, al.mediaStore, maxMediaSize)
@@ -2333,7 +2399,7 @@ turnLoop:
}
messages = ts.agent.ContextBuilder.BuildMessages(
history, summary, "",
nil, ts.channel, ts.chatID, ts.opts.SenderID, ts.opts.SenderDisplayName,
nil, ts.channel, ts.chatID, ts.opts.Dispatch.SenderID(), ts.opts.SenderDisplayName,
activeSkillNames(ts.agent, ts.opts)...,
)
callMessages = messages
@@ -2679,8 +2745,8 @@ turnLoop:
turnCtx,
ts.channel,
ts.chatID,
ts.opts.MessageID,
ts.opts.ReplyToMessageID,
ts.opts.Dispatch.MessageID(),
ts.opts.Dispatch.ReplyToMessageID(),
)
toolResult := ts.agent.Tools.ExecuteWithContext(
execCtx,
@@ -2745,12 +2811,15 @@ turnLoop:
}
outboundMedia := bus.OutboundMediaMessage{
Context: outboundContextFromInbound(
ts.opts.InboundContext,
ts.opts.Dispatch.InboundContext,
ts.channel,
ts.chatID,
ts.opts.ReplyToMessageID,
ts.opts.Dispatch.ReplyToMessageID(),
),
Parts: parts,
AgentID: ts.agent.ID,
SessionKey: ts.sessionKey,
Scope: outboundScopeFromSessionScope(ts.opts.Dispatch.SessionScope),
Parts: parts,
}
if al.channelManager != nil && ts.channel != "" && !constants.IsInternalChannel(ts.channel) {
if err := al.channelManager.SendMedia(ctx, outboundMedia); err != nil {
@@ -3226,6 +3295,8 @@ func (al *AgentLoop) handleCommand(
agent *AgentInstance,
opts *processOptions,
) (string, bool) {
normalizeProcessOptionsInPlace(opts)
if !commands.HasCommandPrefix(msg.Content) {
return "", false
}
@@ -3307,6 +3378,8 @@ func (al *AgentLoop) applyExplicitSkillCommand(
agent *AgentInstance,
opts *processOptions,
) (matched bool, handled bool, reply string) {
normalizeProcessOptionsInPlace(opts)
cmdName, ok := commands.CommandName(raw)
if !ok || cmdName != "use" {
return false, false, ""
@@ -3324,7 +3397,7 @@ func (al *AgentLoop) applyExplicitSkillCommand(
arg := strings.TrimSpace(parts[1])
if strings.EqualFold(arg, "clear") || strings.EqualFold(arg, "off") {
if opts != nil {
al.clearPendingSkills(opts.SessionKey)
al.clearPendingSkills(opts.Dispatch.SessionKey)
}
return true, true, "Cleared pending skill override."
}
@@ -3335,10 +3408,10 @@ func (al *AgentLoop) applyExplicitSkillCommand(
}
if len(parts) < 3 {
if opts == nil || strings.TrimSpace(opts.SessionKey) == "" {
if opts == nil || strings.TrimSpace(opts.Dispatch.SessionKey) == "" {
return true, true, commandsUnavailableSkillMessage()
}
al.setPendingSkills(opts.SessionKey, []string{skillName})
al.setPendingSkills(opts.Dispatch.SessionKey, []string{skillName})
return true, true, fmt.Sprintf(
"Skill %q is armed for your next message. Send your next prompt normally, or use /use clear to cancel.",
skillName,
@@ -3352,6 +3425,7 @@ func (al *AgentLoop) applyExplicitSkillCommand(
if opts != nil {
opts.ForcedSkills = append(opts.ForcedSkills, skillName)
opts.Dispatch.UserMessage = message
opts.UserMessage = message
}
@@ -3359,6 +3433,8 @@ func (al *AgentLoop) applyExplicitSkillCommand(
}
func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime {
normalizeProcessOptionsInPlace(opts)
registry := al.GetRegistry()
cfg := al.GetConfig()
rt := &commands.Runtime{
@@ -3444,9 +3520,9 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt
return fmt.Errorf("sessions not initialized for agent")
}
agent.Sessions.SetHistory(opts.SessionKey, make([]providers.Message, 0))
agent.Sessions.SetSummary(opts.SessionKey, "")
agent.Sessions.Save(opts.SessionKey)
agent.Sessions.SetHistory(opts.Dispatch.SessionKey, make([]providers.Message, 0))
agent.Sessions.SetSummary(opts.Dispatch.SessionKey, "")
agent.Sessions.Save(opts.Dispatch.SessionKey)
return nil
}
}