From 028605cfd09d78c3db1c9f9fae65538f442a7c34 Mon Sep 17 00:00:00 2001 From: Guoguo <16666742+imguoguo@users.noreply.github.com> Date: Wed, 4 Mar 2026 17:17:28 +0800 Subject: [PATCH] feat: execute LLM tool calls in parallel for faster response (#1070) When the LLM returns multiple tool calls, they are now executed concurrently using goroutines + sync.WaitGroup instead of sequentially. Results are collected in an indexed slice and processed in original order to preserve message ordering. MessageTool.sentInRound is changed to atomic.Bool for thread safety. Co-authored-by: Claude Opus 4.6 --- pkg/agent/loop.go | 102 ++++++++++++++++++++++++------------------ pkg/tools/message.go | 9 ++-- pkg/tools/toolloop.go | 69 +++++++++++++++++----------- 3 files changed, 106 insertions(+), 74 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ef7ded721..db9efa2cf 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -969,62 +969,76 @@ func (al *AgentLoop) runLLMIteration( // Save assistant message with tool calls to session agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg) - // Execute tool calls - for _, tc := range normalizedToolCalls { - argsJSON, _ := json.Marshal(tc.Arguments) - argsPreview := utils.Truncate(string(argsJSON), 200) - logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), - map[string]any{ - "agent_id": agent.ID, - "tool": tc.Name, - "iteration": iteration, - }) + // Execute tool calls in parallel + type indexedAgentResult struct { + result *tools.ToolResult + tc providers.ToolCall + } - // Create async callback for tools that implement AsyncTool - // NOTE: Following openclaw's design, async tools do NOT send results directly to users. - // Instead, they notify the agent via PublishInbound, and the agent decides - // whether to forward the result to the user (in processSystemMessage). - asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) { - // Log the async completion but don't send directly to user - // The agent will handle user notification via processSystemMessage - if !result.Silent && result.ForUser != "" { - logger.InfoCF("agent", "Async tool completed, agent will handle notification", - map[string]any{ - "tool": tc.Name, - "content_len": len(result.ForUser), - }) + agentResults := make([]indexedAgentResult, len(normalizedToolCalls)) + var wg sync.WaitGroup + + for i, tc := range normalizedToolCalls { + agentResults[i].tc = tc + + wg.Add(1) + go func(idx int, tc providers.ToolCall) { + defer wg.Done() + + argsJSON, _ := json.Marshal(tc.Arguments) + argsPreview := utils.Truncate(string(argsJSON), 200) + logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + map[string]any{ + "agent_id": agent.ID, + "tool": tc.Name, + "iteration": iteration, + }) + + // Create async callback for tools that implement AsyncTool + asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) { + if !result.Silent && result.ForUser != "" { + logger.InfoCF("agent", "Async tool completed, agent will handle notification", + map[string]any{ + "tool": tc.Name, + "content_len": len(result.ForUser), + }) + } } - } - toolResult := agent.Tools.ExecuteWithContext( - ctx, - tc.Name, - tc.Arguments, - opts.Channel, - opts.ChatID, - asyncCallback, - ) + toolResult := agent.Tools.ExecuteWithContext( + ctx, + tc.Name, + tc.Arguments, + opts.Channel, + opts.ChatID, + asyncCallback, + ) + agentResults[idx].result = toolResult + }(i, tc) + } + wg.Wait() + // Process results in original order (send to user, save to session) + for _, r := range agentResults { // Send ForUser content to user immediately if not Silent - if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse { + if !r.result.Silent && r.result.ForUser != "" && opts.SendResponse { al.bus.PublishOutbound(ctx, bus.OutboundMessage{ Channel: opts.Channel, ChatID: opts.ChatID, - Content: toolResult.ForUser, + Content: r.result.ForUser, }) logger.DebugCF("agent", "Sent tool result to user", map[string]any{ - "tool": tc.Name, - "content_len": len(toolResult.ForUser), + "tool": r.tc.Name, + "content_len": len(r.result.ForUser), }) } // If tool returned media refs, publish them as outbound media - if len(toolResult.Media) > 0 && opts.SendResponse { - parts := make([]bus.MediaPart, 0, len(toolResult.Media)) - for _, ref := range toolResult.Media { + if len(r.result.Media) > 0 && opts.SendResponse { + parts := make([]bus.MediaPart, 0, len(r.result.Media)) + for _, ref := range r.result.Media { part := bus.MediaPart{Ref: ref} - // Populate metadata from MediaStore when available if al.mediaStore != nil { if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil { part.Filename = meta.Filename @@ -1042,15 +1056,15 @@ func (al *AgentLoop) runLLMIteration( } // Determine content for LLM based on tool result - contentForLLM := toolResult.ForLLM - if contentForLLM == "" && toolResult.Err != nil { - contentForLLM = toolResult.Err.Error() + contentForLLM := r.result.ForLLM + if contentForLLM == "" && r.result.Err != nil { + contentForLLM = r.result.Err.Error() } toolResultMsg := providers.Message{ Role: "tool", Content: contentForLLM, - ToolCallID: tc.ID, + ToolCallID: r.tc.ID, } messages = append(messages, toolResultMsg) diff --git a/pkg/tools/message.go b/pkg/tools/message.go index 15ef4ff73..d1e4a373e 100644 --- a/pkg/tools/message.go +++ b/pkg/tools/message.go @@ -3,6 +3,7 @@ package tools import ( "context" "fmt" + "sync/atomic" ) type SendCallback func(channel, chatID, content string) error @@ -11,7 +12,7 @@ type MessageTool struct { sendCallback SendCallback defaultChannel string defaultChatID string - sentInRound bool // Tracks whether a message was sent in the current processing round + sentInRound atomic.Bool // Tracks whether a message was sent in the current processing round } func NewMessageTool() *MessageTool { @@ -50,12 +51,12 @@ func (t *MessageTool) Parameters() map[string]any { func (t *MessageTool) SetContext(channel, chatID string) { t.defaultChannel = channel t.defaultChatID = chatID - t.sentInRound = false // Reset send tracking for new processing round + t.sentInRound.Store(false) // Reset send tracking for new processing round } // HasSentInRound returns true if the message tool sent a message during the current round. func (t *MessageTool) HasSentInRound() bool { - return t.sentInRound + return t.sentInRound.Load() } func (t *MessageTool) SetSendCallback(callback SendCallback) { @@ -94,7 +95,7 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]any) *ToolRes } } - t.sentInRound = true + t.sentInRound.Store(true) // Silent: user already received the message directly return &ToolResult{ ForLLM: fmt.Sprintf("Message sent to %s:%s", channel, chatID), diff --git a/pkg/tools/toolloop.go b/pkg/tools/toolloop.go index cdfe0d6ce..244f0d4a2 100644 --- a/pkg/tools/toolloop.go +++ b/pkg/tools/toolloop.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" @@ -121,37 +122,53 @@ func RunToolLoop( } messages = append(messages, assistantMsg) - // 7. Execute tool calls - for _, tc := range normalizedToolCalls { - argsJSON, _ := json.Marshal(tc.Arguments) - argsPreview := utils.Truncate(string(argsJSON), 200) - logger.InfoCF("toolloop", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), - map[string]any{ - "tool": tc.Name, - "iteration": iteration, - }) + // 7. Execute tool calls in parallel + type indexedResult struct { + result *ToolResult + tc providers.ToolCall + } - // Execute tool (no async callback for subagents - they run independently) - var toolResult *ToolResult - if config.Tools != nil { - toolResult = config.Tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, channel, chatID, nil) - } else { - toolResult = ErrorResult("No tools available") + results := make([]indexedResult, len(normalizedToolCalls)) + var wg sync.WaitGroup + + for i, tc := range normalizedToolCalls { + results[i].tc = tc + + wg.Add(1) + go func(idx int, tc providers.ToolCall) { + defer wg.Done() + + argsJSON, _ := json.Marshal(tc.Arguments) + argsPreview := utils.Truncate(string(argsJSON), 200) + logger.InfoCF("toolloop", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview), + map[string]any{ + "tool": tc.Name, + "iteration": iteration, + }) + + var toolResult *ToolResult + if config.Tools != nil { + toolResult = config.Tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, channel, chatID, nil) + } else { + toolResult = ErrorResult("No tools available") + } + results[idx].result = toolResult + }(i, tc) + } + wg.Wait() + + // Append results in original order + for _, r := range results { + contentForLLM := r.result.ForLLM + if contentForLLM == "" && r.result.Err != nil { + contentForLLM = r.result.Err.Error() } - // Determine content for LLM - contentForLLM := toolResult.ForLLM - if contentForLLM == "" && toolResult.Err != nil { - contentForLLM = toolResult.Err.Error() - } - - // Add tool result message - toolResultMsg := providers.Message{ + messages = append(messages, providers.Message{ Role: "tool", Content: contentForLLM, - ToolCallID: tc.ID, - } - messages = append(messages, toolResultMsg) + ToolCallID: r.tc.ID, + }) } }