diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 8706a2c4e..6a75ab8d9 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -10,6 +10,8 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -142,6 +144,25 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { ChatType: "direct", SenderID: "tester", }, + RouteResult: &routing.ResolvedRoute{ + AgentID: "main", + Channel: "cli", + AccountID: routing.DefaultAccountID, + SessionPolicy: routing.SessionPolicy{ + DMScope: routing.DMScopePerPeer, + }, + MatchedBy: "default", + }, + SessionScope: &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "main", + Channel: "cli", + Account: routing.DefaultAccountID, + Dimensions: []string{"sender"}, + Values: map[string]string{ + "sender": "tester", + }, + }, }) if err != nil { t.Fatalf("runAgentLoop failed: %v", err) @@ -182,11 +203,17 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { if evt.Meta.SessionKey != "session-1" { t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey) } - if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil { + if evt.Context == nil || evt.Context.Inbound == nil { t.Fatalf("event %d missing inbound turn context", i) } - if evt.Meta.Context.Inbound.Channel != "cli" || evt.Meta.Context.Inbound.SenderID != "tester" { - t.Fatalf("event %d inbound context = %+v", i, evt.Meta.Context.Inbound) + if evt.Context.Inbound.Channel != "cli" || evt.Context.Inbound.SenderID != "tester" { + t.Fatalf("event %d inbound context = %+v", i, evt.Context.Inbound) + } + if evt.Context.Route == nil || evt.Context.Route.AgentID != "main" { + t.Fatalf("event %d missing route context: %+v", i, evt.Context.Route) + } + if evt.Context.Scope == nil || evt.Context.Scope.Values["sender"] != "tester" { + t.Fatalf("event %d missing session scope: %+v", i, evt.Context.Scope) } } diff --git a/pkg/agent/events.go b/pkg/agent/events.go index fa006b9a5..d17f5a90b 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -86,6 +86,7 @@ type Event struct { Kind EventKind Time time.Time Meta EventMeta + Context *TurnContext Payload any } @@ -98,7 +99,7 @@ type EventMeta struct { Iteration int TracePath string Source string - Context *TurnContext `json:"context,omitempty"` + turnContext *TurnContext } // TurnEndStatus describes the terminal state of a turn. diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index 7a5f8c59b..c3c4b21ce 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -89,6 +89,7 @@ type ToolApprover interface { type LLMHookRequest struct { Meta EventMeta `json:"meta"` + Context *TurnContext `json:"context,omitempty"` Model string `json:"model"` Messages []providers.Message `json:"messages,omitempty"` Tools []providers.ToolDefinition `json:"tools,omitempty"` @@ -104,6 +105,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest { } cloned := *r cloned.Meta = cloneEventMeta(r.Meta) + cloned.Context = cloneTurnContext(r.Context) cloned.Messages = cloneProviderMessages(r.Messages) cloned.Tools = cloneToolDefinitions(r.Tools) cloned.Options = cloneStringAnyMap(r.Options) @@ -112,6 +114,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest { type LLMHookResponse struct { Meta EventMeta `json:"meta"` + Context *TurnContext `json:"context,omitempty"` Model string `json:"model"` Response *providers.LLMResponse `json:"response,omitempty"` Channel string `json:"channel,omitempty"` @@ -124,12 +127,14 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse { } cloned := *r cloned.Meta = cloneEventMeta(r.Meta) + cloned.Context = cloneTurnContext(r.Context) cloned.Response = cloneLLMResponse(r.Response) return &cloned } type ToolCallHookRequest struct { Meta EventMeta `json:"meta"` + Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` Channel string `json:"channel,omitempty"` @@ -142,12 +147,14 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { } cloned := *r cloned.Meta = cloneEventMeta(r.Meta) + cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) return &cloned } type ToolApprovalRequest struct { Meta EventMeta `json:"meta"` + Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` Channel string `json:"channel,omitempty"` @@ -160,12 +167,14 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { } cloned := *r cloned.Meta = cloneEventMeta(r.Meta) + cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) return &cloned } type ToolResultHookResponse struct { Meta EventMeta `json:"meta"` + Context *TurnContext `json:"context,omitempty"` Tool string `json:"tool"` Arguments map[string]any `json:"arguments,omitempty"` Result *tools.ToolResult `json:"result,omitempty"` @@ -180,6 +189,7 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse { } cloned := *r cloned.Meta = cloneEventMeta(r.Meta) + cloned.Context = cloneTurnContext(r.Context) cloned.Arguments = cloneStringAnyMap(r.Arguments) cloned.Result = cloneToolResult(r.Result) return &cloned diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index 1851090b8..3287a2a1d 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -10,6 +10,8 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/providers" + "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -124,8 +126,8 @@ func (h *llmObserverHook) BeforeLLM( ctx context.Context, req *LLMHookRequest, ) (*LLMHookRequest, HookDecision, error) { - if req.Meta.Context != nil { - h.lastInbound = cloneInboundContext(req.Meta.Context.Inbound) + if req.Context != nil { + h.lastInbound = cloneInboundContext(req.Context.Inbound) } next := req.Clone() next.Model = "hook-model" @@ -165,6 +167,25 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { ChatType: "direct", SenderID: "hook-user", }, + RouteResult: &routing.ResolvedRoute{ + AgentID: "main", + Channel: "cli", + AccountID: routing.DefaultAccountID, + SessionPolicy: routing.SessionPolicy{ + DMScope: routing.DMScopePerPeer, + }, + MatchedBy: "default", + }, + SessionScope: &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "main", + Channel: "cli", + Account: routing.DefaultAccountID, + Dimensions: []string{"sender"}, + Values: map[string]string{ + "sender": "hook-user", + }, + }, }) if err != nil { t.Fatalf("runAgentLoop failed: %v", err) @@ -185,15 +206,24 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { if hook.lastInbound.Channel != "cli" || hook.lastInbound.SenderID != "hook-user" { t.Fatalf("hook inbound context = %+v", hook.lastInbound) } + if hook.lastInbound != nil && hook.lastInbound.ChatID != "direct" { + t.Fatalf("hook inbound chat ID = %q, want direct", hook.lastInbound.ChatID) + } select { case evt := <-hook.eventCh: if evt.Kind != EventKindTurnEnd { t.Fatalf("expected turn end event, got %v", evt.Kind) } - if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil { + if evt.Context == nil || evt.Context.Inbound == nil { t.Fatal("expected observer event to carry inbound context") } + if evt.Context.Route == nil || evt.Context.Route.AgentID != "main" { + t.Fatalf("expected observer event to carry route context, got %+v", evt.Context.Route) + } + if evt.Context.Scope == nil || evt.Context.Scope.Values["sender"] != "hook-user" { + t.Fatalf("expected observer event to carry session scope, got %+v", evt.Context.Scope) + } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for hook observer event") } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 8b388755a..0b3c2fee4 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -73,25 +73,27 @@ type AgentLoop struct { // processOptions configures how a message is processed type processOptions struct { - SessionKey string // Session identifier for history/context - Channel string // Target channel for tool execution - ChatID string // Target chat ID for tool execution - MessageID string // Current inbound platform message ID - ReplyToMessageID string // Current inbound reply target message ID - SenderID string // Current sender ID for dynamic context - SenderDisplayName string // Current sender display name for dynamic context - UserMessage string // User message content (may include prefix) - ForcedSkills []string // Skills explicitly requested for this message - SystemPromptOverride string // Override the default system prompt (Used by SubTurns) - Media []string // media:// refs from inbound message - InitialSteeringMessages []providers.Message // Steering messages from refactor/agent - DefaultResponse string // Response when LLM returns empty - EnableSummary bool // Whether to trigger summarization - SendResponse bool // Whether to send response via bus - SuppressToolFeedback bool // Whether to suppress inline tool feedback messages - NoHistory bool // If true, don't load session history (for heartbeat) - SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue) - InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks + SessionKey string // Session identifier for history/context + Channel string // Target channel for tool execution + ChatID string // Target chat ID for tool execution + MessageID string // Current inbound platform message ID + ReplyToMessageID string // Current inbound reply target message ID + SenderID string // Current sender ID for dynamic context + SenderDisplayName string // Current sender display name for dynamic context + UserMessage string // User message content (may include prefix) + ForcedSkills []string // Skills explicitly requested for this message + SystemPromptOverride string // Override the default system prompt (Used by SubTurns) + Media []string // media:// refs from inbound message + InitialSteeringMessages []providers.Message // Steering messages from refactor/agent + DefaultResponse string // Response when LLM returns empty + EnableSummary bool // Whether to trigger summarization + SendResponse bool // Whether to send response via bus + SuppressToolFeedback bool // Whether to suppress inline tool feedback messages + NoHistory bool // If true, don't load session history (for heartbeat) + SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue) + InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks + RouteResult *routing.ResolvedRoute // Route decision snapshot for events/hooks + SessionScope *session.SessionScope // Session scope snapshot for events/hooks } type continuationTarget struct { @@ -705,6 +707,45 @@ func (al *AgentLoop) Close() { } } +func outboundContextFromInbound( + inbound *bus.InboundContext, + channel, chatID, replyToMessageID string, +) bus.InboundContext { + if inbound == nil { + return bus.ContextFromLegacyOutbound(bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + ReplyToMessageID: replyToMessageID, + }) + } + + outboundCtx := *cloneInboundContext(inbound) + if outboundCtx.Channel == "" { + outboundCtx.Channel = channel + } + if outboundCtx.ChatID == "" { + outboundCtx.ChatID = chatID + } + if outboundCtx.ReplyToMessageID == "" { + outboundCtx.ReplyToMessageID = replyToMessageID + } + return outboundCtx +} + +func outboundMessageForTurn(ts *turnState, content string) bus.OutboundMessage { + return bus.OutboundMessage{ + Channel: ts.channel, + ChatID: ts.chatID, + Context: outboundContextFromInbound( + ts.opts.InboundContext, + ts.channel, + ts.chatID, + ts.opts.ReplyToMessageID, + ), + Content: content, + } +} + // MountHook registers an in-process hook on the agent loop. func (al *AgentLoop) MountHook(reg HookRegistration) error { if al == nil || al.hooks == nil { @@ -766,20 +807,22 @@ func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *Turn func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta { return EventMeta{ - AgentID: ts.agentID, - TurnID: ts.turnID, - SessionKey: ts.sessionKey, - Iteration: iteration, - Source: source, - TracePath: tracePath, - Context: cloneTurnContext(ts.context), + AgentID: ts.agentID, + TurnID: ts.turnID, + SessionKey: ts.sessionKey, + Iteration: iteration, + Source: source, + TracePath: tracePath, + turnContext: cloneTurnContext(ts.context), } } func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { + clonedMeta := cloneEventMeta(meta) evt := Event{ Kind: kind, - Meta: cloneEventMeta(meta), + Meta: clonedMeta, + Context: cloneTurnContext(clonedMeta.turnContext), Payload: payload, } @@ -1361,6 +1404,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) EnableSummary: true, SendResponse: false, InboundContext: cloneInboundContext(&msg.Context), + RouteResult: cloneResolvedRoute(&route), + SessionScope: session.CloneScope(&allocation.Scope), } // context-dependent commands check their own Runtime fields and report @@ -1540,7 +1585,11 @@ func (al *AgentLoop) runAgentLoop( } } - turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey, newTurnContext(opts.InboundContext)) + turnScope := al.newTurnEventScope( + agent.ID, + opts.SessionKey, + newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope), + ) ts := newTurnState(agent, opts, turnScope) result, err := al.runTurn(ctx, ts) if err != nil { @@ -1564,6 +1613,12 @@ func (al *AgentLoop) runAgentLoop( al.bus.PublishOutbound(ctx, bus.OutboundMessage{ Channel: opts.Channel, ChatID: opts.ChatID, + Context: outboundContextFromInbound( + opts.InboundContext, + opts.Channel, + opts.ChatID, + opts.ReplyToMessageID, + ), Content: result.finalContent, }) } @@ -1897,6 +1952,7 @@ turnLoop: if al.hooks != nil { llmReq, decision := al.hooks.BeforeLLM(turnCtx, &LLMHookRequest{ Meta: ts.eventMeta("runTurn", "turn.llm.request"), + Context: cloneTurnContext(ts.turnCtx), Model: llmModel, Messages: callMessages, Tools: providerToolDefs, @@ -2069,11 +2125,10 @@ turnLoop: ) if retry == 0 && !constants.IsInternalChannel(ts.channel) { - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ - Channel: ts.channel, - ChatID: ts.chatID, - Content: "Context window exceeded. Compressing history and retrying...", - }) + al.bus.PublishOutbound(ctx, outboundMessageForTurn( + ts, + "Context window exceeded. Compressing history and retrying...", + )) } if compression, ok := al.forceCompression(ts.agent, ts.sessionKey); ok { @@ -2128,6 +2183,7 @@ turnLoop: if al.hooks != nil { llmResp, decision := al.hooks.AfterLLM(turnCtx, &LLMHookResponse{ Meta: ts.eventMeta("runTurn", "turn.llm.response"), + Context: cloneTurnContext(ts.turnCtx), Model: llmModel, Response: response, Channel: ts.channel, @@ -2280,6 +2336,7 @@ turnLoop: if al.hooks != nil { toolReq, decision := al.hooks.BeforeTool(turnCtx, &ToolCallHookRequest{ Meta: ts.eventMeta("runTurn", "turn.tool.before"), + Context: cloneTurnContext(ts.turnCtx), Tool: toolName, Arguments: toolArgs, Channel: ts.channel, @@ -2326,6 +2383,7 @@ turnLoop: if al.hooks != nil { approval := al.hooks.ApproveTool(turnCtx, &ToolApprovalRequest{ Meta: ts.eventMeta("runTurn", "turn.tool.approve"), + Context: cloneTurnContext(ts.turnCtx), Tool: toolName, Arguments: toolArgs, Channel: ts.channel, @@ -2383,11 +2441,7 @@ turnLoop: ) feedbackMsg := fmt.Sprintf("\U0001f527 `%s`\n```\n%s\n```", tc.Name, feedbackPreview) fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second) - _ = al.bus.PublishOutbound(fbCtx, bus.OutboundMessage{ - Channel: ts.channel, - ChatID: ts.chatID, - Content: feedbackMsg, - }) + _ = al.bus.PublishOutbound(fbCtx, outboundMessageForTurn(ts, feedbackMsg)) fbCancel() } @@ -2400,11 +2454,7 @@ turnLoop: if !result.Silent && result.ForUser != "" { outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second) defer outCancel() - _ = al.bus.PublishOutbound(outCtx, bus.OutboundMessage{ - Channel: ts.channel, - ChatID: ts.chatID, - Content: result.ForUser, - }) + _ = al.bus.PublishOutbound(outCtx, outboundMessageForTurn(ts, result.ForUser)) } // Determine content for the agent loop (ForLLM or error). @@ -2469,6 +2519,7 @@ turnLoop: if al.hooks != nil { toolResp, decision := al.hooks.AfterTool(turnCtx, &ToolResultHookResponse{ Meta: ts.eventMeta("runTurn", "turn.tool.after"), + Context: cloneTurnContext(ts.turnCtx), Tool: toolName, Arguments: toolArgs, Result: toolResult, @@ -2545,11 +2596,7 @@ turnLoop: } if !toolResult.Silent && toolResult.ForUser != "" && ts.opts.SendResponse { - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ - Channel: ts.channel, - ChatID: ts.chatID, - Content: toolResult.ForUser, - }) + al.bus.PublishOutbound(ctx, outboundMessageForTurn(ts, toolResult.ForUser)) logger.DebugCF("agent", "Sent tool result to user", map[string]any{ "tool": toolName, diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index e243d8ac0..56439885a 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -370,7 +370,11 @@ func spawnSubTurn( } // Create event scope for the child turn - scope := al.newTurnEventScope(agent.ID, childID, newTurnContext(opts.InboundContext)) + scope := al.newTurnEventScope( + agent.ID, + childID, + newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope), + ) // Create child turnState using the new API childTS := newTurnState(&agent, opts, scope) diff --git a/pkg/agent/turn.go b/pkg/agent/turn.go index 3339b3418..41a57d942 100644 --- a/pkg/agent/turn.go +++ b/pkg/agent/turn.go @@ -303,13 +303,13 @@ func (ts *turnState) hardAbortRequested() bool { func (ts *turnState) eventMeta(source, tracePath string) EventMeta { snap := ts.snapshot() return EventMeta{ - AgentID: snap.AgentID, - TurnID: snap.TurnID, - SessionKey: snap.SessionKey, - Iteration: snap.Iteration, - Source: source, - TracePath: tracePath, - Context: cloneTurnContext(ts.turnCtx), + AgentID: snap.AgentID, + TurnID: snap.TurnID, + SessionKey: snap.SessionKey, + Iteration: snap.Iteration, + Source: source, + TracePath: tracePath, + turnContext: cloneTurnContext(ts.turnCtx), } } diff --git a/pkg/agent/turn_context.go b/pkg/agent/turn_context.go index a448e24cd..95ed5a0f3 100644 --- a/pkg/agent/turn_context.go +++ b/pkg/agent/turn_context.go @@ -1,19 +1,31 @@ package agent -import "github.com/sipeed/picoclaw/pkg/bus" +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" +) // TurnContext carries normalized turn-scoped facts that can be shared across // events, hooks, and other runtime observers without re-parsing legacy fields. type TurnContext struct { - Inbound *bus.InboundContext `json:"inbound,omitempty"` + Inbound *bus.InboundContext `json:"inbound,omitempty"` + Route *routing.ResolvedRoute `json:"route,omitempty"` + Scope *session.SessionScope `json:"scope,omitempty"` } -func newTurnContext(inbound *bus.InboundContext) *TurnContext { - if inbound == nil { +func newTurnContext( + inbound *bus.InboundContext, + route *routing.ResolvedRoute, + scope *session.SessionScope, +) *TurnContext { + if inbound == nil && route == nil && scope == nil { return nil } return &TurnContext{ Inbound: cloneInboundContext(inbound), + Route: cloneResolvedRoute(route), + Scope: session.CloneScope(scope), } } @@ -23,6 +35,8 @@ func cloneTurnContext(ctx *TurnContext) *TurnContext { } cloned := *ctx cloned.Inbound = cloneInboundContext(ctx.Inbound) + cloned.Route = cloneResolvedRoute(ctx.Route) + cloned.Scope = session.CloneScope(ctx.Scope) return &cloned } @@ -48,6 +62,31 @@ func cloneStringMap(src map[string]string) map[string]string { } func cloneEventMeta(meta EventMeta) EventMeta { - meta.Context = cloneTurnContext(meta.Context) + meta.turnContext = cloneTurnContext(meta.turnContext) return meta } + +func cloneResolvedRoute(route *routing.ResolvedRoute) *routing.ResolvedRoute { + if route == nil { + return nil + } + cloned := *route + cloned.SessionPolicy = routing.SessionPolicy{ + DMScope: route.SessionPolicy.DMScope, + IdentityLinks: cloneIdentityLinks(route.SessionPolicy.IdentityLinks), + } + return &cloned +} + +func cloneIdentityLinks(src map[string][]string) map[string][]string { + if len(src) == 0 { + return nil + } + cloned := make(map[string][]string, len(src)) + for canonical, ids := range src { + dup := make([]string, len(ids)) + copy(dup, ids) + cloned[canonical] = dup + } + return cloned +} diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index f6a339ff0..3e7ec9cdc 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -89,6 +89,7 @@ func (mb *MessageBus) InboundChan() <-chan InboundMessage { } func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage) error { + msg = NormalizeOutboundMessage(msg) return publish(ctx, mb, mb.outbound, msg) } @@ -97,6 +98,7 @@ func (mb *MessageBus) OutboundChan() <-chan OutboundMessage { } func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error { + msg = NormalizeOutboundMediaMessage(msg) return publish(ctx, mb, mb.outboundMedia, msg) } diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index ab79c0d49..087c0a65e 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -181,6 +181,66 @@ func TestPublishOutboundSubscribe(t *testing.T) { } } +func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + msg := OutboundMessage{ + Context: InboundContext{ + Channel: "telegram", + ChatID: "chat-42", + ReplyToMessageID: "msg-9", + }, + Content: "reply", + } + + if err := mb.PublishOutbound(context.Background(), msg); err != nil { + t.Fatalf("PublishOutbound failed: %v", err) + } + + got := <-mb.OutboundChan() + if got.Channel != "telegram" { + t.Fatalf("expected legacy channel telegram, got %q", got.Channel) + } + if got.ChatID != "chat-42" { + t.Fatalf("expected legacy chat ID chat-42, got %q", got.ChatID) + } + if got.ReplyToMessageID != "msg-9" { + t.Fatalf("expected mirrored reply_to_message_id msg-9, got %q", got.ReplyToMessageID) + } + if got.Context.Channel != "telegram" || got.Context.ChatID != "chat-42" { + t.Fatalf("unexpected outbound context: %+v", got.Context) + } +} + +func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + msg := OutboundMediaMessage{ + Context: InboundContext{ + Channel: "slack", + ChatID: "C001", + }, + Parts: []MediaPart{{Type: "image", Ref: "media://1"}}, + } + + if err := mb.PublishOutboundMedia(context.Background(), msg); err != nil { + t.Fatalf("PublishOutboundMedia failed: %v", err) + } + + got := <-mb.OutboundMediaChan() + if got.Channel != "slack" { + t.Fatalf("expected legacy channel slack, got %q", got.Channel) + } + if got.ChatID != "C001" { + t.Fatalf("expected legacy chat ID C001, got %q", got.ChatID) + } + if got.Context.Channel != "slack" || got.Context.ChatID != "C001" { + t.Fatalf("unexpected outbound media context: %+v", got.Context) + } +} + func TestPublishInbound_ContextCancel(t *testing.T) { mb := NewMessageBus() defer mb.Close() diff --git a/pkg/bus/outbound_context.go b/pkg/bus/outbound_context.go new file mode 100644 index 000000000..e02353ea9 --- /dev/null +++ b/pkg/bus/outbound_context.go @@ -0,0 +1,63 @@ +package bus + +import "strings" + +// ContextFromLegacyOutbound builds a minimal outbound context from the legacy +// top-level outbound fields. This keeps older outbound publishers working +// while new publishers gradually start carrying the original InboundContext. +func ContextFromLegacyOutbound(msg OutboundMessage) InboundContext { + return normalizeInboundContext(InboundContext{ + Channel: strings.TrimSpace(msg.Channel), + ChatID: strings.TrimSpace(msg.ChatID), + ReplyToMessageID: strings.TrimSpace(msg.ReplyToMessageID), + }) +} + +// ContextFromLegacyOutboundMedia builds a minimal outbound context for media. +func ContextFromLegacyOutboundMedia(msg OutboundMediaMessage) InboundContext { + return normalizeInboundContext(InboundContext{ + Channel: strings.TrimSpace(msg.Channel), + ChatID: strings.TrimSpace(msg.ChatID), + }) +} + +// NormalizeOutboundMessage ensures Context is present and mirrors legacy +// top-level addressing fields from it so older senders keep working. +func NormalizeOutboundMessage(msg OutboundMessage) OutboundMessage { + if msg.Context.isZero() { + msg.Context = ContextFromLegacyOutbound(msg) + } else { + msg.Context = normalizeInboundContext(msg.Context) + } + + if msg.Channel == "" { + msg.Channel = msg.Context.Channel + } + if msg.ChatID == "" { + msg.ChatID = msg.Context.ChatID + } + if msg.ReplyToMessageID == "" { + msg.ReplyToMessageID = msg.Context.ReplyToMessageID + } + + return msg +} + +// NormalizeOutboundMediaMessage ensures media outbound messages also carry a +// normalized context while preserving the legacy top-level routing fields. +func NormalizeOutboundMediaMessage(msg OutboundMediaMessage) OutboundMediaMessage { + if msg.Context.isZero() { + msg.Context = ContextFromLegacyOutboundMedia(msg) + } else { + msg.Context = normalizeInboundContext(msg.Context) + } + + if msg.Channel == "" { + msg.Channel = msg.Context.Channel + } + if msg.ChatID == "" { + msg.ChatID = msg.Context.ChatID + } + + return msg +} diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 0c4cd707b..f844ab1e0 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -58,10 +58,11 @@ type InboundMessage struct { } type OutboundMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Content string `json:"content"` - ReplyToMessageID string `json:"reply_to_message_id,omitempty"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Context InboundContext `json:"context"` + Content string `json:"content"` + ReplyToMessageID string `json:"reply_to_message_id,omitempty"` } // MediaPart describes a single media attachment to send. @@ -75,7 +76,8 @@ type MediaPart struct { // OutboundMediaMessage carries media attachments from Agent to channels via the bus. type OutboundMediaMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Parts []MediaPart `json:"parts"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Context InboundContext `json:"context"` + Parts []MediaPart `json:"parts"` } diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 5fbf35ebf..76d1e67c5 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -1130,6 +1130,8 @@ func (m *Manager) UnregisterChannel(name string) { // delivered (or all retries are exhausted), which preserves ordering when // a subsequent operation depends on the message having been sent. func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error { + msg = bus.NormalizeOutboundMessage(msg) + m.mu.RLock() _, exists := m.channels[msg.Channel] w, wExists := m.workers[msg.Channel] @@ -1163,6 +1165,8 @@ func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) erro // retries are exhausted), which preserves ordering when later agent behavior // depends on actual media delivery. func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { + msg = bus.NormalizeOutboundMediaMessage(msg) + m.mu.RLock() _, exists := m.channels[msg.Channel] w, wExists := m.workers[msg.Channel] diff --git a/pkg/routing/session_key.go b/pkg/routing/session_key.go index eab592bec..17b62f4b7 100644 --- a/pkg/routing/session_key.go +++ b/pkg/routing/session_key.go @@ -60,15 +60,7 @@ func BuildAgentPeerSessionKey(params SessionKeyParams) string { if dmScope == "" { dmScope = DMScopeMain } - peerID := strings.TrimSpace(peer.ID) - - // Resolve identity links (cross-platform collapse) - if dmScope != DMScopeMain && peerID != "" { - if linked := resolveLinkedPeerID(params.IdentityLinks, params.Channel, peerID); linked != "" { - peerID = linked - } - } - peerID = strings.ToLower(peerID) + peerID := CanonicalSessionPeerID(params.Channel, peer.ID, dmScope, params.IdentityLinks) switch dmScope { case DMScopePerAccountChannelPeer: @@ -99,6 +91,27 @@ func BuildAgentPeerSessionKey(params SessionKeyParams) string { return fmt.Sprintf("agent:%s:%s:%s:%s", agentID, channel, peerKind, peerID) } +// CanonicalSessionPeerID applies the current DM session canonicalization rules, +// including identity-link collapse when enabled. +func CanonicalSessionPeerID( + channel, peerID string, + dmScope DMScope, + identityLinks map[string][]string, +) string { + normalizedPeerID := strings.TrimSpace(peerID) + if normalizedPeerID == "" { + return "" + } + + if dmScope != DMScopeMain { + if linked := resolveLinkedPeerID(identityLinks, channel, normalizedPeerID); linked != "" { + normalizedPeerID = linked + } + } + + return strings.ToLower(normalizedPeerID) +} + // ParseAgentSessionKey extracts agentId and rest from "agent::". func ParseAgentSessionKey(sessionKey string) *ParsedSessionKey { raw := strings.TrimSpace(sessionKey) diff --git a/pkg/session/allocator.go b/pkg/session/allocator.go index 675e577f8..a3b8e075d 100644 --- a/pkg/session/allocator.go +++ b/pkg/session/allocator.go @@ -1,6 +1,7 @@ package session import ( + "fmt" "strings" "github.com/sipeed/picoclaw/pkg/routing" @@ -10,6 +11,7 @@ import ( // The current implementation intentionally preserves the legacy session-key // layout while moving key construction out of the router. type Allocation struct { + Scope SessionScope SessionKey string MainSessionKey string } @@ -27,6 +29,7 @@ type AllocationInput struct { // AllocateRouteSession maps a route decision onto the current legacy // agent-scoped session-key format. func AllocateRouteSession(input AllocationInput) Allocation { + scope := buildSessionScope(input) sessionKey := strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{ AgentID: input.AgentID, Channel: input.Channel, @@ -37,7 +40,58 @@ func AllocateRouteSession(input AllocationInput) Allocation { })) mainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID)) return Allocation{ + Scope: scope, SessionKey: sessionKey, MainSessionKey: mainSessionKey, } } + +func buildSessionScope(input AllocationInput) SessionScope { + scope := SessionScope{ + Version: ScopeVersionV1, + AgentID: routing.NormalizeAgentID(input.AgentID), + Channel: strings.ToLower(strings.TrimSpace(input.Channel)), + Account: routing.NormalizeAccountID(input.AccountID), + } + + peer := input.Peer + if peer == nil { + peer = &routing.RoutePeer{Kind: "direct"} + } + + peerKind := strings.ToLower(strings.TrimSpace(peer.Kind)) + if peerKind == "" { + peerKind = "direct" + } + + switch peerKind { + case "direct": + if input.SessionPolicy.DMScope == routing.DMScopeMain { + return scope + } + peerID := routing.CanonicalSessionPeerID( + input.Channel, + peer.ID, + input.SessionPolicy.DMScope, + input.SessionPolicy.IdentityLinks, + ) + if peerID == "" { + return scope + } + scope.Dimensions = []string{"sender"} + scope.Values = map[string]string{ + "sender": peerID, + } + default: + peerID := strings.ToLower(strings.TrimSpace(peer.ID)) + if peerID == "" { + peerID = "unknown" + } + scope.Dimensions = []string{"chat"} + scope.Values = map[string]string{ + "chat": fmt.Sprintf("%s:%s", peerKind, peerID), + } + } + + return scope +} diff --git a/pkg/session/allocator_test.go b/pkg/session/allocator_test.go index a6e84e09d..5eb442e98 100644 --- a/pkg/session/allocator_test.go +++ b/pkg/session/allocator_test.go @@ -26,6 +26,15 @@ func TestAllocateRouteSession_PerPeerDM(t *testing.T) { if allocation.MainSessionKey != "agent:main:main" { t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main") } + if allocation.Scope.Version != ScopeVersionV1 { + t.Fatalf("Scope.Version = %d, want %d", allocation.Scope.Version, ScopeVersionV1) + } + if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "sender" { + t.Fatalf("Scope.Dimensions = %v, want [sender]", allocation.Scope.Dimensions) + } + if allocation.Scope.Values["sender"] != "user123" { + t.Fatalf("Scope.Values[sender] = %q, want user123", allocation.Scope.Values["sender"]) + } } func TestAllocateRouteSession_GroupPeer(t *testing.T) { @@ -48,4 +57,10 @@ func TestAllocateRouteSession_GroupPeer(t *testing.T) { if allocation.MainSessionKey != "agent:main:main" { t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main") } + if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "chat" { + t.Fatalf("Scope.Dimensions = %v, want [chat]", allocation.Scope.Dimensions) + } + if allocation.Scope.Values["chat"] != "channel:c001" { + t.Fatalf("Scope.Values[chat] = %q, want channel:c001", allocation.Scope.Values["chat"]) + } } diff --git a/pkg/session/scope.go b/pkg/session/scope.go new file mode 100644 index 000000000..efb026ea3 --- /dev/null +++ b/pkg/session/scope.go @@ -0,0 +1,32 @@ +package session + +// ScopeVersionV1 is the first structured session-scope schema version. +const ScopeVersionV1 = 1 + +// SessionScope describes the semantic session partition selected for a turn. +type SessionScope struct { + Version int `json:"version"` + AgentID string `json:"agent_id"` + Channel string `json:"channel"` + Account string `json:"account"` + Dimensions []string `json:"dimensions"` + Values map[string]string `json:"values"` +} + +// CloneScope returns a deep copy of scope. +func CloneScope(scope *SessionScope) *SessionScope { + if scope == nil { + return nil + } + cloned := *scope + if len(scope.Dimensions) > 0 { + cloned.Dimensions = append([]string(nil), scope.Dimensions...) + } + if len(scope.Values) > 0 { + cloned.Values = make(map[string]string, len(scope.Values)) + for key, value := range scope.Values { + cloned.Values[key] = value + } + } + return &cloned +}