mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
329e68e017
* refactor(agent): introduce interfaces for MessageBus and ChannelManager Phase 2 of loop.go refactor — dependency inversion using adapter pattern. - Add interfaces.MessageBus and interfaces.ChannelManager interfaces - Create adapters/messagebus.go wrapping *bus.MessageBus - Create adapters/channelmanager.go wrapping *channels.Manager - Update AgentLoop to use interfaces instead of concrete types - Update registerSharedTools to accept interfaces.MessageBus Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor(agent): restructure pipeline and rename loop files Pipeline refactoring: - Split pipeline.go (1400 lines) into focused files: - pipeline_setup.go (~115 lines): SetupTurn method - pipeline_llm.go (~519 lines): CallLLM method - pipeline_execute.go (~693 lines): ExecuteTools method - pipeline_finalize.go (~78 lines): Finalize method - Pipeline struct and NewPipeline remain in pipeline.go (~39 lines) Agent file renaming: - Rename loop_*.go to agent_*.go for consistent naming: - loop.go -> agent.go, loop_message.go -> agent_message.go, etc. - Merge turn.go + turn_exec.go into turn_state.go - Rename loop_turn.go -> turn_coord.go Documentation: - Update docs/pipeline-restructuring-plan.md - Add docs/agent-rename-plan.md Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(agent): code format fixed * refactor(agent): code test file added/renamed * docs(agent): update agent refactor docs * fix(agent): fix agent hardAbortX --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
97 lines
2.6 KiB
Go
97 lines
2.6 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
)
|
|
|
|
func (al *AgentLoop) processMessageSync(ctx context.Context, msg bus.InboundMessage) {
|
|
if al.channelManager != nil {
|
|
defer al.channelManager.InvokeTypingStop(msg.Channel, msg.ChatID)
|
|
}
|
|
|
|
response, err := al.processMessage(ctx, msg)
|
|
al.publishResponseOrError(ctx, msg.Channel, msg.ChatID, msg.SessionKey, response, err)
|
|
}
|
|
|
|
func (al *AgentLoop) runTurnWithSteering(ctx context.Context, initialMsg bus.InboundMessage) {
|
|
// Process the initial message
|
|
response, err := al.processMessage(ctx, initialMsg)
|
|
if err != nil {
|
|
if !al.maybePublishError(ctx, initialMsg.Channel, initialMsg.ChatID, initialMsg.SessionKey, err) {
|
|
return // context canceled
|
|
}
|
|
response = ""
|
|
}
|
|
finalResponse := response
|
|
|
|
// Build continuation target
|
|
target, targetErr := al.buildContinuationTarget(initialMsg)
|
|
if targetErr != nil {
|
|
logger.WarnCF("agent", "Failed to build steering continuation target",
|
|
map[string]any{
|
|
"channel": initialMsg.Channel,
|
|
"error": targetErr.Error(),
|
|
})
|
|
return
|
|
}
|
|
if target == nil {
|
|
// System message or non-routable, response already published
|
|
return
|
|
}
|
|
|
|
// Drain steering queue using existing Continue mechanism
|
|
for al.pendingSteeringCountForScope(target.SessionKey) > 0 {
|
|
// Check for context cancellation between iterations
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
logger.InfoCF("agent", "Continuing queued steering after turn end",
|
|
map[string]any{
|
|
"channel": target.Channel,
|
|
"chat_id": target.ChatID,
|
|
"session_key": target.SessionKey,
|
|
"queue_depth": al.pendingSteeringCountForScope(target.SessionKey),
|
|
})
|
|
|
|
continued, continueErr := al.Continue(ctx, target.SessionKey, target.Channel, target.ChatID)
|
|
if continueErr != nil {
|
|
logger.WarnCF("agent", "Failed to continue queued steering",
|
|
map[string]any{
|
|
"channel": target.Channel,
|
|
"chat_id": target.ChatID,
|
|
"error": continueErr.Error(),
|
|
})
|
|
break
|
|
}
|
|
if continued == "" {
|
|
break
|
|
}
|
|
finalResponse = continued
|
|
}
|
|
|
|
// Publish final response
|
|
if finalResponse != "" {
|
|
al.PublishResponseIfNeeded(ctx, target.Channel, target.ChatID, target.SessionKey, finalResponse)
|
|
}
|
|
}
|
|
|
|
func (al *AgentLoop) resolveSteeringTarget(msg bus.InboundMessage) (string, string, bool) {
|
|
if msg.Channel == "system" {
|
|
return "", "", false
|
|
}
|
|
|
|
route, agent, err := al.resolveMessageRoute(msg)
|
|
if err != nil || agent == nil {
|
|
return "", "", false
|
|
}
|
|
allocation := al.allocateRouteSession(route, msg)
|
|
|
|
return resolveScopeKey(allocation.SessionKey, msg.SessionKey), agent.ID, true
|
|
}
|