mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
refactor(context): carry route and scope through runtime
This commit is contained in:
@@ -10,6 +10,8 @@ import (
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
"github.com/sipeed/picoclaw/pkg/routing"
|
||||
"github.com/sipeed/picoclaw/pkg/session"
|
||||
"github.com/sipeed/picoclaw/pkg/tools"
|
||||
)
|
||||
|
||||
@@ -142,6 +144,25 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
|
||||
ChatType: "direct",
|
||||
SenderID: "tester",
|
||||
},
|
||||
RouteResult: &routing.ResolvedRoute{
|
||||
AgentID: "main",
|
||||
Channel: "cli",
|
||||
AccountID: routing.DefaultAccountID,
|
||||
SessionPolicy: routing.SessionPolicy{
|
||||
DMScope: routing.DMScopePerPeer,
|
||||
},
|
||||
MatchedBy: "default",
|
||||
},
|
||||
SessionScope: &session.SessionScope{
|
||||
Version: session.ScopeVersionV1,
|
||||
AgentID: "main",
|
||||
Channel: "cli",
|
||||
Account: routing.DefaultAccountID,
|
||||
Dimensions: []string{"sender"},
|
||||
Values: map[string]string{
|
||||
"sender": "tester",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
@@ -182,11 +203,17 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
|
||||
if evt.Meta.SessionKey != "session-1" {
|
||||
t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey)
|
||||
}
|
||||
if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil {
|
||||
if evt.Context == nil || evt.Context.Inbound == nil {
|
||||
t.Fatalf("event %d missing inbound turn context", i)
|
||||
}
|
||||
if evt.Meta.Context.Inbound.Channel != "cli" || evt.Meta.Context.Inbound.SenderID != "tester" {
|
||||
t.Fatalf("event %d inbound context = %+v", i, evt.Meta.Context.Inbound)
|
||||
if evt.Context.Inbound.Channel != "cli" || evt.Context.Inbound.SenderID != "tester" {
|
||||
t.Fatalf("event %d inbound context = %+v", i, evt.Context.Inbound)
|
||||
}
|
||||
if evt.Context.Route == nil || evt.Context.Route.AgentID != "main" {
|
||||
t.Fatalf("event %d missing route context: %+v", i, evt.Context.Route)
|
||||
}
|
||||
if evt.Context.Scope == nil || evt.Context.Scope.Values["sender"] != "tester" {
|
||||
t.Fatalf("event %d missing session scope: %+v", i, evt.Context.Scope)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+2
-1
@@ -86,6 +86,7 @@ type Event struct {
|
||||
Kind EventKind
|
||||
Time time.Time
|
||||
Meta EventMeta
|
||||
Context *TurnContext
|
||||
Payload any
|
||||
}
|
||||
|
||||
@@ -98,7 +99,7 @@ type EventMeta struct {
|
||||
Iteration int
|
||||
TracePath string
|
||||
Source string
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
turnContext *TurnContext
|
||||
}
|
||||
|
||||
// TurnEndStatus describes the terminal state of a turn.
|
||||
|
||||
@@ -89,6 +89,7 @@ type ToolApprover interface {
|
||||
|
||||
type LLMHookRequest struct {
|
||||
Meta EventMeta `json:"meta"`
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
Model string `json:"model"`
|
||||
Messages []providers.Message `json:"messages,omitempty"`
|
||||
Tools []providers.ToolDefinition `json:"tools,omitempty"`
|
||||
@@ -104,6 +105,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest {
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Context = cloneTurnContext(r.Context)
|
||||
cloned.Messages = cloneProviderMessages(r.Messages)
|
||||
cloned.Tools = cloneToolDefinitions(r.Tools)
|
||||
cloned.Options = cloneStringAnyMap(r.Options)
|
||||
@@ -112,6 +114,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest {
|
||||
|
||||
type LLMHookResponse struct {
|
||||
Meta EventMeta `json:"meta"`
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
Model string `json:"model"`
|
||||
Response *providers.LLMResponse `json:"response,omitempty"`
|
||||
Channel string `json:"channel,omitempty"`
|
||||
@@ -124,12 +127,14 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse {
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Context = cloneTurnContext(r.Context)
|
||||
cloned.Response = cloneLLMResponse(r.Response)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
type ToolCallHookRequest struct {
|
||||
Meta EventMeta `json:"meta"`
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
Tool string `json:"tool"`
|
||||
Arguments map[string]any `json:"arguments,omitempty"`
|
||||
Channel string `json:"channel,omitempty"`
|
||||
@@ -142,12 +147,14 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest {
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Context = cloneTurnContext(r.Context)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
type ToolApprovalRequest struct {
|
||||
Meta EventMeta `json:"meta"`
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
Tool string `json:"tool"`
|
||||
Arguments map[string]any `json:"arguments,omitempty"`
|
||||
Channel string `json:"channel,omitempty"`
|
||||
@@ -160,12 +167,14 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest {
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Context = cloneTurnContext(r.Context)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
type ToolResultHookResponse struct {
|
||||
Meta EventMeta `json:"meta"`
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
Tool string `json:"tool"`
|
||||
Arguments map[string]any `json:"arguments,omitempty"`
|
||||
Result *tools.ToolResult `json:"result,omitempty"`
|
||||
@@ -180,6 +189,7 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse {
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Context = cloneTurnContext(r.Context)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
cloned.Result = cloneToolResult(r.Result)
|
||||
return &cloned
|
||||
|
||||
+33
-3
@@ -10,6 +10,8 @@ import (
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/config"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
"github.com/sipeed/picoclaw/pkg/routing"
|
||||
"github.com/sipeed/picoclaw/pkg/session"
|
||||
"github.com/sipeed/picoclaw/pkg/tools"
|
||||
)
|
||||
|
||||
@@ -124,8 +126,8 @@ func (h *llmObserverHook) BeforeLLM(
|
||||
ctx context.Context,
|
||||
req *LLMHookRequest,
|
||||
) (*LLMHookRequest, HookDecision, error) {
|
||||
if req.Meta.Context != nil {
|
||||
h.lastInbound = cloneInboundContext(req.Meta.Context.Inbound)
|
||||
if req.Context != nil {
|
||||
h.lastInbound = cloneInboundContext(req.Context.Inbound)
|
||||
}
|
||||
next := req.Clone()
|
||||
next.Model = "hook-model"
|
||||
@@ -165,6 +167,25 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
|
||||
ChatType: "direct",
|
||||
SenderID: "hook-user",
|
||||
},
|
||||
RouteResult: &routing.ResolvedRoute{
|
||||
AgentID: "main",
|
||||
Channel: "cli",
|
||||
AccountID: routing.DefaultAccountID,
|
||||
SessionPolicy: routing.SessionPolicy{
|
||||
DMScope: routing.DMScopePerPeer,
|
||||
},
|
||||
MatchedBy: "default",
|
||||
},
|
||||
SessionScope: &session.SessionScope{
|
||||
Version: session.ScopeVersionV1,
|
||||
AgentID: "main",
|
||||
Channel: "cli",
|
||||
Account: routing.DefaultAccountID,
|
||||
Dimensions: []string{"sender"},
|
||||
Values: map[string]string{
|
||||
"sender": "hook-user",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
@@ -185,15 +206,24 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
|
||||
if hook.lastInbound.Channel != "cli" || hook.lastInbound.SenderID != "hook-user" {
|
||||
t.Fatalf("hook inbound context = %+v", hook.lastInbound)
|
||||
}
|
||||
if hook.lastInbound != nil && hook.lastInbound.ChatID != "direct" {
|
||||
t.Fatalf("hook inbound chat ID = %q, want direct", hook.lastInbound.ChatID)
|
||||
}
|
||||
|
||||
select {
|
||||
case evt := <-hook.eventCh:
|
||||
if evt.Kind != EventKindTurnEnd {
|
||||
t.Fatalf("expected turn end event, got %v", evt.Kind)
|
||||
}
|
||||
if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil {
|
||||
if evt.Context == nil || evt.Context.Inbound == nil {
|
||||
t.Fatal("expected observer event to carry inbound context")
|
||||
}
|
||||
if evt.Context.Route == nil || evt.Context.Route.AgentID != "main" {
|
||||
t.Fatalf("expected observer event to carry route context, got %+v", evt.Context.Route)
|
||||
}
|
||||
if evt.Context.Scope == nil || evt.Context.Scope.Values["sender"] != "hook-user" {
|
||||
t.Fatalf("expected observer event to carry session scope, got %+v", evt.Context.Scope)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for hook observer event")
|
||||
}
|
||||
|
||||
+95
-48
@@ -73,25 +73,27 @@ type AgentLoop struct {
|
||||
|
||||
// processOptions configures how a message is processed
|
||||
type processOptions struct {
|
||||
SessionKey string // Session identifier for history/context
|
||||
Channel string // Target channel for tool execution
|
||||
ChatID string // Target chat ID for tool execution
|
||||
MessageID string // Current inbound platform message ID
|
||||
ReplyToMessageID string // Current inbound reply target message ID
|
||||
SenderID string // Current sender ID for dynamic context
|
||||
SenderDisplayName string // Current sender display name for dynamic context
|
||||
UserMessage string // User message content (may include prefix)
|
||||
ForcedSkills []string // Skills explicitly requested for this message
|
||||
SystemPromptOverride string // Override the default system prompt (Used by SubTurns)
|
||||
Media []string // media:// refs from inbound message
|
||||
InitialSteeringMessages []providers.Message // Steering messages from refactor/agent
|
||||
DefaultResponse string // Response when LLM returns empty
|
||||
EnableSummary bool // Whether to trigger summarization
|
||||
SendResponse bool // Whether to send response via bus
|
||||
SuppressToolFeedback bool // Whether to suppress inline tool feedback messages
|
||||
NoHistory bool // If true, don't load session history (for heartbeat)
|
||||
SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue)
|
||||
InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks
|
||||
SessionKey string // Session identifier for history/context
|
||||
Channel string // Target channel for tool execution
|
||||
ChatID string // Target chat ID for tool execution
|
||||
MessageID string // Current inbound platform message ID
|
||||
ReplyToMessageID string // Current inbound reply target message ID
|
||||
SenderID string // Current sender ID for dynamic context
|
||||
SenderDisplayName string // Current sender display name for dynamic context
|
||||
UserMessage string // User message content (may include prefix)
|
||||
ForcedSkills []string // Skills explicitly requested for this message
|
||||
SystemPromptOverride string // Override the default system prompt (Used by SubTurns)
|
||||
Media []string // media:// refs from inbound message
|
||||
InitialSteeringMessages []providers.Message // Steering messages from refactor/agent
|
||||
DefaultResponse string // Response when LLM returns empty
|
||||
EnableSummary bool // Whether to trigger summarization
|
||||
SendResponse bool // Whether to send response via bus
|
||||
SuppressToolFeedback bool // Whether to suppress inline tool feedback messages
|
||||
NoHistory bool // If true, don't load session history (for heartbeat)
|
||||
SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue)
|
||||
InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks
|
||||
RouteResult *routing.ResolvedRoute // Route decision snapshot for events/hooks
|
||||
SessionScope *session.SessionScope // Session scope snapshot for events/hooks
|
||||
}
|
||||
|
||||
type continuationTarget struct {
|
||||
@@ -705,6 +707,45 @@ func (al *AgentLoop) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func outboundContextFromInbound(
|
||||
inbound *bus.InboundContext,
|
||||
channel, chatID, replyToMessageID string,
|
||||
) bus.InboundContext {
|
||||
if inbound == nil {
|
||||
return bus.ContextFromLegacyOutbound(bus.OutboundMessage{
|
||||
Channel: channel,
|
||||
ChatID: chatID,
|
||||
ReplyToMessageID: replyToMessageID,
|
||||
})
|
||||
}
|
||||
|
||||
outboundCtx := *cloneInboundContext(inbound)
|
||||
if outboundCtx.Channel == "" {
|
||||
outboundCtx.Channel = channel
|
||||
}
|
||||
if outboundCtx.ChatID == "" {
|
||||
outboundCtx.ChatID = chatID
|
||||
}
|
||||
if outboundCtx.ReplyToMessageID == "" {
|
||||
outboundCtx.ReplyToMessageID = replyToMessageID
|
||||
}
|
||||
return outboundCtx
|
||||
}
|
||||
|
||||
func outboundMessageForTurn(ts *turnState, content string) bus.OutboundMessage {
|
||||
return bus.OutboundMessage{
|
||||
Channel: ts.channel,
|
||||
ChatID: ts.chatID,
|
||||
Context: outboundContextFromInbound(
|
||||
ts.opts.InboundContext,
|
||||
ts.channel,
|
||||
ts.chatID,
|
||||
ts.opts.ReplyToMessageID,
|
||||
),
|
||||
Content: content,
|
||||
}
|
||||
}
|
||||
|
||||
// MountHook registers an in-process hook on the agent loop.
|
||||
func (al *AgentLoop) MountHook(reg HookRegistration) error {
|
||||
if al == nil || al.hooks == nil {
|
||||
@@ -766,20 +807,22 @@ func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *Turn
|
||||
|
||||
func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta {
|
||||
return EventMeta{
|
||||
AgentID: ts.agentID,
|
||||
TurnID: ts.turnID,
|
||||
SessionKey: ts.sessionKey,
|
||||
Iteration: iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
Context: cloneTurnContext(ts.context),
|
||||
AgentID: ts.agentID,
|
||||
TurnID: ts.turnID,
|
||||
SessionKey: ts.sessionKey,
|
||||
Iteration: iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
turnContext: cloneTurnContext(ts.context),
|
||||
}
|
||||
}
|
||||
|
||||
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
|
||||
clonedMeta := cloneEventMeta(meta)
|
||||
evt := Event{
|
||||
Kind: kind,
|
||||
Meta: cloneEventMeta(meta),
|
||||
Meta: clonedMeta,
|
||||
Context: cloneTurnContext(clonedMeta.turnContext),
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
@@ -1361,6 +1404,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
EnableSummary: true,
|
||||
SendResponse: false,
|
||||
InboundContext: cloneInboundContext(&msg.Context),
|
||||
RouteResult: cloneResolvedRoute(&route),
|
||||
SessionScope: session.CloneScope(&allocation.Scope),
|
||||
}
|
||||
|
||||
// context-dependent commands check their own Runtime fields and report
|
||||
@@ -1540,7 +1585,11 @@ func (al *AgentLoop) runAgentLoop(
|
||||
}
|
||||
}
|
||||
|
||||
turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey, newTurnContext(opts.InboundContext))
|
||||
turnScope := al.newTurnEventScope(
|
||||
agent.ID,
|
||||
opts.SessionKey,
|
||||
newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope),
|
||||
)
|
||||
ts := newTurnState(agent, opts, turnScope)
|
||||
result, err := al.runTurn(ctx, ts)
|
||||
if err != nil {
|
||||
@@ -1564,6 +1613,12 @@ func (al *AgentLoop) runAgentLoop(
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
Channel: opts.Channel,
|
||||
ChatID: opts.ChatID,
|
||||
Context: outboundContextFromInbound(
|
||||
opts.InboundContext,
|
||||
opts.Channel,
|
||||
opts.ChatID,
|
||||
opts.ReplyToMessageID,
|
||||
),
|
||||
Content: result.finalContent,
|
||||
})
|
||||
}
|
||||
@@ -1897,6 +1952,7 @@ turnLoop:
|
||||
if al.hooks != nil {
|
||||
llmReq, decision := al.hooks.BeforeLLM(turnCtx, &LLMHookRequest{
|
||||
Meta: ts.eventMeta("runTurn", "turn.llm.request"),
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
Model: llmModel,
|
||||
Messages: callMessages,
|
||||
Tools: providerToolDefs,
|
||||
@@ -2069,11 +2125,10 @@ turnLoop:
|
||||
)
|
||||
|
||||
if retry == 0 && !constants.IsInternalChannel(ts.channel) {
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
Channel: ts.channel,
|
||||
ChatID: ts.chatID,
|
||||
Content: "Context window exceeded. Compressing history and retrying...",
|
||||
})
|
||||
al.bus.PublishOutbound(ctx, outboundMessageForTurn(
|
||||
ts,
|
||||
"Context window exceeded. Compressing history and retrying...",
|
||||
))
|
||||
}
|
||||
|
||||
if compression, ok := al.forceCompression(ts.agent, ts.sessionKey); ok {
|
||||
@@ -2128,6 +2183,7 @@ turnLoop:
|
||||
if al.hooks != nil {
|
||||
llmResp, decision := al.hooks.AfterLLM(turnCtx, &LLMHookResponse{
|
||||
Meta: ts.eventMeta("runTurn", "turn.llm.response"),
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
Model: llmModel,
|
||||
Response: response,
|
||||
Channel: ts.channel,
|
||||
@@ -2280,6 +2336,7 @@ turnLoop:
|
||||
if al.hooks != nil {
|
||||
toolReq, decision := al.hooks.BeforeTool(turnCtx, &ToolCallHookRequest{
|
||||
Meta: ts.eventMeta("runTurn", "turn.tool.before"),
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
Tool: toolName,
|
||||
Arguments: toolArgs,
|
||||
Channel: ts.channel,
|
||||
@@ -2326,6 +2383,7 @@ turnLoop:
|
||||
if al.hooks != nil {
|
||||
approval := al.hooks.ApproveTool(turnCtx, &ToolApprovalRequest{
|
||||
Meta: ts.eventMeta("runTurn", "turn.tool.approve"),
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
Tool: toolName,
|
||||
Arguments: toolArgs,
|
||||
Channel: ts.channel,
|
||||
@@ -2383,11 +2441,7 @@ turnLoop:
|
||||
)
|
||||
feedbackMsg := fmt.Sprintf("\U0001f527 `%s`\n```\n%s\n```", tc.Name, feedbackPreview)
|
||||
fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second)
|
||||
_ = al.bus.PublishOutbound(fbCtx, bus.OutboundMessage{
|
||||
Channel: ts.channel,
|
||||
ChatID: ts.chatID,
|
||||
Content: feedbackMsg,
|
||||
})
|
||||
_ = al.bus.PublishOutbound(fbCtx, outboundMessageForTurn(ts, feedbackMsg))
|
||||
fbCancel()
|
||||
}
|
||||
|
||||
@@ -2400,11 +2454,7 @@ turnLoop:
|
||||
if !result.Silent && result.ForUser != "" {
|
||||
outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer outCancel()
|
||||
_ = al.bus.PublishOutbound(outCtx, bus.OutboundMessage{
|
||||
Channel: ts.channel,
|
||||
ChatID: ts.chatID,
|
||||
Content: result.ForUser,
|
||||
})
|
||||
_ = al.bus.PublishOutbound(outCtx, outboundMessageForTurn(ts, result.ForUser))
|
||||
}
|
||||
|
||||
// Determine content for the agent loop (ForLLM or error).
|
||||
@@ -2469,6 +2519,7 @@ turnLoop:
|
||||
if al.hooks != nil {
|
||||
toolResp, decision := al.hooks.AfterTool(turnCtx, &ToolResultHookResponse{
|
||||
Meta: ts.eventMeta("runTurn", "turn.tool.after"),
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
Tool: toolName,
|
||||
Arguments: toolArgs,
|
||||
Result: toolResult,
|
||||
@@ -2545,11 +2596,7 @@ turnLoop:
|
||||
}
|
||||
|
||||
if !toolResult.Silent && toolResult.ForUser != "" && ts.opts.SendResponse {
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
Channel: ts.channel,
|
||||
ChatID: ts.chatID,
|
||||
Content: toolResult.ForUser,
|
||||
})
|
||||
al.bus.PublishOutbound(ctx, outboundMessageForTurn(ts, toolResult.ForUser))
|
||||
logger.DebugCF("agent", "Sent tool result to user",
|
||||
map[string]any{
|
||||
"tool": toolName,
|
||||
|
||||
@@ -370,7 +370,11 @@ func spawnSubTurn(
|
||||
}
|
||||
|
||||
// Create event scope for the child turn
|
||||
scope := al.newTurnEventScope(agent.ID, childID, newTurnContext(opts.InboundContext))
|
||||
scope := al.newTurnEventScope(
|
||||
agent.ID,
|
||||
childID,
|
||||
newTurnContext(opts.InboundContext, opts.RouteResult, opts.SessionScope),
|
||||
)
|
||||
|
||||
// Create child turnState using the new API
|
||||
childTS := newTurnState(&agent, opts, scope)
|
||||
|
||||
+7
-7
@@ -303,13 +303,13 @@ func (ts *turnState) hardAbortRequested() bool {
|
||||
func (ts *turnState) eventMeta(source, tracePath string) EventMeta {
|
||||
snap := ts.snapshot()
|
||||
return EventMeta{
|
||||
AgentID: snap.AgentID,
|
||||
TurnID: snap.TurnID,
|
||||
SessionKey: snap.SessionKey,
|
||||
Iteration: snap.Iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
AgentID: snap.AgentID,
|
||||
TurnID: snap.TurnID,
|
||||
SessionKey: snap.SessionKey,
|
||||
Iteration: snap.Iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
turnContext: cloneTurnContext(ts.turnCtx),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,19 +1,31 @@
|
||||
package agent
|
||||
|
||||
import "github.com/sipeed/picoclaw/pkg/bus"
|
||||
import (
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
"github.com/sipeed/picoclaw/pkg/routing"
|
||||
"github.com/sipeed/picoclaw/pkg/session"
|
||||
)
|
||||
|
||||
// TurnContext carries normalized turn-scoped facts that can be shared across
|
||||
// events, hooks, and other runtime observers without re-parsing legacy fields.
|
||||
type TurnContext struct {
|
||||
Inbound *bus.InboundContext `json:"inbound,omitempty"`
|
||||
Inbound *bus.InboundContext `json:"inbound,omitempty"`
|
||||
Route *routing.ResolvedRoute `json:"route,omitempty"`
|
||||
Scope *session.SessionScope `json:"scope,omitempty"`
|
||||
}
|
||||
|
||||
func newTurnContext(inbound *bus.InboundContext) *TurnContext {
|
||||
if inbound == nil {
|
||||
func newTurnContext(
|
||||
inbound *bus.InboundContext,
|
||||
route *routing.ResolvedRoute,
|
||||
scope *session.SessionScope,
|
||||
) *TurnContext {
|
||||
if inbound == nil && route == nil && scope == nil {
|
||||
return nil
|
||||
}
|
||||
return &TurnContext{
|
||||
Inbound: cloneInboundContext(inbound),
|
||||
Route: cloneResolvedRoute(route),
|
||||
Scope: session.CloneScope(scope),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +35,8 @@ func cloneTurnContext(ctx *TurnContext) *TurnContext {
|
||||
}
|
||||
cloned := *ctx
|
||||
cloned.Inbound = cloneInboundContext(ctx.Inbound)
|
||||
cloned.Route = cloneResolvedRoute(ctx.Route)
|
||||
cloned.Scope = session.CloneScope(ctx.Scope)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
@@ -48,6 +62,31 @@ func cloneStringMap(src map[string]string) map[string]string {
|
||||
}
|
||||
|
||||
func cloneEventMeta(meta EventMeta) EventMeta {
|
||||
meta.Context = cloneTurnContext(meta.Context)
|
||||
meta.turnContext = cloneTurnContext(meta.turnContext)
|
||||
return meta
|
||||
}
|
||||
|
||||
func cloneResolvedRoute(route *routing.ResolvedRoute) *routing.ResolvedRoute {
|
||||
if route == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := *route
|
||||
cloned.SessionPolicy = routing.SessionPolicy{
|
||||
DMScope: route.SessionPolicy.DMScope,
|
||||
IdentityLinks: cloneIdentityLinks(route.SessionPolicy.IdentityLinks),
|
||||
}
|
||||
return &cloned
|
||||
}
|
||||
|
||||
func cloneIdentityLinks(src map[string][]string) map[string][]string {
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
cloned := make(map[string][]string, len(src))
|
||||
for canonical, ids := range src {
|
||||
dup := make([]string, len(ids))
|
||||
copy(dup, ids)
|
||||
cloned[canonical] = dup
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
@@ -89,6 +89,7 @@ func (mb *MessageBus) InboundChan() <-chan InboundMessage {
|
||||
}
|
||||
|
||||
func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage) error {
|
||||
msg = NormalizeOutboundMessage(msg)
|
||||
return publish(ctx, mb, mb.outbound, msg)
|
||||
}
|
||||
|
||||
@@ -97,6 +98,7 @@ func (mb *MessageBus) OutboundChan() <-chan OutboundMessage {
|
||||
}
|
||||
|
||||
func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error {
|
||||
msg = NormalizeOutboundMediaMessage(msg)
|
||||
return publish(ctx, mb, mb.outboundMedia, msg)
|
||||
}
|
||||
|
||||
|
||||
@@ -181,6 +181,66 @@ func TestPublishOutboundSubscribe(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) {
|
||||
mb := NewMessageBus()
|
||||
defer mb.Close()
|
||||
|
||||
msg := OutboundMessage{
|
||||
Context: InboundContext{
|
||||
Channel: "telegram",
|
||||
ChatID: "chat-42",
|
||||
ReplyToMessageID: "msg-9",
|
||||
},
|
||||
Content: "reply",
|
||||
}
|
||||
|
||||
if err := mb.PublishOutbound(context.Background(), msg); err != nil {
|
||||
t.Fatalf("PublishOutbound failed: %v", err)
|
||||
}
|
||||
|
||||
got := <-mb.OutboundChan()
|
||||
if got.Channel != "telegram" {
|
||||
t.Fatalf("expected legacy channel telegram, got %q", got.Channel)
|
||||
}
|
||||
if got.ChatID != "chat-42" {
|
||||
t.Fatalf("expected legacy chat ID chat-42, got %q", got.ChatID)
|
||||
}
|
||||
if got.ReplyToMessageID != "msg-9" {
|
||||
t.Fatalf("expected mirrored reply_to_message_id msg-9, got %q", got.ReplyToMessageID)
|
||||
}
|
||||
if got.Context.Channel != "telegram" || got.Context.ChatID != "chat-42" {
|
||||
t.Fatalf("unexpected outbound context: %+v", got.Context)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) {
|
||||
mb := NewMessageBus()
|
||||
defer mb.Close()
|
||||
|
||||
msg := OutboundMediaMessage{
|
||||
Context: InboundContext{
|
||||
Channel: "slack",
|
||||
ChatID: "C001",
|
||||
},
|
||||
Parts: []MediaPart{{Type: "image", Ref: "media://1"}},
|
||||
}
|
||||
|
||||
if err := mb.PublishOutboundMedia(context.Background(), msg); err != nil {
|
||||
t.Fatalf("PublishOutboundMedia failed: %v", err)
|
||||
}
|
||||
|
||||
got := <-mb.OutboundMediaChan()
|
||||
if got.Channel != "slack" {
|
||||
t.Fatalf("expected legacy channel slack, got %q", got.Channel)
|
||||
}
|
||||
if got.ChatID != "C001" {
|
||||
t.Fatalf("expected legacy chat ID C001, got %q", got.ChatID)
|
||||
}
|
||||
if got.Context.Channel != "slack" || got.Context.ChatID != "C001" {
|
||||
t.Fatalf("unexpected outbound media context: %+v", got.Context)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishInbound_ContextCancel(t *testing.T) {
|
||||
mb := NewMessageBus()
|
||||
defer mb.Close()
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package bus
|
||||
|
||||
import "strings"
|
||||
|
||||
// ContextFromLegacyOutbound builds a minimal outbound context from the legacy
|
||||
// top-level outbound fields. This keeps older outbound publishers working
|
||||
// while new publishers gradually start carrying the original InboundContext.
|
||||
func ContextFromLegacyOutbound(msg OutboundMessage) InboundContext {
|
||||
return normalizeInboundContext(InboundContext{
|
||||
Channel: strings.TrimSpace(msg.Channel),
|
||||
ChatID: strings.TrimSpace(msg.ChatID),
|
||||
ReplyToMessageID: strings.TrimSpace(msg.ReplyToMessageID),
|
||||
})
|
||||
}
|
||||
|
||||
// ContextFromLegacyOutboundMedia builds a minimal outbound context for media.
|
||||
func ContextFromLegacyOutboundMedia(msg OutboundMediaMessage) InboundContext {
|
||||
return normalizeInboundContext(InboundContext{
|
||||
Channel: strings.TrimSpace(msg.Channel),
|
||||
ChatID: strings.TrimSpace(msg.ChatID),
|
||||
})
|
||||
}
|
||||
|
||||
// NormalizeOutboundMessage ensures Context is present and mirrors legacy
|
||||
// top-level addressing fields from it so older senders keep working.
|
||||
func NormalizeOutboundMessage(msg OutboundMessage) OutboundMessage {
|
||||
if msg.Context.isZero() {
|
||||
msg.Context = ContextFromLegacyOutbound(msg)
|
||||
} else {
|
||||
msg.Context = normalizeInboundContext(msg.Context)
|
||||
}
|
||||
|
||||
if msg.Channel == "" {
|
||||
msg.Channel = msg.Context.Channel
|
||||
}
|
||||
if msg.ChatID == "" {
|
||||
msg.ChatID = msg.Context.ChatID
|
||||
}
|
||||
if msg.ReplyToMessageID == "" {
|
||||
msg.ReplyToMessageID = msg.Context.ReplyToMessageID
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
// NormalizeOutboundMediaMessage ensures media outbound messages also carry a
|
||||
// normalized context while preserving the legacy top-level routing fields.
|
||||
func NormalizeOutboundMediaMessage(msg OutboundMediaMessage) OutboundMediaMessage {
|
||||
if msg.Context.isZero() {
|
||||
msg.Context = ContextFromLegacyOutboundMedia(msg)
|
||||
} else {
|
||||
msg.Context = normalizeInboundContext(msg.Context)
|
||||
}
|
||||
|
||||
if msg.Channel == "" {
|
||||
msg.Channel = msg.Context.Channel
|
||||
}
|
||||
if msg.ChatID == "" {
|
||||
msg.ChatID = msg.Context.ChatID
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
+9
-7
@@ -58,10 +58,11 @@ type InboundMessage struct {
|
||||
}
|
||||
|
||||
type OutboundMessage struct {
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Content string `json:"content"`
|
||||
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Context InboundContext `json:"context"`
|
||||
Content string `json:"content"`
|
||||
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
|
||||
}
|
||||
|
||||
// MediaPart describes a single media attachment to send.
|
||||
@@ -75,7 +76,8 @@ type MediaPart struct {
|
||||
|
||||
// OutboundMediaMessage carries media attachments from Agent to channels via the bus.
|
||||
type OutboundMediaMessage struct {
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Parts []MediaPart `json:"parts"`
|
||||
Channel string `json:"channel"`
|
||||
ChatID string `json:"chat_id"`
|
||||
Context InboundContext `json:"context"`
|
||||
Parts []MediaPart `json:"parts"`
|
||||
}
|
||||
|
||||
@@ -1130,6 +1130,8 @@ func (m *Manager) UnregisterChannel(name string) {
|
||||
// delivered (or all retries are exhausted), which preserves ordering when
|
||||
// a subsequent operation depends on the message having been sent.
|
||||
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
msg = bus.NormalizeOutboundMessage(msg)
|
||||
|
||||
m.mu.RLock()
|
||||
_, exists := m.channels[msg.Channel]
|
||||
w, wExists := m.workers[msg.Channel]
|
||||
@@ -1163,6 +1165,8 @@ func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) erro
|
||||
// retries are exhausted), which preserves ordering when later agent behavior
|
||||
// depends on actual media delivery.
|
||||
func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
msg = bus.NormalizeOutboundMediaMessage(msg)
|
||||
|
||||
m.mu.RLock()
|
||||
_, exists := m.channels[msg.Channel]
|
||||
w, wExists := m.workers[msg.Channel]
|
||||
|
||||
@@ -60,15 +60,7 @@ func BuildAgentPeerSessionKey(params SessionKeyParams) string {
|
||||
if dmScope == "" {
|
||||
dmScope = DMScopeMain
|
||||
}
|
||||
peerID := strings.TrimSpace(peer.ID)
|
||||
|
||||
// Resolve identity links (cross-platform collapse)
|
||||
if dmScope != DMScopeMain && peerID != "" {
|
||||
if linked := resolveLinkedPeerID(params.IdentityLinks, params.Channel, peerID); linked != "" {
|
||||
peerID = linked
|
||||
}
|
||||
}
|
||||
peerID = strings.ToLower(peerID)
|
||||
peerID := CanonicalSessionPeerID(params.Channel, peer.ID, dmScope, params.IdentityLinks)
|
||||
|
||||
switch dmScope {
|
||||
case DMScopePerAccountChannelPeer:
|
||||
@@ -99,6 +91,27 @@ func BuildAgentPeerSessionKey(params SessionKeyParams) string {
|
||||
return fmt.Sprintf("agent:%s:%s:%s:%s", agentID, channel, peerKind, peerID)
|
||||
}
|
||||
|
||||
// CanonicalSessionPeerID applies the current DM session canonicalization rules,
|
||||
// including identity-link collapse when enabled.
|
||||
func CanonicalSessionPeerID(
|
||||
channel, peerID string,
|
||||
dmScope DMScope,
|
||||
identityLinks map[string][]string,
|
||||
) string {
|
||||
normalizedPeerID := strings.TrimSpace(peerID)
|
||||
if normalizedPeerID == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
if dmScope != DMScopeMain {
|
||||
if linked := resolveLinkedPeerID(identityLinks, channel, normalizedPeerID); linked != "" {
|
||||
normalizedPeerID = linked
|
||||
}
|
||||
}
|
||||
|
||||
return strings.ToLower(normalizedPeerID)
|
||||
}
|
||||
|
||||
// ParseAgentSessionKey extracts agentId and rest from "agent:<agentId>:<rest>".
|
||||
func ParseAgentSessionKey(sessionKey string) *ParsedSessionKey {
|
||||
raw := strings.TrimSpace(sessionKey)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/routing"
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
// The current implementation intentionally preserves the legacy session-key
|
||||
// layout while moving key construction out of the router.
|
||||
type Allocation struct {
|
||||
Scope SessionScope
|
||||
SessionKey string
|
||||
MainSessionKey string
|
||||
}
|
||||
@@ -27,6 +29,7 @@ type AllocationInput struct {
|
||||
// AllocateRouteSession maps a route decision onto the current legacy
|
||||
// agent-scoped session-key format.
|
||||
func AllocateRouteSession(input AllocationInput) Allocation {
|
||||
scope := buildSessionScope(input)
|
||||
sessionKey := strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{
|
||||
AgentID: input.AgentID,
|
||||
Channel: input.Channel,
|
||||
@@ -37,7 +40,58 @@ func AllocateRouteSession(input AllocationInput) Allocation {
|
||||
}))
|
||||
mainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID))
|
||||
return Allocation{
|
||||
Scope: scope,
|
||||
SessionKey: sessionKey,
|
||||
MainSessionKey: mainSessionKey,
|
||||
}
|
||||
}
|
||||
|
||||
func buildSessionScope(input AllocationInput) SessionScope {
|
||||
scope := SessionScope{
|
||||
Version: ScopeVersionV1,
|
||||
AgentID: routing.NormalizeAgentID(input.AgentID),
|
||||
Channel: strings.ToLower(strings.TrimSpace(input.Channel)),
|
||||
Account: routing.NormalizeAccountID(input.AccountID),
|
||||
}
|
||||
|
||||
peer := input.Peer
|
||||
if peer == nil {
|
||||
peer = &routing.RoutePeer{Kind: "direct"}
|
||||
}
|
||||
|
||||
peerKind := strings.ToLower(strings.TrimSpace(peer.Kind))
|
||||
if peerKind == "" {
|
||||
peerKind = "direct"
|
||||
}
|
||||
|
||||
switch peerKind {
|
||||
case "direct":
|
||||
if input.SessionPolicy.DMScope == routing.DMScopeMain {
|
||||
return scope
|
||||
}
|
||||
peerID := routing.CanonicalSessionPeerID(
|
||||
input.Channel,
|
||||
peer.ID,
|
||||
input.SessionPolicy.DMScope,
|
||||
input.SessionPolicy.IdentityLinks,
|
||||
)
|
||||
if peerID == "" {
|
||||
return scope
|
||||
}
|
||||
scope.Dimensions = []string{"sender"}
|
||||
scope.Values = map[string]string{
|
||||
"sender": peerID,
|
||||
}
|
||||
default:
|
||||
peerID := strings.ToLower(strings.TrimSpace(peer.ID))
|
||||
if peerID == "" {
|
||||
peerID = "unknown"
|
||||
}
|
||||
scope.Dimensions = []string{"chat"}
|
||||
scope.Values = map[string]string{
|
||||
"chat": fmt.Sprintf("%s:%s", peerKind, peerID),
|
||||
}
|
||||
}
|
||||
|
||||
return scope
|
||||
}
|
||||
|
||||
@@ -26,6 +26,15 @@ func TestAllocateRouteSession_PerPeerDM(t *testing.T) {
|
||||
if allocation.MainSessionKey != "agent:main:main" {
|
||||
t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main")
|
||||
}
|
||||
if allocation.Scope.Version != ScopeVersionV1 {
|
||||
t.Fatalf("Scope.Version = %d, want %d", allocation.Scope.Version, ScopeVersionV1)
|
||||
}
|
||||
if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "sender" {
|
||||
t.Fatalf("Scope.Dimensions = %v, want [sender]", allocation.Scope.Dimensions)
|
||||
}
|
||||
if allocation.Scope.Values["sender"] != "user123" {
|
||||
t.Fatalf("Scope.Values[sender] = %q, want user123", allocation.Scope.Values["sender"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocateRouteSession_GroupPeer(t *testing.T) {
|
||||
@@ -48,4 +57,10 @@ func TestAllocateRouteSession_GroupPeer(t *testing.T) {
|
||||
if allocation.MainSessionKey != "agent:main:main" {
|
||||
t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main")
|
||||
}
|
||||
if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "chat" {
|
||||
t.Fatalf("Scope.Dimensions = %v, want [chat]", allocation.Scope.Dimensions)
|
||||
}
|
||||
if allocation.Scope.Values["chat"] != "channel:c001" {
|
||||
t.Fatalf("Scope.Values[chat] = %q, want channel:c001", allocation.Scope.Values["chat"])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package session
|
||||
|
||||
// ScopeVersionV1 is the first structured session-scope schema version.
|
||||
const ScopeVersionV1 = 1
|
||||
|
||||
// SessionScope describes the semantic session partition selected for a turn.
|
||||
type SessionScope struct {
|
||||
Version int `json:"version"`
|
||||
AgentID string `json:"agent_id"`
|
||||
Channel string `json:"channel"`
|
||||
Account string `json:"account"`
|
||||
Dimensions []string `json:"dimensions"`
|
||||
Values map[string]string `json:"values"`
|
||||
}
|
||||
|
||||
// CloneScope returns a deep copy of scope.
|
||||
func CloneScope(scope *SessionScope) *SessionScope {
|
||||
if scope == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := *scope
|
||||
if len(scope.Dimensions) > 0 {
|
||||
cloned.Dimensions = append([]string(nil), scope.Dimensions...)
|
||||
}
|
||||
if len(scope.Values) > 0 {
|
||||
cloned.Values = make(map[string]string, len(scope.Values))
|
||||
for key, value := range scope.Values {
|
||||
cloned.Values[key] = value
|
||||
}
|
||||
}
|
||||
return &cloned
|
||||
}
|
||||
Reference in New Issue
Block a user