Files
picoclaw/pkg/agent/loop_outbound.go
T
sky5454 12d5421c26 refactor(agent): split loop.go into focused sub-packages
Break up the monolithic 4384-line loop.go into 12 focused files:
- loop.go: core AgentLoop struct and main Run loop
- loop_turn.go: turn execution logic (runTurn, askSideQuestion, etc.)
- loop_utils.go: pure utility functions (formatters, helpers)
- loop_init.go: constructor and tool registration
- loop_message.go: message handling (processMessage, routing)
- loop_command.go: command processing (/use, /btw, etc.)
- loop_mcp.go: MCP runtime management
- loop_event.go: event/hook system helpers
- loop_media.go: media resolution and artifact handling
- loop_outbound.go: response publishing
- loop_transcribe.go: audio transcription
- loop_steering.go: steering queue and continuation
- loop_inject.go: setter injection methods

No functional changes - pure code movement with updated imports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-17 11:17:12 +08:00

166 lines
4.6 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
package agent
import (
"context"
"errors"
"fmt"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/tools"
)
func (al *AgentLoop) maybePublishError(ctx context.Context, channel, chatID, sessionKey string, err error) bool {
if errors.Is(err, context.Canceled) {
return false
}
al.PublishResponseIfNeeded(ctx, channel, chatID, sessionKey, fmt.Sprintf("Error processing message: %v", err))
return true
}
func (al *AgentLoop) publishResponseOrError(
ctx context.Context,
channel, chatID, sessionKey string,
response string,
err error,
) {
if err != nil {
if !al.maybePublishError(ctx, channel, chatID, sessionKey, err) {
return
}
response = ""
}
al.PublishResponseIfNeeded(ctx, channel, chatID, sessionKey, response)
}
func (al *AgentLoop) PublishResponseIfNeeded(ctx context.Context, channel, chatID, sessionKey, response string) {
if response == "" {
return
}
alreadySentToSameChat := false
defaultAgent := al.GetRegistry().GetDefaultAgent()
if defaultAgent != nil {
if tool, ok := defaultAgent.Tools.Get("message"); ok {
if mt, ok := tool.(*tools.MessageTool); ok {
alreadySentToSameChat = mt.HasSentTo(sessionKey, channel, chatID)
}
}
}
if alreadySentToSameChat {
logger.DebugCF(
"agent",
"Skipped outbound (message tool already sent to same chat)",
map[string]any{"channel": channel, "chat_id": chatID},
)
return
}
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Context: bus.NewOutboundContext(channel, chatID, ""),
Content: response,
})
logger.InfoCF("agent", "Published outbound response",
map[string]any{
"channel": channel,
"chat_id": chatID,
"content_len": len(response),
})
}
func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string) {
if al.channelManager == nil {
return ""
}
if ch, ok := al.channelManager.GetChannel(channelName); ok {
return ch.ReasoningChannelID()
}
return ""
}
func (al *AgentLoop) publishPicoReasoning(ctx context.Context, reasoningContent, chatID string) {
if reasoningContent == "" || chatID == "" {
return
}
if ctx.Err() != nil {
return
}
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()
if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Context: bus.InboundContext{
Channel: "pico",
ChatID: chatID,
Raw: map[string]string{
metadataKeyMessageKind: messageKindThought,
},
},
Content: reasoningContent,
}); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Pico reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": "pico",
"error": err.Error(),
})
} else {
logger.WarnCF("agent", "Failed to publish pico reasoning (best-effort)", map[string]any{
"channel": "pico",
"error": err.Error(),
})
}
}
}
func (al *AgentLoop) handleReasoning(
ctx context.Context,
reasoningContent, channelName, channelID string,
) {
if reasoningContent == "" || channelName == "" || channelID == "" {
return
}
// Check context cancellation before attempting to publish,
// since PublishOutbound's select may race between send and ctx.Done().
if ctx.Err() != nil {
return
}
// Use a short timeout so the goroutine does not block indefinitely when
// the outbound bus is full. Reasoning output is best-effort; dropping it
// is acceptable to avoid goroutine accumulation.
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()
if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Context: bus.NewOutboundContext(channelName, channelID, ""),
Content: reasoningContent,
}); err != nil {
// Treat context.DeadlineExceeded / context.Canceled as expected
// (bus full under load, or parent canceled). Check the error
// itself rather than ctx.Err(), because pubCtx may time out
// (5 s) while the parent ctx is still active.
// Also treat ErrBusClosed as expected — it occurs during normal
// shutdown when the bus is closed before all goroutines finish.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
}
}
}