mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
refactor(agent): carry inbound context through events and hooks
This commit is contained in:
@@ -136,6 +136,12 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
InboundContext: &bus.InboundContext{
|
||||
Channel: "cli",
|
||||
ChatID: "direct",
|
||||
ChatType: "direct",
|
||||
SenderID: "tester",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
@@ -176,6 +182,12 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
|
||||
if evt.Meta.SessionKey != "session-1" {
|
||||
t.Fatalf("event %d has session key %q, want session-1", i, evt.Meta.SessionKey)
|
||||
}
|
||||
if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil {
|
||||
t.Fatalf("event %d missing inbound turn context", i)
|
||||
}
|
||||
if evt.Meta.Context.Inbound.Channel != "cli" || evt.Meta.Context.Inbound.SenderID != "tester" {
|
||||
t.Fatalf("event %d inbound context = %+v", i, evt.Meta.Context.Inbound)
|
||||
}
|
||||
}
|
||||
|
||||
startPayload, ok := events[0].Payload.(TurnStartPayload)
|
||||
@@ -472,7 +484,7 @@ func TestAgentLoop_EmitsSessionSummarizeEvent(t *testing.T) {
|
||||
sub := al.SubscribeEvents(16)
|
||||
defer al.UnsubscribeEvents(sub.ID)
|
||||
|
||||
turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1")
|
||||
turnScope := al.newTurnEventScope(defaultAgent.ID, "session-1", nil)
|
||||
al.summarizeSession(defaultAgent, "session-1", turnScope)
|
||||
|
||||
events := collectEventStream(sub.C)
|
||||
|
||||
@@ -98,6 +98,7 @@ type EventMeta struct {
|
||||
Iteration int
|
||||
TracePath string
|
||||
Source string
|
||||
Context *TurnContext `json:"context,omitempty"`
|
||||
}
|
||||
|
||||
// TurnEndStatus describes the terminal state of a turn.
|
||||
|
||||
@@ -103,6 +103,7 @@ func (r *LLMHookRequest) Clone() *LLMHookRequest {
|
||||
return nil
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Messages = cloneProviderMessages(r.Messages)
|
||||
cloned.Tools = cloneToolDefinitions(r.Tools)
|
||||
cloned.Options = cloneStringAnyMap(r.Options)
|
||||
@@ -122,6 +123,7 @@ func (r *LLMHookResponse) Clone() *LLMHookResponse {
|
||||
return nil
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Response = cloneLLMResponse(r.Response)
|
||||
return &cloned
|
||||
}
|
||||
@@ -139,6 +141,7 @@ func (r *ToolCallHookRequest) Clone() *ToolCallHookRequest {
|
||||
return nil
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
return &cloned
|
||||
}
|
||||
@@ -156,6 +159,7 @@ func (r *ToolApprovalRequest) Clone() *ToolApprovalRequest {
|
||||
return nil
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
return &cloned
|
||||
}
|
||||
@@ -175,6 +179,7 @@ func (r *ToolResultHookResponse) Clone() *ToolResultHookResponse {
|
||||
return nil
|
||||
}
|
||||
cloned := *r
|
||||
cloned.Meta = cloneEventMeta(r.Meta)
|
||||
cloned.Arguments = cloneStringAnyMap(r.Arguments)
|
||||
cloned.Result = cloneToolResult(r.Result)
|
||||
return &cloned
|
||||
|
||||
+20
-1
@@ -106,7 +106,8 @@ func (p *llmHookTestProvider) GetDefaultModel() string {
|
||||
}
|
||||
|
||||
type llmObserverHook struct {
|
||||
eventCh chan Event
|
||||
eventCh chan Event
|
||||
lastInbound *bus.InboundContext
|
||||
}
|
||||
|
||||
func (h *llmObserverHook) OnEvent(ctx context.Context, evt Event) error {
|
||||
@@ -123,6 +124,9 @@ func (h *llmObserverHook) BeforeLLM(
|
||||
ctx context.Context,
|
||||
req *LLMHookRequest,
|
||||
) (*LLMHookRequest, HookDecision, error) {
|
||||
if req.Meta.Context != nil {
|
||||
h.lastInbound = cloneInboundContext(req.Meta.Context.Inbound)
|
||||
}
|
||||
next := req.Clone()
|
||||
next.Model = "hook-model"
|
||||
return next, HookDecision{Action: HookActionModify}, nil
|
||||
@@ -155,6 +159,12 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: false,
|
||||
SendResponse: false,
|
||||
InboundContext: &bus.InboundContext{
|
||||
Channel: "cli",
|
||||
ChatID: "direct",
|
||||
ChatType: "direct",
|
||||
SenderID: "hook-user",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runAgentLoop failed: %v", err)
|
||||
@@ -169,12 +179,21 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
|
||||
if lastModel != "hook-model" {
|
||||
t.Fatalf("expected model hook-model, got %q", lastModel)
|
||||
}
|
||||
if hook.lastInbound == nil {
|
||||
t.Fatal("expected hook to receive inbound context")
|
||||
}
|
||||
if hook.lastInbound.Channel != "cli" || hook.lastInbound.SenderID != "hook-user" {
|
||||
t.Fatalf("hook inbound context = %+v", hook.lastInbound)
|
||||
}
|
||||
|
||||
select {
|
||||
case evt := <-hook.eventCh:
|
||||
if evt.Kind != EventKindTurnEnd {
|
||||
t.Fatalf("expected turn end event, got %v", evt.Kind)
|
||||
}
|
||||
if evt.Meta.Context == nil || evt.Meta.Context.Inbound == nil {
|
||||
t.Fatal("expected observer event to carry inbound context")
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for hook observer event")
|
||||
}
|
||||
|
||||
+9
-3
@@ -91,6 +91,7 @@ type processOptions struct {
|
||||
SuppressToolFeedback bool // Whether to suppress inline tool feedback messages
|
||||
NoHistory bool // If true, don't load session history (for heartbeat)
|
||||
SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue)
|
||||
InboundContext *bus.InboundContext // Normalized inbound facts for events/hooks
|
||||
}
|
||||
|
||||
type continuationTarget struct {
|
||||
@@ -750,14 +751,16 @@ type turnEventScope struct {
|
||||
agentID string
|
||||
sessionKey string
|
||||
turnID string
|
||||
context *TurnContext
|
||||
}
|
||||
|
||||
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string) turnEventScope {
|
||||
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *TurnContext) turnEventScope {
|
||||
seq := al.turnSeq.Add(1)
|
||||
return turnEventScope{
|
||||
agentID: agentID,
|
||||
sessionKey: sessionKey,
|
||||
turnID: fmt.Sprintf("%s-turn-%d", agentID, seq),
|
||||
context: cloneTurnContext(turnCtx),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -769,13 +772,14 @@ func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta
|
||||
Iteration: iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
Context: cloneTurnContext(ts.context),
|
||||
}
|
||||
}
|
||||
|
||||
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
|
||||
evt := Event{
|
||||
Kind: kind,
|
||||
Meta: meta,
|
||||
Meta: cloneEventMeta(meta),
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
@@ -1356,6 +1360,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
DefaultResponse: defaultResponse,
|
||||
EnableSummary: true,
|
||||
SendResponse: false,
|
||||
InboundContext: cloneInboundContext(&msg.Context),
|
||||
}
|
||||
|
||||
// context-dependent commands check their own Runtime fields and report
|
||||
@@ -1535,7 +1540,8 @@ func (al *AgentLoop) runAgentLoop(
|
||||
}
|
||||
}
|
||||
|
||||
ts := newTurnState(agent, opts, al.newTurnEventScope(agent.ID, opts.SessionKey))
|
||||
turnScope := al.newTurnEventScope(agent.ID, opts.SessionKey, newTurnContext(opts.InboundContext))
|
||||
ts := newTurnState(agent, opts, turnScope)
|
||||
result, err := al.runTurn(ctx, ts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
||||
@@ -366,10 +366,11 @@ func spawnSubTurn(
|
||||
SendResponse: false,
|
||||
NoHistory: true, // SubTurns don't use session history
|
||||
SkipInitialSteeringPoll: true,
|
||||
InboundContext: cloneInboundContext(parentTS.opts.InboundContext),
|
||||
}
|
||||
|
||||
// Create event scope for the child turn
|
||||
scope := al.newTurnEventScope(agent.ID, childID)
|
||||
scope := al.newTurnEventScope(agent.ID, childID, newTurnContext(opts.InboundContext))
|
||||
|
||||
// Create child turnState using the new API
|
||||
childTS := newTurnState(&agent, opts, scope)
|
||||
|
||||
@@ -55,6 +55,7 @@ type turnState struct {
|
||||
turnID string
|
||||
agentID string
|
||||
sessionKey string
|
||||
turnCtx *TurnContext
|
||||
|
||||
channel string
|
||||
chatID string
|
||||
@@ -115,6 +116,7 @@ func newTurnState(agent *AgentInstance, opts processOptions, scope turnEventScop
|
||||
turnID: scope.turnID,
|
||||
agentID: agent.ID,
|
||||
sessionKey: opts.SessionKey,
|
||||
turnCtx: cloneTurnContext(scope.context),
|
||||
channel: opts.Channel,
|
||||
chatID: opts.ChatID,
|
||||
userMessage: opts.UserMessage,
|
||||
@@ -307,6 +309,7 @@ func (ts *turnState) eventMeta(source, tracePath string) EventMeta {
|
||||
Iteration: snap.Iteration,
|
||||
Source: source,
|
||||
TracePath: tracePath,
|
||||
Context: cloneTurnContext(ts.turnCtx),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package agent
|
||||
|
||||
import "github.com/sipeed/picoclaw/pkg/bus"
|
||||
|
||||
// TurnContext carries normalized turn-scoped facts that can be shared across
|
||||
// events, hooks, and other runtime observers without re-parsing legacy fields.
|
||||
type TurnContext struct {
|
||||
Inbound *bus.InboundContext `json:"inbound,omitempty"`
|
||||
}
|
||||
|
||||
func newTurnContext(inbound *bus.InboundContext) *TurnContext {
|
||||
if inbound == nil {
|
||||
return nil
|
||||
}
|
||||
return &TurnContext{
|
||||
Inbound: cloneInboundContext(inbound),
|
||||
}
|
||||
}
|
||||
|
||||
func cloneTurnContext(ctx *TurnContext) *TurnContext {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := *ctx
|
||||
cloned.Inbound = cloneInboundContext(ctx.Inbound)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
func cloneInboundContext(ctx *bus.InboundContext) *bus.InboundContext {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
cloned := *ctx
|
||||
cloned.ReplyHandles = cloneStringMap(ctx.ReplyHandles)
|
||||
cloned.Raw = cloneStringMap(ctx.Raw)
|
||||
return &cloned
|
||||
}
|
||||
|
||||
func cloneStringMap(src map[string]string) map[string]string {
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
cloned := make(map[string]string, len(src))
|
||||
for k, v := range src {
|
||||
cloned[k] = v
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func cloneEventMeta(meta EventMeta) EventMeta {
|
||||
meta.Context = cloneTurnContext(meta.Context)
|
||||
return meta
|
||||
}
|
||||
Reference in New Issue
Block a user