Files
picoclaw/pkg/agent/pipeline_execute.go
T
Hoshina 795ee362ea refactor(events): emit agent runtime events directly
Remove the legacy EventKind/Event envelope mapping and let agent event emission build pkg/events.Event values directly.

Keep HookMeta as the shared hook metadata shape and preserve legacy observe string aliases by mapping them to runtime event kinds.

Validation: GOCACHE=/tmp/picoclaw-go-cache go test ./pkg/agent; make lint
2026-04-26 16:55:02 +08:00

728 lines
22 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
package agent
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/constants"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/utils"
)
// ExecuteTools executes the tool loop, handling BeforeTool/ApproveTool/AfterTool hooks,
// tool execution with async callbacks, media delivery, and steering injection.
// Returns ToolControl indicating what the coordinator should do next:
// - ToolControlContinue: all tool results handled, pendingMessages or steering exists, continue turn
// - ToolControlBreak: tool loop exited, proceed to coordinator's hardAbort/finalContent/finalize
func (p *Pipeline) ExecuteTools(
ctx context.Context,
turnCtx context.Context,
ts *turnState,
exec *turnExecution,
iteration int,
) ToolControl {
al := p.al
normalizedToolCalls := exec.normalizedToolCalls
ts.setPhase(TurnPhaseTools)
messages := exec.messages
handledAttachments := make([]providers.Attachment, 0)
toolLoop:
for i, tc := range normalizedToolCalls {
if ts.hardAbortRequested() {
exec.abortedByHardAbort = true
return ToolControlBreak
}
toolName := tc.Name
toolArgs := cloneStringAnyMap(tc.Arguments)
if al.hooks != nil {
toolReq, decision := al.hooks.BeforeTool(turnCtx, &ToolCallHookRequest{
Meta: ts.eventMeta("runTurn", "turn.tool.before"),
Context: cloneTurnContext(ts.turnCtx),
Tool: toolName,
Arguments: toolArgs,
})
switch decision.normalizedAction() {
case HookActionContinue, HookActionModify:
if toolReq != nil {
toolName = toolReq.Tool
toolArgs = toolReq.Arguments
}
case HookActionRespond:
if toolReq != nil && toolReq.HookResult != nil {
hookResult := toolReq.HookResult
argsJSON, _ := json.Marshal(toolArgs)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call (hook respond): %s(%s)", toolName, argsPreview),
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"iteration": iteration,
})
al.emitEvent(
runtimeevents.KindAgentToolExecStart,
ts.eventMeta("runTurn", "turn.tool.start"),
ToolExecStartPayload{
Tool: toolName,
Arguments: cloneEventArguments(toolArgs),
},
)
if shouldPublishToolFeedback(al.cfg, ts) {
toolFeedbackMaxLen := al.cfg.Agents.Defaults.GetToolFeedbackMaxArgsLength()
toolFeedbackExplanation := toolFeedbackExplanationForToolCall(
exec.response,
tc,
messages,
toolFeedbackMaxLen,
)
feedbackMsg := utils.FormatToolFeedbackMessage(
toolName,
toolFeedbackExplanation,
toolFeedbackArgsPreview(toolArgs, toolFeedbackMaxLen),
)
fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second)
_ = al.bus.PublishOutbound(fbCtx, outboundMessageForTurnWithKind(ts, feedbackMsg, messageKindToolFeedback))
fbCancel()
}
toolDuration := time.Duration(0)
shouldSendForUser := !hookResult.Silent && hookResult.ForUser != "" &&
(ts.opts.SendResponse || hookResult.ResponseHandled)
if shouldSendForUser {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Context: bus.InboundContext{
Channel: ts.channel,
ChatID: ts.chatID,
Raw: map[string]string{
"is_tool_call": "true",
},
},
Content: hookResult.ForUser,
})
}
if len(hookResult.Media) > 0 && hookResult.ResponseHandled {
parts := make([]bus.MediaPart, 0, len(hookResult.Media))
for _, ref := range hookResult.Media {
part := bus.MediaPart{Ref: ref}
if al.mediaStore != nil {
if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil {
part.Filename = meta.Filename
part.ContentType = meta.ContentType
part.Type = inferMediaType(meta.Filename, meta.ContentType)
}
}
parts = append(parts, part)
}
outboundMedia := bus.OutboundMediaMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Context: outboundContextFromInbound(
ts.opts.Dispatch.InboundContext,
ts.channel,
ts.chatID,
ts.opts.Dispatch.ReplyToMessageID(),
),
AgentID: ts.agent.ID,
SessionKey: ts.sessionKey,
Scope: outboundScopeFromSessionScope(ts.opts.Dispatch.SessionScope),
Parts: parts,
}
if al.channelManager != nil && ts.channel != "" && !constants.IsInternalChannel(ts.channel) {
if err := al.channelManager.SendMedia(ctx, outboundMedia); err != nil {
logger.WarnCF("agent", "Failed to deliver hook media",
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"channel": ts.channel,
"chat_id": ts.chatID,
"error": err.Error(),
})
hookResult.IsError = true
hookResult.ForLLM = fmt.Sprintf("failed to deliver attachment: %v", err)
} else {
handledAttachments = append(
handledAttachments,
buildProviderAttachments(al.mediaStore, hookResult.Media)...,
)
}
} else if al.bus != nil {
al.bus.PublishOutboundMedia(ctx, outboundMedia)
hookResult.ResponseHandled = false
}
}
if !hookResult.ResponseHandled {
exec.allResponsesHandled = false
}
contentForLLM := hookResult.ContentForLLM()
if al.cfg.Tools.IsFilterSensitiveDataEnabled() {
contentForLLM = al.cfg.FilterSensitiveData(contentForLLM)
}
toolResultMsg := providers.Message{
Role: "tool",
Content: contentForLLM,
ToolCallID: tc.ID,
}
if len(hookResult.Media) > 0 && !hookResult.ResponseHandled {
hookResult.ArtifactTags = buildArtifactTags(al.mediaStore, hookResult.Media)
contentForLLM = hookResult.ContentForLLM()
if al.cfg.Tools.IsFilterSensitiveDataEnabled() {
contentForLLM = al.cfg.FilterSensitiveData(contentForLLM)
}
toolResultMsg.Content = contentForLLM
toolResultMsg.Media = append(toolResultMsg.Media, hookResult.Media...)
}
al.emitEvent(
runtimeevents.KindAgentToolExecEnd,
ts.eventMeta("runTurn", "turn.tool.end"),
ToolExecEndPayload{
Tool: toolName,
Duration: toolDuration,
ForLLMLen: len(contentForLLM),
ForUserLen: len(hookResult.ForUser),
IsError: hookResult.IsError,
Async: hookResult.Async,
},
)
messages = append(messages, toolResultMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, toolResultMsg)
ts.recordPersistedMessage(toolResultMsg)
ts.ingestMessage(turnCtx, al, toolResultMsg)
}
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
exec.pendingMessages = append(exec.pendingMessages, steerMsgs...)
}
skipReason := ""
skipMessage := ""
if len(exec.pendingMessages) > 0 {
skipReason = "queued user steering message"
skipMessage = "Skipped due to queued user message."
} else if gracefulPending, _ := ts.gracefulInterruptRequested(); gracefulPending {
skipReason = "graceful interrupt requested"
skipMessage = "Skipped due to graceful interrupt."
}
if skipReason != "" {
remaining := len(normalizedToolCalls) - i - 1
if remaining > 0 {
logger.InfoCF("agent", "Turn checkpoint: skipping remaining tools after hook respond",
map[string]any{
"agent_id": ts.agent.ID,
"completed": i + 1,
"skipped": remaining,
"reason": skipReason,
})
for j := i + 1; j < len(normalizedToolCalls); j++ {
skippedTC := normalizedToolCalls[j]
al.emitEvent(
runtimeevents.KindAgentToolExecSkipped,
ts.eventMeta("runTurn", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: skippedTC.Name,
Reason: skipReason,
},
)
skippedMsg := providers.Message{
Role: "tool",
Content: skipMessage,
ToolCallID: skippedTC.ID,
}
messages = append(messages, skippedMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, skippedMsg)
ts.recordPersistedMessage(skippedMsg)
}
}
}
break toolLoop
}
if ts.pendingResults != nil {
select {
case result, ok := <-ts.pendingResults:
if ok && result != nil && result.ForLLM != "" {
content := al.cfg.FilterSensitiveData(result.ForLLM)
msg := subTurnResultPromptMessage(content)
messages = append(messages, msg)
ts.agent.Sessions.AddFullMessage(ts.sessionKey, msg)
}
default:
}
}
continue
}
logger.WarnCF("agent", "Hook returned respond action but no HookResult provided",
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"action": "respond",
})
case HookActionDenyTool:
exec.allResponsesHandled = false
denyContent := hookDeniedToolContent("Tool execution denied by hook", decision.Reason)
al.emitEvent(
runtimeevents.KindAgentToolExecSkipped,
ts.eventMeta("runTurn", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: toolName,
Reason: denyContent,
},
)
deniedMsg := providers.Message{
Role: "tool",
Content: denyContent,
ToolCallID: tc.ID,
}
messages = append(messages, deniedMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, deniedMsg)
ts.recordPersistedMessage(deniedMsg)
}
continue
case HookActionAbortTurn:
exec.abortedByHook = true
return ToolControlBreak
case HookActionHardAbort:
_ = ts.requestHardAbort()
exec.abortedByHardAbort = true
return ToolControlBreak
}
}
if al.hooks != nil {
approval := al.hooks.ApproveTool(turnCtx, &ToolApprovalRequest{
Meta: ts.eventMeta("runTurn", "turn.tool.approve"),
Context: cloneTurnContext(ts.turnCtx),
Tool: toolName,
Arguments: toolArgs,
})
if !approval.Approved {
exec.allResponsesHandled = false
denyContent := hookDeniedToolContent("Tool execution denied by approval hook", approval.Reason)
al.emitEvent(
runtimeevents.KindAgentToolExecSkipped,
ts.eventMeta("runTurn", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: toolName,
Reason: denyContent,
},
)
deniedMsg := providers.Message{
Role: "tool",
Content: denyContent,
ToolCallID: tc.ID,
}
messages = append(messages, deniedMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, deniedMsg)
ts.recordPersistedMessage(deniedMsg)
}
continue
}
}
argsJSON, _ := json.Marshal(toolArgs)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", toolName, argsPreview),
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"iteration": iteration,
})
al.emitEvent(
runtimeevents.KindAgentToolExecStart,
ts.eventMeta("runTurn", "turn.tool.start"),
ToolExecStartPayload{
Tool: toolName,
Arguments: cloneEventArguments(toolArgs),
},
)
if shouldPublishToolFeedback(al.cfg, ts) {
toolFeedbackMaxLen := al.cfg.Agents.Defaults.GetToolFeedbackMaxArgsLength()
toolFeedbackExplanation := toolFeedbackExplanationForToolCall(
exec.response,
tc,
messages,
toolFeedbackMaxLen,
)
feedbackMsg := utils.FormatToolFeedbackMessage(
toolName,
toolFeedbackExplanation,
toolFeedbackArgsPreview(toolArgs, toolFeedbackMaxLen),
)
fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second)
_ = al.bus.PublishOutbound(fbCtx, outboundMessageForTurnWithKind(ts, feedbackMsg, messageKindToolFeedback))
fbCancel()
}
toolCallID := tc.ID
asyncToolName := toolName
asyncCallback := func(_ context.Context, result *tools.ToolResult) {
if !result.Silent && result.ForUser != "" {
outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer outCancel()
_ = al.bus.PublishOutbound(outCtx, outboundMessageForTurn(ts, result.ForUser))
}
content := result.ContentForLLM()
if content == "" {
return
}
content = al.cfg.FilterSensitiveData(content)
logger.InfoCF("agent", "Async tool completed, publishing result",
map[string]any{
"tool": asyncToolName,
"content_len": len(content),
"channel": ts.channel,
})
al.emitEvent(
runtimeevents.KindAgentFollowUpQueued,
ts.scope.meta(iteration, "runTurn", "turn.follow_up.queued"),
FollowUpQueuedPayload{
SourceTool: asyncToolName,
ContentLen: len(content),
},
)
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
_ = al.bus.PublishInbound(pubCtx, bus.InboundMessage{
Context: bus.InboundContext{
Channel: "system",
ChatID: fmt.Sprintf("%s:%s", ts.channel, ts.chatID),
ChatType: "direct",
SenderID: fmt.Sprintf("async:%s", asyncToolName),
},
Content: content,
})
}
toolStart := time.Now()
execCtx := tools.WithToolInboundContext(
turnCtx,
ts.channel,
ts.chatID,
ts.opts.Dispatch.MessageID(),
ts.opts.Dispatch.ReplyToMessageID(),
)
execCtx = tools.WithToolSessionContext(
execCtx,
ts.agent.ID,
ts.sessionKey,
ts.opts.Dispatch.SessionScope,
)
toolResult := ts.agent.Tools.ExecuteWithContext(
execCtx,
toolName,
toolArgs,
ts.channel,
ts.chatID,
asyncCallback,
)
toolDuration := time.Since(toolStart)
if ts.hardAbortRequested() {
exec.abortedByHardAbort = true
return ToolControlBreak
}
if al.hooks != nil {
toolResp, decision := al.hooks.AfterTool(turnCtx, &ToolResultHookResponse{
Meta: ts.eventMeta("runTurn", "turn.tool.after"),
Context: cloneTurnContext(ts.turnCtx),
Tool: toolName,
Arguments: toolArgs,
Result: toolResult,
Duration: toolDuration,
})
switch decision.normalizedAction() {
case HookActionContinue, HookActionModify:
if toolResp != nil {
if toolResp.Tool != "" {
toolName = toolResp.Tool
}
if toolResp.Result != nil {
toolResult = toolResp.Result
}
}
case HookActionAbortTurn:
exec.abortedByHook = true
return ToolControlBreak
case HookActionHardAbort:
_ = ts.requestHardAbort()
exec.abortedByHardAbort = true
return ToolControlBreak
}
}
if toolResult == nil {
toolResult = tools.ErrorResult("hook returned nil tool result")
}
if len(toolResult.Media) > 0 && toolResult.ResponseHandled {
parts := make([]bus.MediaPart, 0, len(toolResult.Media))
for _, ref := range toolResult.Media {
part := bus.MediaPart{Ref: ref}
if al.mediaStore != nil {
if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil {
part.Filename = meta.Filename
part.ContentType = meta.ContentType
part.Type = inferMediaType(meta.Filename, meta.ContentType)
}
}
parts = append(parts, part)
}
outboundMedia := bus.OutboundMediaMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Context: outboundContextFromInbound(
ts.opts.Dispatch.InboundContext,
ts.channel,
ts.chatID,
ts.opts.Dispatch.ReplyToMessageID(),
),
AgentID: ts.agent.ID,
SessionKey: ts.sessionKey,
Scope: outboundScopeFromSessionScope(ts.opts.Dispatch.SessionScope),
Parts: parts,
}
if al.channelManager != nil && ts.channel != "" && !constants.IsInternalChannel(ts.channel) {
if err := al.channelManager.SendMedia(ctx, outboundMedia); err != nil {
logger.WarnCF("agent", "Failed to deliver handled tool media",
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"channel": ts.channel,
"chat_id": ts.chatID,
"error": err.Error(),
})
toolResult = tools.ErrorResult(fmt.Sprintf("failed to deliver attachment: %v", err)).WithError(err)
} else {
handledAttachments = append(
handledAttachments,
buildProviderAttachments(al.mediaStore, toolResult.Media)...,
)
}
} else if al.bus != nil {
al.bus.PublishOutboundMedia(ctx, outboundMedia)
toolResult.ResponseHandled = false
}
}
if len(toolResult.Media) > 0 && !toolResult.ResponseHandled {
toolResult.ArtifactTags = buildArtifactTags(al.mediaStore, toolResult.Media)
}
if !toolResult.ResponseHandled {
exec.allResponsesHandled = false
}
shouldSendForUser := !toolResult.Silent &&
toolResult.ForUser != "" &&
(ts.opts.SendResponse || toolResult.ResponseHandled)
if shouldSendForUser {
al.bus.PublishOutbound(ctx, outboundMessageForTurn(ts, toolResult.ForUser))
logger.DebugCF("agent", "Sent tool result to user",
map[string]any{
"tool": toolName,
"content_len": len(toolResult.ForUser),
})
}
contentForLLM := toolResult.ContentForLLM()
if al.cfg.Tools.IsFilterSensitiveDataEnabled() {
contentForLLM = al.cfg.FilterSensitiveData(contentForLLM)
}
toolResultMsg := providers.Message{
Role: "tool",
Content: contentForLLM,
ToolCallID: toolCallID,
}
if len(toolResult.Media) > 0 && !toolResult.ResponseHandled {
toolResultMsg.Media = append(toolResultMsg.Media, toolResult.Media...)
}
al.emitEvent(
runtimeevents.KindAgentToolExecEnd,
ts.eventMeta("runTurn", "turn.tool.end"),
ToolExecEndPayload{
Tool: toolName,
Duration: toolDuration,
ForLLMLen: len(contentForLLM),
ForUserLen: len(toolResult.ForUser),
IsError: toolResult.IsError,
Async: toolResult.Async,
},
)
messages = append(messages, toolResultMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, toolResultMsg)
ts.recordPersistedMessage(toolResultMsg)
ts.ingestMessage(turnCtx, al, toolResultMsg)
}
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
exec.pendingMessages = append(exec.pendingMessages, steerMsgs...)
}
skipReason := ""
skipMessage := ""
if len(exec.pendingMessages) > 0 {
skipReason = "queued user steering message"
skipMessage = "Skipped due to queued user message."
} else if gracefulPending, _ := ts.gracefulInterruptRequested(); gracefulPending {
skipReason = "graceful interrupt requested"
skipMessage = "Skipped due to graceful interrupt."
}
if skipReason != "" {
remaining := len(normalizedToolCalls) - i - 1
if remaining > 0 {
logger.InfoCF("agent", "Turn checkpoint: skipping remaining tools",
map[string]any{
"agent_id": ts.agent.ID,
"completed": i + 1,
"skipped": remaining,
"reason": skipReason,
})
for j := i + 1; j < len(normalizedToolCalls); j++ {
skippedTC := normalizedToolCalls[j]
al.emitEvent(
runtimeevents.KindAgentToolExecSkipped,
ts.eventMeta("runTurn", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: skippedTC.Name,
Reason: skipReason,
},
)
skippedMsg := providers.Message{
Role: "tool",
Content: skipMessage,
ToolCallID: skippedTC.ID,
}
messages = append(messages, skippedMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, skippedMsg)
ts.recordPersistedMessage(skippedMsg)
}
}
}
break toolLoop
}
if ts.pendingResults != nil {
select {
case result, ok := <-ts.pendingResults:
if ok && result != nil && result.ForLLM != "" {
content := al.cfg.FilterSensitiveData(result.ForLLM)
msg := subTurnResultPromptMessage(content)
messages = append(messages, msg)
ts.agent.Sessions.AddFullMessage(ts.sessionKey, msg)
}
default:
}
}
}
exec.messages = messages
// Continue if pending steering exists (regardless of allResponsesHandled).
// This covers the case where tools were partially executed and skipped due to steering,
// but one tool had ResponseHandled=false (so allResponsesHandled=false).
if len(exec.pendingMessages) > 0 {
logger.InfoCF("agent", "Pending steering after partial tool execution; continuing turn",
map[string]any{
"agent_id": ts.agent.ID,
"pending_count": len(exec.pendingMessages),
"allResponsesHandled": exec.allResponsesHandled,
})
exec.allResponsesHandled = false
return ToolControlContinue
}
// Poll for newly arrived steering
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
logger.InfoCF("agent", "Steering arrived after tool delivery; continuing turn",
map[string]any{
"agent_id": ts.agent.ID,
"steering_count": len(steerMsgs),
})
exec.pendingMessages = append(exec.pendingMessages, steerMsgs...)
exec.allResponsesHandled = false
return ToolControlContinue
}
// No pending steering: finalize or break depending on allResponsesHandled
if exec.allResponsesHandled {
summaryMsg := providers.Message{
Role: "assistant",
Content: handledToolResponseSummary,
Attachments: append([]providers.Attachment(nil), handledAttachments...),
}
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, summaryMsg)
ts.recordPersistedMessage(summaryMsg)
ts.ingestMessage(turnCtx, al, summaryMsg)
if err := ts.agent.Sessions.Save(ts.sessionKey); err != nil {
logger.WarnCF("agent", "Failed to save session after tool delivery",
map[string]any{
"agent_id": ts.agent.ID,
"error": err.Error(),
})
}
}
if ts.opts.EnableSummary {
al.contextManager.Compact(turnCtx, &CompactRequest{
SessionKey: ts.sessionKey,
Reason: ContextCompressReasonSummarize,
Budget: ts.agent.ContextWindow,
})
}
ts.setPhase(TurnPhaseCompleted)
ts.setFinalContent("")
logger.InfoCF("agent", "Tool output satisfied delivery; ending turn without follow-up LLM",
map[string]any{
"agent_id": ts.agent.ID,
"iteration": iteration,
"tool_count": len(normalizedToolCalls),
})
return ToolControlBreak
}
// allResponsesHandled=false and no pending steering: continue so coordinator
// makes another LLM call. The tool result is in messages and the LLM will
// return it as finalContent in the next iteration.
ts.agent.Tools.TickTTL()
logger.DebugCF("agent", "TTL tick after tool execution", map[string]any{
"agent_id": ts.agent.ID, "iteration": iteration,
})
return ToolControlContinue
}