diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 19a1ea9eb..8706a2c4e 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -136,6 +136,12 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { DefaultResponse: defaultResponse, EnableSummary: false, SendResponse: false, + InboundContext: &bus.InboundContext{ + Channel: "cli", + ChatID: "direct", + ChatType: "direct", + SenderID: "tester", + }, }) if err != nil { t.Fatalf("runAgentLoop failed: %v", err) @@ -176,6 +182,12 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { if evt.Meta.SessionKey != "session-1" { t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey) } + if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil { + t.Fatalf("event %d missing inbound turn context", i) + } + if evt.Meta.Context.Inbound.Channel != "cli" || evt.Meta.Context.Inbound.SenderID != "tester" { + t.Fatalf("event %d inbound context = %+v", i, evt.Meta.Context.Inbound) + } } startPayload, ok := events[0].Payload.(TurnStartPayload) @@ -472,7 +484,7 @@ func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) { sub := al.SubscribeEvents(16) defer al.UnsubscribeEvents(sub.ID) - turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1") + turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1", nil) al.summarizeSession(defaultAgent, "session-1", turnScope) events := collectEventStream(sub.C) diff --git a/pkg/agent/events.go b/pkg/agent/events.go index f4562b360..fa006b9a5 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -98,6 +98,7 @@ type EventMeta struct { Iteration int TracePath string Source string + Context *TurnContext `json:"context,omitempty"` } // TurnEndStatus describes the terminal state of a turn. diff --git a/pkg/agent/hooks.go b/pkg/agent/hooks.go index c1ef58ffd..7a5f8c59b 100644 --- a/pkg/agent/hooks.go +++ b/pkg/agent/hooks.go @@ -103,6 +103,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest { return nil } cloned := *r + cloned.Meta = cloneEventMeta(r.Meta) cloned.Messages = cloneProviderMessages(r.Messages) cloned.Tools = cloneToolDefinitions(r.Tools) cloned.Options = cloneStringAnyMap(r.Options) @@ -122,6 +123,7 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse { return nil } cloned := *r + cloned.Meta = cloneEventMeta(r.Meta) cloned.Response = cloneLLMResponse(r.Response) return &cloned } @@ -139,6 +141,7 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest { return nil } cloned := *r + cloned.Meta = cloneEventMeta(r.Meta) cloned.Arguments = cloneStringAnyMap(r.Arguments) return &cloned } @@ -156,6 +159,7 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest { return nil } cloned := *r + cloned.Meta = cloneEventMeta(r.Meta) cloned.Arguments = cloneStringAnyMap(r.Arguments) return &cloned } @@ -175,6 +179,7 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse { return nil } cloned := *r + cloned.Meta = cloneEventMeta(r.Meta) cloned.Arguments = cloneStringAnyMap(r.Arguments) cloned.Result = cloneToolResult(r.Result) return &cloned diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index 49e1b1784..1851090b8 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -106,7 +106,8 @@ func (p *llmHookTestProvider) GetDefaultModel() string { } type llmObserverHook struct { - eventCh chan Event + eventCh chan Event + lastInbound *bus.InboundContext } func (h *llmObserverHook) OnEvent(ctx context.Context, evt Event) error { @@ -123,6 +124,9 @@ func (h *llmObserverHook) BeforeLLM( ctx context.Context, req *LLMHookRequest, ) (*LLMHookRequest, HookDecision, error) { + if req.Meta.Context != nil { + h.lastInbound = cloneInboundContext(req.Meta.Context.Inbound) + } next := req.Clone() next.Model = "hook-model" return next, HookDecision{Action: HookActionModify}, nil @@ -155,6 +159,12 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { DefaultResponse: defaultResponse, EnableSummary: false, SendResponse: false, + InboundContext: &bus.InboundContext{ + Channel: "cli", + ChatID: "direct", + ChatType: "direct", + SenderID: "hook-user", + }, }) if err != nil { t.Fatalf("runAgentLoop failed: %v", err) @@ -169,12 +179,21 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { if lastModel != "hook-model" { t.Fatalf("expected model hook-model, got %q", lastModel) } + if hook.lastInbound == nil { + t.Fatal("expected hook to receive inbound context") + } + if hook.lastInbound.Channel != "cli" || hook.lastInbound.SenderID != "hook-user" { + t.Fatalf("hook inbound context = %+v", hook.lastInbound) + } select { case evt := <-hook.eventCh: if evt.Kind != EventKindTurnEnd { t.Fatalf("expected turn end event, got %v", evt.Kind) } + if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil { + t.Fatal("expected observer event to carry inbound context") + } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for hook observer event") } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 39a2e1539..8b388755a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -91,6 +91,7 @@ type processOptions struct { SuppressToolFeedback bool // Whether to suppress inline tool feedback messages NoHistory bool // If true, don't load session history (for heartbeat) SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue) + InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks } type continuationTarget struct { @@ -750,14 +751,16 @@ type turnEventScope struct { agentID string sessionKey string turnID string + context *TurnContext } -func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope { +func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *TurnContext) turnEventScope { seq := al.turnSeq.Add(1) return turnEventScope{ agentID: agentID, sessionKey: sessionKey, turnID: fmt.Sprintf("%s-turn-%d", agentID, seq), + context: cloneTurnContext(turnCtx), } } @@ -769,13 +772,14 @@ func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta Iteration: iteration, Source: source, TracePath: tracePath, + Context: cloneTurnContext(ts.context), } } func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) { evt := Event{ Kind: kind, - Meta: meta, + Meta: cloneEventMeta(meta), Payload: payload, } @@ -1356,6 +1360,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) DefaultResponse: defaultResponse, EnableSummary: true, SendResponse: false, + InboundContext: cloneInboundContext(&msg.Context), } // context-dependent commands check their own Runtime fields and report @@ -1535,7 +1540,8 @@ func (al *AgentLoop) runAgentLoop( } } - ts := newTurnState(agent, opts, al.newTurnEventScope(agent.ID, opts.SessionKey)) + turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey, newTurnContext(opts.InboundContext)) + ts := newTurnState(agent, opts, turnScope) result, err := al.runTurn(ctx, ts) if err != nil { return "", err diff --git a/pkg/agent/subturn.go b/pkg/agent/subturn.go index f5ba412ab..e243d8ac0 100644 --- a/pkg/agent/subturn.go +++ b/pkg/agent/subturn.go @@ -366,10 +366,11 @@ func spawnSubTurn( SendResponse: false, NoHistory: true, // SubTurns don't use session history SkipInitialSteeringPoll: true, + InboundContext: cloneInboundContext(parentTS.opts.InboundContext), } // Create event scope for the child turn - scope := al.newTurnEventScope(agent.ID, childID) + scope := al.newTurnEventScope(agent.ID, childID, newTurnContext(opts.InboundContext)) // Create child turnState using the new API childTS := newTurnState(&agent, opts, scope) diff --git a/pkg/agent/turn.go b/pkg/agent/turn.go index e4970c519..3339b3418 100644 --- a/pkg/agent/turn.go +++ b/pkg/agent/turn.go @@ -55,6 +55,7 @@ type turnState struct { turnID string agentID string sessionKey string + turnCtx *TurnContext channel string chatID string @@ -115,6 +116,7 @@ func newTurnState(agent *AgentInstance, opts processOptions, scope turnEventScop turnID: scope.turnID, agentID: agent.ID, sessionKey: opts.SessionKey, + turnCtx: cloneTurnContext(scope.context), channel: opts.Channel, chatID: opts.ChatID, userMessage: opts.UserMessage, @@ -307,6 +309,7 @@ func (ts *turnState) eventMeta(source, tracePath string) EventMeta { Iteration: snap.Iteration, Source: source, TracePath: tracePath, + Context: cloneTurnContext(ts.turnCtx), } } diff --git a/pkg/agent/turn_context.go b/pkg/agent/turn_context.go new file mode 100644 index 000000000..a448e24cd --- /dev/null +++ b/pkg/agent/turn_context.go @@ -0,0 +1,53 @@ +package agent + +import "github.com/sipeed/picoclaw/pkg/bus" + +// TurnContext carries normalized turn-scoped facts that can be shared across +// events, hooks, and other runtime observers without re-parsing legacy fields. +type TurnContext struct { + Inbound *bus.InboundContext `json:"inbound,omitempty"` +} + +func newTurnContext(inbound *bus.InboundContext) *TurnContext { + if inbound == nil { + return nil + } + return &TurnContext{ + Inbound: cloneInboundContext(inbound), + } +} + +func cloneTurnContext(ctx *TurnContext) *TurnContext { + if ctx == nil { + return nil + } + cloned := *ctx + cloned.Inbound = cloneInboundContext(ctx.Inbound) + return &cloned +} + +func cloneInboundContext(ctx *bus.InboundContext) *bus.InboundContext { + if ctx == nil { + return nil + } + cloned := *ctx + cloned.ReplyHandles = cloneStringMap(ctx.ReplyHandles) + cloned.Raw = cloneStringMap(ctx.Raw) + return &cloned +} + +func cloneStringMap(src map[string]string) map[string]string { + if len(src) == 0 { + return nil + } + cloned := make(map[string]string, len(src)) + for k, v := range src { + cloned[k] = v + } + return cloned +} + +func cloneEventMeta(meta EventMeta) EventMeta { + meta.Context = cloneTurnContext(meta.Context) + return meta +}