mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
329e68e017
* refactor(agent): introduce interfaces for MessageBus and ChannelManager Phase 2 of loop.go refactor — dependency inversion using adapter pattern. - Add interfaces.MessageBus and interfaces.ChannelManager interfaces - Create adapters/messagebus.go wrapping *bus.MessageBus - Create adapters/channelmanager.go wrapping *channels.Manager - Update AgentLoop to use interfaces instead of concrete types - Update registerSharedTools to accept interfaces.MessageBus Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor(agent): restructure pipeline and rename loop files Pipeline refactoring: - Split pipeline.go (1400 lines) into focused files: - pipeline_setup.go (~115 lines): SetupTurn method - pipeline_llm.go (~519 lines): CallLLM method - pipeline_execute.go (~693 lines): ExecuteTools method - pipeline_finalize.go (~78 lines): Finalize method - Pipeline struct and NewPipeline remain in pipeline.go (~39 lines) Agent file renaming: - Rename loop_*.go to agent_*.go for consistent naming: - loop.go -> agent.go, loop_message.go -> agent_message.go, etc. - Merge turn.go + turn_exec.go into turn_state.go - Rename loop_turn.go -> turn_coord.go Documentation: - Update docs/pipeline-restructuring-plan.md - Add docs/agent-rename-plan.md Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(agent): code format fixed * refactor(agent): code test file added/renamed * docs(agent): update agent refactor docs * fix(agent): fix agent hardAbortX --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
189 lines
5.6 KiB
Go
189 lines
5.6 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
|
|
package agent
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
)
|
|
|
|
func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *TurnContext) turnEventScope {
|
|
seq := al.turnSeq.Add(1)
|
|
return turnEventScope{
|
|
agentID: agentID,
|
|
sessionKey: sessionKey,
|
|
turnID: fmt.Sprintf("%s-turn-%d", agentID, seq),
|
|
context: cloneTurnContext(turnCtx),
|
|
}
|
|
}
|
|
|
|
func (ts turnEventScope) meta(iteration int, source, tracePath string) EventMeta {
|
|
return EventMeta{
|
|
AgentID: ts.agentID,
|
|
TurnID: ts.turnID,
|
|
SessionKey: ts.sessionKey,
|
|
Iteration: iteration,
|
|
Source: source,
|
|
TracePath: tracePath,
|
|
turnContext: cloneTurnContext(ts.context),
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) emitEvent(kind EventKind, meta EventMeta, payload any) {
|
|
clonedMeta := cloneEventMeta(meta)
|
|
evt := Event{
|
|
Kind: kind,
|
|
Meta: clonedMeta,
|
|
Context: cloneTurnContext(clonedMeta.turnContext),
|
|
Payload: payload,
|
|
}
|
|
|
|
if al == nil || al.eventBus == nil {
|
|
return
|
|
}
|
|
|
|
al.logEvent(evt)
|
|
|
|
al.eventBus.Emit(evt)
|
|
}
|
|
|
|
func (al *AgentLoop) logEvent(evt Event) {
|
|
fields := map[string]any{
|
|
"event_kind": evt.Kind.String(),
|
|
"agent_id": evt.Meta.AgentID,
|
|
"turn_id": evt.Meta.TurnID,
|
|
"session_key": evt.Meta.SessionKey,
|
|
"iteration": evt.Meta.Iteration,
|
|
}
|
|
|
|
if evt.Meta.TracePath != "" {
|
|
fields["trace"] = evt.Meta.TracePath
|
|
}
|
|
if evt.Meta.Source != "" {
|
|
fields["source"] = evt.Meta.Source
|
|
}
|
|
|
|
appendEventContextFields(fields, evt.Context)
|
|
|
|
switch payload := evt.Payload.(type) {
|
|
case TurnStartPayload:
|
|
fields["user_len"] = len(payload.UserMessage)
|
|
fields["media_count"] = payload.MediaCount
|
|
case TurnEndPayload:
|
|
fields["status"] = payload.Status
|
|
fields["iterations_total"] = payload.Iterations
|
|
fields["duration_ms"] = payload.Duration.Milliseconds()
|
|
fields["final_len"] = payload.FinalContentLen
|
|
case LLMRequestPayload:
|
|
fields["model"] = payload.Model
|
|
fields["messages"] = payload.MessagesCount
|
|
fields["tools"] = payload.ToolsCount
|
|
fields["max_tokens"] = payload.MaxTokens
|
|
case LLMDeltaPayload:
|
|
fields["content_delta_len"] = payload.ContentDeltaLen
|
|
fields["reasoning_delta_len"] = payload.ReasoningDeltaLen
|
|
case LLMResponsePayload:
|
|
fields["content_len"] = payload.ContentLen
|
|
fields["tool_calls"] = payload.ToolCalls
|
|
fields["has_reasoning"] = payload.HasReasoning
|
|
case LLMRetryPayload:
|
|
fields["attempt"] = payload.Attempt
|
|
fields["max_retries"] = payload.MaxRetries
|
|
fields["reason"] = payload.Reason
|
|
fields["error"] = payload.Error
|
|
fields["backoff_ms"] = payload.Backoff.Milliseconds()
|
|
case ContextCompressPayload:
|
|
fields["reason"] = payload.Reason
|
|
fields["dropped_messages"] = payload.DroppedMessages
|
|
fields["remaining_messages"] = payload.RemainingMessages
|
|
case SessionSummarizePayload:
|
|
fields["summarized_messages"] = payload.SummarizedMessages
|
|
fields["kept_messages"] = payload.KeptMessages
|
|
fields["summary_len"] = payload.SummaryLen
|
|
fields["omitted_oversized"] = payload.OmittedOversized
|
|
case ToolExecStartPayload:
|
|
fields["tool"] = payload.Tool
|
|
fields["args_count"] = len(payload.Arguments)
|
|
case ToolExecEndPayload:
|
|
fields["tool"] = payload.Tool
|
|
fields["duration_ms"] = payload.Duration.Milliseconds()
|
|
fields["for_llm_len"] = payload.ForLLMLen
|
|
fields["for_user_len"] = payload.ForUserLen
|
|
fields["is_error"] = payload.IsError
|
|
fields["async"] = payload.Async
|
|
case ToolExecSkippedPayload:
|
|
fields["tool"] = payload.Tool
|
|
fields["reason"] = payload.Reason
|
|
case SteeringInjectedPayload:
|
|
fields["count"] = payload.Count
|
|
fields["total_content_len"] = payload.TotalContentLen
|
|
case FollowUpQueuedPayload:
|
|
fields["source_tool"] = payload.SourceTool
|
|
fields["content_len"] = payload.ContentLen
|
|
case InterruptReceivedPayload:
|
|
fields["interrupt_kind"] = payload.Kind
|
|
fields["role"] = payload.Role
|
|
fields["content_len"] = payload.ContentLen
|
|
fields["queue_depth"] = payload.QueueDepth
|
|
fields["hint_len"] = payload.HintLen
|
|
case SubTurnSpawnPayload:
|
|
fields["child_agent_id"] = payload.AgentID
|
|
fields["label"] = payload.Label
|
|
case SubTurnEndPayload:
|
|
fields["child_agent_id"] = payload.AgentID
|
|
fields["status"] = payload.Status
|
|
case SubTurnResultDeliveredPayload:
|
|
fields["target_channel"] = payload.TargetChannel
|
|
fields["target_chat_id"] = payload.TargetChatID
|
|
fields["content_len"] = payload.ContentLen
|
|
case ErrorPayload:
|
|
fields["stage"] = payload.Stage
|
|
fields["error"] = payload.Message
|
|
}
|
|
|
|
logger.InfoCF("eventbus", fmt.Sprintf("Agent event: %s", evt.Kind.String()), fields)
|
|
}
|
|
|
|
// MountHook registers an in-process hook on the agent loop.
|
|
func (al *AgentLoop) MountHook(reg HookRegistration) error {
|
|
if al == nil || al.hooks == nil {
|
|
return fmt.Errorf("hook manager is not initialized")
|
|
}
|
|
return al.hooks.Mount(reg)
|
|
}
|
|
|
|
// UnmountHook removes a previously registered in-process hook.
|
|
func (al *AgentLoop) UnmountHook(name string) {
|
|
if al == nil || al.hooks == nil {
|
|
return
|
|
}
|
|
al.hooks.Unmount(name)
|
|
}
|
|
|
|
// SubscribeEvents registers a subscriber for agent-loop events.
|
|
func (al *AgentLoop) SubscribeEvents(buffer int) EventSubscription {
|
|
if al == nil || al.eventBus == nil {
|
|
ch := make(chan Event)
|
|
close(ch)
|
|
return EventSubscription{C: ch}
|
|
}
|
|
return al.eventBus.Subscribe(buffer)
|
|
}
|
|
|
|
// UnsubscribeEvents removes a previously registered event subscriber.
|
|
func (al *AgentLoop) UnsubscribeEvents(id uint64) {
|
|
if al == nil || al.eventBus == nil {
|
|
return
|
|
}
|
|
al.eventBus.Unsubscribe(id)
|
|
}
|
|
|
|
// EventDrops returns the number of dropped events for the given kind.
|
|
func (al *AgentLoop) EventDrops(kind EventKind) int64 {
|
|
if al == nil || al.eventBus == nil {
|
|
return 0
|
|
}
|
|
return al.eventBus.Dropped(kind)
|
|
}
|