fix: background task results silently dropped

Signed-off-by: Boris Bliznioukov <blib@mail.com>
This commit is contained in:
Boris Bliznioukov
2026-03-05 13:07:17 +01:00
parent 6f5930624b
commit 968fff07b9
4 changed files with 53 additions and 45 deletions
+42 -11
View File
@@ -192,7 +192,7 @@ func registerSharedTools(
// Spawn tool with allowlist checker // Spawn tool with allowlist checker
if cfg.Tools.IsToolEnabled("spawn") { if cfg.Tools.IsToolEnabled("spawn") {
if cfg.Tools.IsToolEnabled("subagent") { if cfg.Tools.IsToolEnabled("subagent") {
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace, msgBus) subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace)
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature) subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)
spawnTool := tools.NewSpawnTool(subagentManager) spawnTool := tools.NewSpawnTool(subagentManager)
currentAgentID := agentID currentAgentID := agentID
@@ -671,10 +671,9 @@ func (al *AgentLoop) runAgentLoop(
agent *AgentInstance, agent *AgentInstance,
opts processOptions, opts processOptions,
) (string, error) { ) (string, error) {
// 0. Record last channel for heartbeat notifications (skip internal channels) // 0. Record last channel for heartbeat notifications (skip internal channels and cli)
if opts.Channel != "" && opts.ChatID != "" { if opts.Channel != "" && opts.ChatID != "" {
// Don't record internal channels (cli, system, subagent) if !constants.IsInternalChannel(opts.Channel) && opts.Channel != "cli" {
if !constants.IsInternalChannel(opts.Channel) {
channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID) channelKey := fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID)
if err := al.RecordLastChannel(channelKey); err != nil { if err := al.RecordLastChannel(channelKey); err != nil {
logger.WarnCF( logger.WarnCF(
@@ -1083,15 +1082,47 @@ func (al *AgentLoop) runLLMIteration(
"iteration": iteration, "iteration": iteration,
}) })
// Create async callback for tools that implement AsyncExecutor // Create async callback for tools that implement AsyncExecutor.
asyncCallback := func(callbackCtx context.Context, result *tools.ToolResult) { // When the background work completes, this publishes the result
// as an inbound system message so processSystemMessage routes it
// back to the user via the normal agent loop.
asyncCallback := func(_ context.Context, result *tools.ToolResult) {
// Send ForUser content directly to the user (immediate feedback),
// mirroring the synchronous tool execution path.
if !result.Silent && result.ForUser != "" { if !result.Silent && result.ForUser != "" {
logger.InfoCF("agent", "Async tool completed, agent will handle notification", outCtx, outCancel := context.WithTimeout(context.Background(), 5*time.Second)
map[string]any{ defer outCancel()
"tool": tc.Name, _ = al.bus.PublishOutbound(outCtx, bus.OutboundMessage{
"content_len": len(result.ForUser), Channel: opts.Channel,
}) ChatID: opts.ChatID,
Content: result.ForUser,
})
} }
// Determine content for the agent loop (ForLLM or error).
content := result.ForLLM
if content == "" && result.Err != nil {
content = result.Err.Error()
}
if content == "" {
return
}
logger.InfoCF("agent", "Async tool completed, publishing result",
map[string]any{
"tool": tc.Name,
"content_len": len(content),
"channel": opts.Channel,
})
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
_ = al.bus.PublishInbound(pubCtx, bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("async:%s", tc.Name),
ChatID: fmt.Sprintf("%s:%s", opts.Channel, opts.ChatID),
Content: content,
})
} }
toolResult := agent.Tools.ExecuteWithContext( toolResult := agent.Tools.ExecuteWithContext(
+2 -2
View File
@@ -8,7 +8,7 @@ import (
func TestSpawnTool_Execute_EmptyTask(t *testing.T) { func TestSpawnTool_Execute_EmptyTask(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSpawnTool(manager) tool := NewSpawnTool(manager)
ctx := context.Background() ctx := context.Background()
@@ -42,7 +42,7 @@ func TestSpawnTool_Execute_EmptyTask(t *testing.T) {
func TestSpawnTool_Execute_ValidTask(t *testing.T) { func TestSpawnTool_Execute_ValidTask(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSpawnTool(manager) tool := NewSpawnTool(manager)
ctx := context.Background() ctx := context.Background()
-18
View File
@@ -6,7 +6,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers"
) )
@@ -27,7 +26,6 @@ type SubagentManager struct {
mu sync.RWMutex mu sync.RWMutex
provider providers.LLMProvider provider providers.LLMProvider
defaultModel string defaultModel string
bus *bus.MessageBus
workspace string workspace string
tools *ToolRegistry tools *ToolRegistry
maxIterations int maxIterations int
@@ -41,13 +39,11 @@ type SubagentManager struct {
func NewSubagentManager( func NewSubagentManager(
provider providers.LLMProvider, provider providers.LLMProvider,
defaultModel, workspace string, defaultModel, workspace string,
bus *bus.MessageBus,
) *SubagentManager { ) *SubagentManager {
return &SubagentManager{ return &SubagentManager{
tasks: make(map[string]*SubagentTask), tasks: make(map[string]*SubagentTask),
provider: provider, provider: provider,
defaultModel: defaultModel, defaultModel: defaultModel,
bus: bus,
workspace: workspace, workspace: workspace,
tools: NewToolRegistry(), tools: NewToolRegistry(),
maxIterations: 10, maxIterations: 10,
@@ -214,20 +210,6 @@ After completing the task, provide a clear summary of what was done.`
Async: false, Async: false,
} }
} }
// Send announce message back to main agent
if sm.bus != nil {
announceContent := fmt.Sprintf("Task '%s' completed.\n\nResult:\n%s", task.Label, task.Result)
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
sm.bus.PublishInbound(pubCtx, bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
// Format: "original_channel:original_chat_id" for routing back
ChatID: fmt.Sprintf("%s:%s", task.OriginChannel, task.OriginChatID),
Content: announceContent,
})
}
} }
func (sm *SubagentManager) GetTask(taskID string) (*SubagentTask, bool) { func (sm *SubagentManager) GetTask(taskID string) (*SubagentTask, bool) {
+9 -14
View File
@@ -5,7 +5,6 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/providers"
) )
@@ -47,7 +46,7 @@ func (m *MockLLMProvider) GetContextWindow() int {
func TestSubagentManager_SetLLMOptions_AppliesToRunToolLoop(t *testing.T) { func TestSubagentManager_SetLLMOptions_AppliesToRunToolLoop(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
manager.SetLLMOptions(2048, 0.6) manager.SetLLMOptions(2048, 0.6)
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
@@ -73,7 +72,7 @@ func TestSubagentManager_SetLLMOptions_AppliesToRunToolLoop(t *testing.T) {
// TestSubagentTool_Name verifies tool name // TestSubagentTool_Name verifies tool name
func TestSubagentTool_Name(t *testing.T) { func TestSubagentTool_Name(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
if tool.Name() != "subagent" { if tool.Name() != "subagent" {
@@ -84,7 +83,7 @@ func TestSubagentTool_Name(t *testing.T) {
// TestSubagentTool_Description verifies tool description // TestSubagentTool_Description verifies tool description
func TestSubagentTool_Description(t *testing.T) { func TestSubagentTool_Description(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
desc := tool.Description() desc := tool.Description()
@@ -99,7 +98,7 @@ func TestSubagentTool_Description(t *testing.T) {
// TestSubagentTool_Parameters verifies tool parameters schema // TestSubagentTool_Parameters verifies tool parameters schema
func TestSubagentTool_Parameters(t *testing.T) { func TestSubagentTool_Parameters(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
params := tool.Parameters() params := tool.Parameters()
@@ -149,8 +148,7 @@ func TestSubagentTool_Parameters(t *testing.T) {
// TestSubagentTool_Execute_Success tests successful execution // TestSubagentTool_Execute_Success tests successful execution
func TestSubagentTool_Execute_Success(t *testing.T) { func TestSubagentTool_Execute_Success(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
msgBus := bus.NewMessageBus() manager := NewSubagentManager(provider, "test-model", "/tmp/test")
manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus)
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
ctx := WithToolContext(context.Background(), "telegram", "chat-123") ctx := WithToolContext(context.Background(), "telegram", "chat-123")
@@ -204,8 +202,7 @@ func TestSubagentTool_Execute_Success(t *testing.T) {
// TestSubagentTool_Execute_NoLabel tests execution without label // TestSubagentTool_Execute_NoLabel tests execution without label
func TestSubagentTool_Execute_NoLabel(t *testing.T) { func TestSubagentTool_Execute_NoLabel(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
msgBus := bus.NewMessageBus() manager := NewSubagentManager(provider, "test-model", "/tmp/test")
manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus)
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
ctx := context.Background() ctx := context.Background()
@@ -228,7 +225,7 @@ func TestSubagentTool_Execute_NoLabel(t *testing.T) {
// TestSubagentTool_Execute_MissingTask tests error handling for missing task // TestSubagentTool_Execute_MissingTask tests error handling for missing task
func TestSubagentTool_Execute_MissingTask(t *testing.T) { func TestSubagentTool_Execute_MissingTask(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
manager := NewSubagentManager(provider, "test-model", "/tmp/test", nil) manager := NewSubagentManager(provider, "test-model", "/tmp/test")
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
ctx := context.Background() ctx := context.Background()
@@ -278,8 +275,7 @@ func TestSubagentTool_Execute_NilManager(t *testing.T) {
// TestSubagentTool_Execute_ContextPassing verifies context is properly used // TestSubagentTool_Execute_ContextPassing verifies context is properly used
func TestSubagentTool_Execute_ContextPassing(t *testing.T) { func TestSubagentTool_Execute_ContextPassing(t *testing.T) {
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
msgBus := bus.NewMessageBus() manager := NewSubagentManager(provider, "test-model", "/tmp/test")
manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus)
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
channel := "test-channel" channel := "test-channel"
@@ -304,8 +300,7 @@ func TestSubagentTool_Execute_ContextPassing(t *testing.T) {
func TestSubagentTool_ForUserTruncation(t *testing.T) { func TestSubagentTool_ForUserTruncation(t *testing.T) {
// Create a mock provider that returns very long content // Create a mock provider that returns very long content
provider := &MockLLMProvider{} provider := &MockLLMProvider{}
msgBus := bus.NewMessageBus() manager := NewSubagentManager(provider, "test-model", "/tmp/test")
manager := NewSubagentManager(provider, "test-model", "/tmp/test", msgBus)
tool := NewSubagentTool(manager) tool := NewSubagentTool(manager)
ctx := context.Background() ctx := context.Background()