mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
feat: execute LLM tool calls in parallel for faster response (#1070)
When the LLM returns multiple tool calls, they are now executed concurrently using goroutines + sync.WaitGroup instead of sequentially. Results are collected in an indexed slice and processed in original order to preserve message ordering. MessageTool.sentInRound is changed to atomic.Bool for thread safety. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+58
-44
@@ -969,62 +969,76 @@ func (al *AgentLoop) runLLMIteration(
|
||||
// Save assistant message with tool calls to session
|
||||
agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg)
|
||||
|
||||
// Execute tool calls
|
||||
for _, tc := range normalizedToolCalls {
|
||||
argsJSON, _ := json.Marshal(tc.Arguments)
|
||||
argsPreview := utils.Truncate(string(argsJSON), 200)
|
||||
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
||||
map[string]any{
|
||||
"agent_id": agent.ID,
|
||||
"tool": tc.Name,
|
||||
"iteration": iteration,
|
||||
})
|
||||
// Execute tool calls in parallel
|
||||
type indexedAgentResult struct {
|
||||
result *tools.ToolResult
|
||||
tc providers.ToolCall
|
||||
}
|
||||
|
||||
// Create async callback for tools that implement AsyncTool
|
||||
// NOTE: Following openclaw's design, async tools do NOT send results directly to users.
|
||||
// Instead, they notify the agent via PublishInbound, and the agent decides
|
||||
// whether to forward the result to the user (in processSystemMessage).
|
||||
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
|
||||
// Log the async completion but don't send directly to user
|
||||
// The agent will handle user notification via processSystemMessage
|
||||
if !result.Silent && result.ForUser != "" {
|
||||
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
|
||||
map[string]any{
|
||||
"tool": tc.Name,
|
||||
"content_len": len(result.ForUser),
|
||||
})
|
||||
agentResults := make([]indexedAgentResult, len(normalizedToolCalls))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, tc := range normalizedToolCalls {
|
||||
agentResults[i].tc = tc
|
||||
|
||||
wg.Add(1)
|
||||
go func(idx int, tc providers.ToolCall) {
|
||||
defer wg.Done()
|
||||
|
||||
argsJSON, _ := json.Marshal(tc.Arguments)
|
||||
argsPreview := utils.Truncate(string(argsJSON), 200)
|
||||
logger.InfoCF("agent", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
||||
map[string]any{
|
||||
"agent_id": agent.ID,
|
||||
"tool": tc.Name,
|
||||
"iteration": iteration,
|
||||
})
|
||||
|
||||
// Create async callback for tools that implement AsyncTool
|
||||
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) {
|
||||
if !result.Silent && result.ForUser != "" {
|
||||
logger.InfoCF("agent", "Async tool completed, agent will handle notification",
|
||||
map[string]any{
|
||||
"tool": tc.Name,
|
||||
"content_len": len(result.ForUser),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
toolResult := agent.Tools.ExecuteWithContext(
|
||||
ctx,
|
||||
tc.Name,
|
||||
tc.Arguments,
|
||||
opts.Channel,
|
||||
opts.ChatID,
|
||||
asyncCallback,
|
||||
)
|
||||
toolResult := agent.Tools.ExecuteWithContext(
|
||||
ctx,
|
||||
tc.Name,
|
||||
tc.Arguments,
|
||||
opts.Channel,
|
||||
opts.ChatID,
|
||||
asyncCallback,
|
||||
)
|
||||
agentResults[idx].result = toolResult
|
||||
}(i, tc)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Process results in original order (send to user, save to session)
|
||||
for _, r := range agentResults {
|
||||
// Send ForUser content to user immediately if not Silent
|
||||
if !toolResult.Silent && toolResult.ForUser != "" && opts.SendResponse {
|
||||
if !r.result.Silent && r.result.ForUser != "" && opts.SendResponse {
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
Channel: opts.Channel,
|
||||
ChatID: opts.ChatID,
|
||||
Content: toolResult.ForUser,
|
||||
Content: r.result.ForUser,
|
||||
})
|
||||
logger.DebugCF("agent", "Sent tool result to user",
|
||||
map[string]any{
|
||||
"tool": tc.Name,
|
||||
"content_len": len(toolResult.ForUser),
|
||||
"tool": r.tc.Name,
|
||||
"content_len": len(r.result.ForUser),
|
||||
})
|
||||
}
|
||||
|
||||
// If tool returned media refs, publish them as outbound media
|
||||
if len(toolResult.Media) > 0 && opts.SendResponse {
|
||||
parts := make([]bus.MediaPart, 0, len(toolResult.Media))
|
||||
for _, ref := range toolResult.Media {
|
||||
if len(r.result.Media) > 0 && opts.SendResponse {
|
||||
parts := make([]bus.MediaPart, 0, len(r.result.Media))
|
||||
for _, ref := range r.result.Media {
|
||||
part := bus.MediaPart{Ref: ref}
|
||||
// Populate metadata from MediaStore when available
|
||||
if al.mediaStore != nil {
|
||||
if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil {
|
||||
part.Filename = meta.Filename
|
||||
@@ -1042,15 +1056,15 @@ func (al *AgentLoop) runLLMIteration(
|
||||
}
|
||||
|
||||
// Determine content for LLM based on tool result
|
||||
contentForLLM := toolResult.ForLLM
|
||||
if contentForLLM == "" && toolResult.Err != nil {
|
||||
contentForLLM = toolResult.Err.Error()
|
||||
contentForLLM := r.result.ForLLM
|
||||
if contentForLLM == "" && r.result.Err != nil {
|
||||
contentForLLM = r.result.Err.Error()
|
||||
}
|
||||
|
||||
toolResultMsg := providers.Message{
|
||||
Role: "tool",
|
||||
Content: contentForLLM,
|
||||
ToolCallID: tc.ID,
|
||||
ToolCallID: r.tc.ID,
|
||||
}
|
||||
messages = append(messages, toolResultMsg)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package tools
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type SendCallback func(channel, chatID, content string) error
|
||||
@@ -11,7 +12,7 @@ type MessageTool struct {
|
||||
sendCallback SendCallback
|
||||
defaultChannel string
|
||||
defaultChatID string
|
||||
sentInRound bool // Tracks whether a message was sent in the current processing round
|
||||
sentInRound atomic.Bool // Tracks whether a message was sent in the current processing round
|
||||
}
|
||||
|
||||
func NewMessageTool() *MessageTool {
|
||||
@@ -50,12 +51,12 @@ func (t *MessageTool) Parameters() map[string]any {
|
||||
func (t *MessageTool) SetContext(channel, chatID string) {
|
||||
t.defaultChannel = channel
|
||||
t.defaultChatID = chatID
|
||||
t.sentInRound = false // Reset send tracking for new processing round
|
||||
t.sentInRound.Store(false) // Reset send tracking for new processing round
|
||||
}
|
||||
|
||||
// HasSentInRound returns true if the message tool sent a message during the current round.
|
||||
func (t *MessageTool) HasSentInRound() bool {
|
||||
return t.sentInRound
|
||||
return t.sentInRound.Load()
|
||||
}
|
||||
|
||||
func (t *MessageTool) SetSendCallback(callback SendCallback) {
|
||||
@@ -94,7 +95,7 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]any) *ToolRes
|
||||
}
|
||||
}
|
||||
|
||||
t.sentInRound = true
|
||||
t.sentInRound.Store(true)
|
||||
// Silent: user already received the message directly
|
||||
return &ToolResult{
|
||||
ForLLM: fmt.Sprintf("Message sent to %s:%s", channel, chatID),
|
||||
|
||||
+43
-26
@@ -10,6 +10,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/logger"
|
||||
"github.com/sipeed/picoclaw/pkg/providers"
|
||||
@@ -121,37 +122,53 @@ func RunToolLoop(
|
||||
}
|
||||
messages = append(messages, assistantMsg)
|
||||
|
||||
// 7. Execute tool calls
|
||||
for _, tc := range normalizedToolCalls {
|
||||
argsJSON, _ := json.Marshal(tc.Arguments)
|
||||
argsPreview := utils.Truncate(string(argsJSON), 200)
|
||||
logger.InfoCF("toolloop", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
||||
map[string]any{
|
||||
"tool": tc.Name,
|
||||
"iteration": iteration,
|
||||
})
|
||||
// 7. Execute tool calls in parallel
|
||||
type indexedResult struct {
|
||||
result *ToolResult
|
||||
tc providers.ToolCall
|
||||
}
|
||||
|
||||
// Execute tool (no async callback for subagents - they run independently)
|
||||
var toolResult *ToolResult
|
||||
if config.Tools != nil {
|
||||
toolResult = config.Tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, channel, chatID, nil)
|
||||
} else {
|
||||
toolResult = ErrorResult("No tools available")
|
||||
results := make([]indexedResult, len(normalizedToolCalls))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, tc := range normalizedToolCalls {
|
||||
results[i].tc = tc
|
||||
|
||||
wg.Add(1)
|
||||
go func(idx int, tc providers.ToolCall) {
|
||||
defer wg.Done()
|
||||
|
||||
argsJSON, _ := json.Marshal(tc.Arguments)
|
||||
argsPreview := utils.Truncate(string(argsJSON), 200)
|
||||
logger.InfoCF("toolloop", fmt.Sprintf("Tool call: %s(%s)", tc.Name, argsPreview),
|
||||
map[string]any{
|
||||
"tool": tc.Name,
|
||||
"iteration": iteration,
|
||||
})
|
||||
|
||||
var toolResult *ToolResult
|
||||
if config.Tools != nil {
|
||||
toolResult = config.Tools.ExecuteWithContext(ctx, tc.Name, tc.Arguments, channel, chatID, nil)
|
||||
} else {
|
||||
toolResult = ErrorResult("No tools available")
|
||||
}
|
||||
results[idx].result = toolResult
|
||||
}(i, tc)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Append results in original order
|
||||
for _, r := range results {
|
||||
contentForLLM := r.result.ForLLM
|
||||
if contentForLLM == "" && r.result.Err != nil {
|
||||
contentForLLM = r.result.Err.Error()
|
||||
}
|
||||
|
||||
// Determine content for LLM
|
||||
contentForLLM := toolResult.ForLLM
|
||||
if contentForLLM == "" && toolResult.Err != nil {
|
||||
contentForLLM = toolResult.Err.Error()
|
||||
}
|
||||
|
||||
// Add tool result message
|
||||
toolResultMsg := providers.Message{
|
||||
messages = append(messages, providers.Message{
|
||||
Role: "tool",
|
||||
Content: contentForLLM,
|
||||
ToolCallID: tc.ID,
|
||||
}
|
||||
messages = append(messages, toolResultMsg)
|
||||
ToolCallID: r.tc.ID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user