From 968fff07b9254e3f8b402a92cdeb8f23b65f2786 Mon Sep 17 00:00:00 2001 From: Boris Bliznioukov Date: Thu, 5 Mar 2026 13:07:17 +0100 Subject: [PATCH] fix: background task results silently dropped Signed-off-by: Boris Bliznioukov --- pkg/agent/loop.go | 53 ++++++++++++++++++++++++++------- pkg/tools/spawn_test.go | 4 +-- pkg/tools/subagent.go | 18 ----------- pkg/tools/subagent_tool_test.go | 23 ++++++-------- 4 files changed, 53 insertions(+), 45 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 1ab79f3ca..82dca9d30 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -192,7 +192,7 @@ func registerSharedTools( // Spawn tool with allowlist checker if cfg.Tools.IsToolEnabled("spawn") { if cfg.Tools.IsToolEnabled("subagent") { - subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace, msgBus) + subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace) subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature) spawnTool := tools.NewSpawnTool(subagentManager) currentAgentID := agentID @@ -671,10 +671,9 @@ func (al *AgentLoop) runAgentLoop( agent *AgentInstance, opts processOptions, ) (string, error) { - // 0. Record last channel for heartbeat notifications (skip internal channels) + // 0. Record last channel for heartbeat notifications (skip internal channels and cli) if opts.Channel != "" && opts.ChatID != "" { - // Don't record internal channels (cli, system, subagent) - if !constants.IsInternalChannel(opts.Channel) { + if !constants.IsInternalChannel(opts.Channel) && opts.Channel != "cli" { channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID) if err := al.RecordLastChannel(channelKey); err != nil { logger.WarnCF( @@ -1083,15 +1082,47 @@ func (al *AgentLoop) runLLMIteration( "iteration": iteration, }) - // Create async callback for tools that implement AsyncExecutor - asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) { + // Create async callback for tools that implement AsyncExecutor. + // When the background work completes, this publishes the result + // as an inbound system message so processSystemMessage routes it + // back to the user via the normal agent loop. + asyncCallback := func(_ context.Context, result *tools.ToolResult) { + // Send ForUser content directly to the user (immediate feedback), + // mirroring the synchronous tool execution path. if !result.Silent && result.ForUser != "" { - logger.InfoCF("agent", "Async tool completed, agent will handle notification", - map[string]any{ - "tool": tc.Name, - "content_len": len(result.ForUser), - }) + outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer outCancel() + _ = al.bus.PublishOutbound(outCtx, bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: result.ForUser, + }) } + + // Determine content for the agent loop (ForLLM or error). + content := result.ForLLM + if content == "" && result.Err != nil { + content = result.Err.Error() + } + if content == "" { + return + } + + logger.InfoCF("agent", "Async tool completed, publishing result", + map[string]any{ + "tool": tc.Name, + "content_len": len(content), + "channel": opts.Channel, + }) + + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + _ = al.bus.PublishInbound(pubCtx, bus.InboundMessage{ + Channel: "system", + SenderID: fmt.Sprintf("async:%s", tc.Name), + ChatID: fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID), + Content: content, + }) } toolResult := agent.Tools.ExecuteWithContext( diff --git a/pkg/tools/spawn_test.go b/pkg/tools/spawn_test.go index 0646c82a9..43223b8db 100644 --- a/pkg/tools/spawn_test.go +++ b/pkg/tools/spawn_test.go @@ -8,7 +8,7 @@ import ( func TestSpawnTool_Execute_EmptyTask(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSpawnTool(manager) ctx := context.Background() @@ -42,7 +42,7 @@ func TestSpawnTool_Execute_EmptyTask(t *testing.T) { func TestSpawnTool_Execute_ValidTask(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSpawnTool(manager) ctx := context.Background() diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 429340047..e51cbaafa 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -27,7 +26,6 @@ type SubagentManager struct { mu sync.RWMutex provider providers.LLMProvider defaultModel string - bus *bus.MessageBus workspace string tools *ToolRegistry maxIterations int @@ -41,13 +39,11 @@ type SubagentManager struct { func NewSubagentManager( provider providers.LLMProvider, defaultModel, workspace string, - bus *bus.MessageBus, ) *SubagentManager { return &SubagentManager{ tasks: make(map[string]*SubagentTask), provider: provider, defaultModel: defaultModel, - bus: bus, workspace: workspace, tools: NewToolRegistry(), maxIterations: 10, @@ -214,20 +210,6 @@ After completing the task, provide a clear summary of what was done.` Async: false, } } - - // Send announce message back to main agent - if sm.bus != nil { - announceContent := fmt.Sprintf("Task '%s' completed.\n\nResult:\n%s", task.Label, task.Result) - pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer pubCancel() - sm.bus.PublishInbound(pubCtx, bus.InboundMessage{ - Channel: "system", - SenderID: fmt.Sprintf("subagent:%s", task.ID), - // Format: "original_channel:original_chat_id" for routing back - ChatID: fmt.Sprintf("%s:%s", task.OriginChannel, task.OriginChatID), - Content: announceContent, - }) - } } func (sm *SubagentManager) GetTask(taskID string) (*SubagentTask, bool) { diff --git a/pkg/tools/subagent_tool_test.go b/pkg/tools/subagent_tool_test.go index a1450410a..4b6f130a5 100644 --- a/pkg/tools/subagent_tool_test.go +++ b/pkg/tools/subagent_tool_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/providers" ) @@ -47,7 +46,7 @@ func (m *MockLLMProvider) GetContextWindow() int { func TestSubagentManager_SetLLMOptions_AppliesToRunToolLoop(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") manager.SetLLMOptions(2048, 0.6) tool := NewSubagentTool(manager) @@ -73,7 +72,7 @@ func TestSubagentManager_SetLLMOptions_AppliesToRunToolLoop(t *testing.T) { // TestSubagentTool_Name verifies tool name func TestSubagentTool_Name(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) if tool.Name() != "subagent" { @@ -84,7 +83,7 @@ func TestSubagentTool_Name(t *testing.T) { // TestSubagentTool_Description verifies tool description func TestSubagentTool_Description(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) desc := tool.Description() @@ -99,7 +98,7 @@ func TestSubagentTool_Description(t *testing.T) { // TestSubagentTool_Parameters verifies tool parameters schema func TestSubagentTool_Parameters(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) params := tool.Parameters() @@ -149,8 +148,7 @@ func TestSubagentTool_Parameters(t *testing.T) { // TestSubagentTool_Execute_Success tests successful execution func TestSubagentTool_Execute_Success(t *testing.T) { provider := &MockLLMProvider{} - msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) ctx := WithToolContext(context.Background(), "telegram", "chat-123") @@ -204,8 +202,7 @@ func TestSubagentTool_Execute_Success(t *testing.T) { // TestSubagentTool_Execute_NoLabel tests execution without label func TestSubagentTool_Execute_NoLabel(t *testing.T) { provider := &MockLLMProvider{} - msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) ctx := context.Background() @@ -228,7 +225,7 @@ func TestSubagentTool_Execute_NoLabel(t *testing.T) { // TestSubagentTool_Execute_MissingTask tests error handling for missing task func TestSubagentTool_Execute_MissingTask(t *testing.T) { provider := &MockLLMProvider{} - manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) ctx := context.Background() @@ -278,8 +275,7 @@ func TestSubagentTool_Execute_NilManager(t *testing.T) { // TestSubagentTool_Execute_ContextPassing verifies context is properly used func TestSubagentTool_Execute_ContextPassing(t *testing.T) { provider := &MockLLMProvider{} - msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) channel := "test-channel" @@ -304,8 +300,7 @@ func TestSubagentTool_Execute_ContextPassing(t *testing.T) { func TestSubagentTool_ForUserTruncation(t *testing.T) { // Create a mock provider that returns very long content provider := &MockLLMProvider{} - msgBus := bus.NewMessageBus() - manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus) + manager := NewSubagentManager(provider, "test-model", "/tmp/test") tool := NewSubagentTool(manager) ctx := context.Background()