Files
picoclaw/pkg/agent/loop_event.go
T
sky5454 12d5421c26 refactor(agent): split loop.go into focused sub-packages
Break up the monolithic 4384-line loop.go into 12 focused files:
- loop.go: core AgentLoop struct and main Run loop
- loop_turn.go: turn execution logic (runTurn, askSideQuestion, etc.)
- loop_utils.go: pure utility functions (formatters, helpers)
- loop_init.go: constructor and tool registration
- loop_message.go: message handling (processMessage, routing)
- loop_command.go: command processing (/use, /btw, etc.)
- loop_mcp.go: MCP runtime management
- loop_event.go: event/hook system helpers
- loop_media.go: media resolution and artifact handling
- loop_outbound.go: response publishing
- loop_transcribe.go: audio transcription
- loop_steering.go: steering queue and continuation
- loop_inject.go: setter injection methods

No functional changes - pure code movement with updated imports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-17 11:17:12 +08:00

207 lines
6.0 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
package agent
import (
"fmt"
"github.com/sipeed/picoclaw/pkg/logger"
)
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *TurnContext) turnEventScope {
seq := al.turnSeq.Add(1)
return turnEventScope{
agentID: agentID,
sessionKey: sessionKey,
turnID: fmt.Sprintf("%s-turn-%d", agentID, seq),
context: cloneTurnContext(turnCtx),
}
}
func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta {
return EventMeta{
AgentID: ts.agentID,
TurnID: ts.turnID,
SessionKey: ts.sessionKey,
Iteration: iteration,
Source: source,
TracePath: tracePath,
turnContext: cloneTurnContext(ts.context),
}
}
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
clonedMeta := cloneEventMeta(meta)
evt := Event{
Kind: kind,
Meta: clonedMeta,
Context: cloneTurnContext(clonedMeta.turnContext),
Payload: payload,
}
if al == nil || al.eventBus == nil {
return
}
al.logEvent(evt)
al.eventBus.Emit(evt)
}
func (al *AgentLoop) hookAbortError(ts *turnState, stage string, decision HookDecision) error {
reason := decision.Reason
if reason == "" {
reason = "hook requested turn abort"
}
err := fmt.Errorf("hook aborted turn during %s: %s", stage, reason)
al.emitEvent(
EventKindError,
ts.eventMeta("hooks", "turn.error"),
ErrorPayload{
Stage: "hook." + stage,
Message: err.Error(),
},
)
return err
}
func (al *AgentLoop) logEvent(evt Event) {
fields := map[string]any{
"event_kind": evt.Kind.String(),
"agent_id": evt.Meta.AgentID,
"turn_id": evt.Meta.TurnID,
"session_key": evt.Meta.SessionKey,
"iteration": evt.Meta.Iteration,
}
if evt.Meta.TracePath != "" {
fields["trace"] = evt.Meta.TracePath
}
if evt.Meta.Source != "" {
fields["source"] = evt.Meta.Source
}
appendEventContextFields(fields, evt.Context)
switch payload := evt.Payload.(type) {
case TurnStartPayload:
fields["user_len"] = len(payload.UserMessage)
fields["media_count"] = payload.MediaCount
case TurnEndPayload:
fields["status"] = payload.Status
fields["iterations_total"] = payload.Iterations
fields["duration_ms"] = payload.Duration.Milliseconds()
fields["final_len"] = payload.FinalContentLen
case LLMRequestPayload:
fields["model"] = payload.Model
fields["messages"] = payload.MessagesCount
fields["tools"] = payload.ToolsCount
fields["max_tokens"] = payload.MaxTokens
case LLMDeltaPayload:
fields["content_delta_len"] = payload.ContentDeltaLen
fields["reasoning_delta_len"] = payload.ReasoningDeltaLen
case LLMResponsePayload:
fields["content_len"] = payload.ContentLen
fields["tool_calls"] = payload.ToolCalls
fields["has_reasoning"] = payload.HasReasoning
case LLMRetryPayload:
fields["attempt"] = payload.Attempt
fields["max_retries"] = payload.MaxRetries
fields["reason"] = payload.Reason
fields["error"] = payload.Error
fields["backoff_ms"] = payload.Backoff.Milliseconds()
case ContextCompressPayload:
fields["reason"] = payload.Reason
fields["dropped_messages"] = payload.DroppedMessages
fields["remaining_messages"] = payload.RemainingMessages
case SessionSummarizePayload:
fields["summarized_messages"] = payload.SummarizedMessages
fields["kept_messages"] = payload.KeptMessages
fields["summary_len"] = payload.SummaryLen
fields["omitted_oversized"] = payload.OmittedOversized
case ToolExecStartPayload:
fields["tool"] = payload.Tool
fields["args_count"] = len(payload.Arguments)
case ToolExecEndPayload:
fields["tool"] = payload.Tool
fields["duration_ms"] = payload.Duration.Milliseconds()
fields["for_llm_len"] = payload.ForLLMLen
fields["for_user_len"] = payload.ForUserLen
fields["is_error"] = payload.IsError
fields["async"] = payload.Async
case ToolExecSkippedPayload:
fields["tool"] = payload.Tool
fields["reason"] = payload.Reason
case SteeringInjectedPayload:
fields["count"] = payload.Count
fields["total_content_len"] = payload.TotalContentLen
case FollowUpQueuedPayload:
fields["source_tool"] = payload.SourceTool
fields["content_len"] = payload.ContentLen
case InterruptReceivedPayload:
fields["interrupt_kind"] = payload.Kind
fields["role"] = payload.Role
fields["content_len"] = payload.ContentLen
fields["queue_depth"] = payload.QueueDepth
fields["hint_len"] = payload.HintLen
case SubTurnSpawnPayload:
fields["child_agent_id"] = payload.AgentID
fields["label"] = payload.Label
case SubTurnEndPayload:
fields["child_agent_id"] = payload.AgentID
fields["status"] = payload.Status
case SubTurnResultDeliveredPayload:
fields["target_channel"] = payload.TargetChannel
fields["target_chat_id"] = payload.TargetChatID
fields["content_len"] = payload.ContentLen
case ErrorPayload:
fields["stage"] = payload.Stage
fields["error"] = payload.Message
}
logger.InfoCF("eventbus", fmt.Sprintf("Agent event: %s", evt.Kind.String()), fields)
}
// MountHook registers an in-process hook on the agent loop.
func (al *AgentLoop) MountHook(reg HookRegistration) error {
if al == nil || al.hooks == nil {
return fmt.Errorf("hook manager is not initialized")
}
return al.hooks.Mount(reg)
}
// UnmountHook removes a previously registered in-process hook.
func (al *AgentLoop) UnmountHook(name string) {
if al == nil || al.hooks == nil {
return
}
al.hooks.Unmount(name)
}
// SubscribeEvents registers a subscriber for agent-loop events.
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
if al == nil || al.eventBus == nil {
ch := make(chan Event)
close(ch)
return EventSubscription{C: ch}
}
return al.eventBus.Subscribe(buffer)
}
// UnsubscribeEvents removes a previously registered event subscriber.
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
if al == nil || al.eventBus == nil {
return
}
al.eventBus.Unsubscribe(id)
}
// EventDrops returns the number of dropped events for the given kind.
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
if al == nil || al.eventBus == nil {
return 0
}
return al.eventBus.Dropped(kind)
}