mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
24e2ed79c0
PublishInbound/PublishOutbound held RLock during blocking channel sends, deadlocking against Close() which needs a write lock when the buffer is full. ConsumeInbound/SubscribeOutbound used bare receives instead of comma-ok, causing zero-value processing or busy loops after close. Replace sync.RWMutex+bool with atomic.Bool+done channel so Publish methods use a lock-free 3-way select (send / done / ctx.Done). Add context.Context parameter to both Publish methods so callers can cancel or timeout blocked sends. Close() now only sets the atomic flag and closes the done channel—never closes the data channels—eliminating send-on-closed-channel panics. - Remove dead code: RegisterHandler, GetHandler, handlers map, MessageHandler type (zero callers across the whole repo) - Add ErrBusClosed sentinel error - Update all 10 caller sites to pass context - Add msgBus.Close() to gateway and agent shutdown flows - Add pkg/bus/bus_test.go with 11 test cases covering basic round-trip, context cancellation, closed-bus behavior, concurrent publish+close, full-buffer timeout, and idempotent Close
376 lines
9.2 KiB
Go
376 lines
9.2 KiB
Go
package tools
|
|
|
|
import (
	"context"
	"fmt"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/sipeed/picoclaw/pkg/bus"
	"github.com/sipeed/picoclaw/pkg/providers"
)
|
|
|
|
// SubagentTask records one unit of work handed to a subagent, where the
// request originated, and the task's current outcome.
type SubagentTask struct {
	// ID identifies the task, Task is the text sent to the subagent as its
	// user message, Label is an optional display name, and AgentID is the
	// agent identifier supplied by the caller.
	ID, Task, Label, AgentID string

	// OriginChannel and OriginChatID name the conversation the request came
	// from; they are used to route the result back.
	OriginChannel, OriginChatID string

	// Status is the lifecycle state (this file uses "running", "completed",
	// "failed" and "cancelled"); Result holds the final text or an error
	// description.
	Status, Result string

	// Created is a Unix timestamp in milliseconds.
	Created int64
}
|
|
|
|
type SubagentManager struct {
|
|
tasks map[string]*SubagentTask
|
|
mu sync.RWMutex
|
|
provider providers.LLMProvider
|
|
defaultModel string
|
|
bus *bus.MessageBus
|
|
workspace string
|
|
tools *ToolRegistry
|
|
maxIterations int
|
|
maxTokens int
|
|
temperature float64
|
|
hasMaxTokens bool
|
|
hasTemperature bool
|
|
nextID int
|
|
}
|
|
|
|
func NewSubagentManager(
|
|
provider providers.LLMProvider,
|
|
defaultModel, workspace string,
|
|
bus *bus.MessageBus,
|
|
) *SubagentManager {
|
|
return &SubagentManager{
|
|
tasks: make(map[string]*SubagentTask),
|
|
provider: provider,
|
|
defaultModel: defaultModel,
|
|
bus: bus,
|
|
workspace: workspace,
|
|
tools: NewToolRegistry(),
|
|
maxIterations: 10,
|
|
nextID: 1,
|
|
}
|
|
}
|
|
|
|
// SetLLMOptions sets max tokens and temperature for subagent LLM calls.
|
|
func (sm *SubagentManager) SetLLMOptions(maxTokens int, temperature float64) {
|
|
sm.mu.Lock()
|
|
defer sm.mu.Unlock()
|
|
sm.maxTokens = maxTokens
|
|
sm.hasMaxTokens = true
|
|
sm.temperature = temperature
|
|
sm.hasTemperature = true
|
|
}
|
|
|
|
// SetTools sets the tool registry for subagent execution.
|
|
// If not set, subagent will have access to the provided tools.
|
|
func (sm *SubagentManager) SetTools(tools *ToolRegistry) {
|
|
sm.mu.Lock()
|
|
defer sm.mu.Unlock()
|
|
sm.tools = tools
|
|
}
|
|
|
|
// RegisterTool registers a tool for subagent execution.
|
|
func (sm *SubagentManager) RegisterTool(tool Tool) {
|
|
sm.mu.Lock()
|
|
defer sm.mu.Unlock()
|
|
sm.tools.Register(tool)
|
|
}
|
|
|
|
func (sm *SubagentManager) Spawn(
|
|
ctx context.Context,
|
|
task, label, agentID, originChannel, originChatID string,
|
|
callback AsyncCallback,
|
|
) (string, error) {
|
|
sm.mu.Lock()
|
|
defer sm.mu.Unlock()
|
|
|
|
taskID := fmt.Sprintf("subagent-%d", sm.nextID)
|
|
sm.nextID++
|
|
|
|
subagentTask := &SubagentTask{
|
|
ID: taskID,
|
|
Task: task,
|
|
Label: label,
|
|
AgentID: agentID,
|
|
OriginChannel: originChannel,
|
|
OriginChatID: originChatID,
|
|
Status: "running",
|
|
Created: time.Now().UnixMilli(),
|
|
}
|
|
sm.tasks[taskID] = subagentTask
|
|
|
|
// Start task in background with context cancellation support
|
|
go sm.runTask(ctx, subagentTask, callback)
|
|
|
|
if label != "" {
|
|
return fmt.Sprintf("Spawned subagent '%s' for task: %s", label, task), nil
|
|
}
|
|
return fmt.Sprintf("Spawned subagent for task: %s", task), nil
|
|
}
|
|
|
|
func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask, callback AsyncCallback) {
|
|
task.Status = "running"
|
|
task.Created = time.Now().UnixMilli()
|
|
|
|
// Build system prompt for subagent
|
|
systemPrompt := `You are a subagent. Complete the given task independently and report the result.
|
|
You have access to tools - use them as needed to complete your task.
|
|
After completing the task, provide a clear summary of what was done.`
|
|
|
|
messages := []providers.Message{
|
|
{
|
|
Role: "system",
|
|
Content: systemPrompt,
|
|
},
|
|
{
|
|
Role: "user",
|
|
Content: task.Task,
|
|
},
|
|
}
|
|
|
|
// Check if context is already cancelled before starting
|
|
select {
|
|
case <-ctx.Done():
|
|
sm.mu.Lock()
|
|
task.Status = "cancelled"
|
|
task.Result = "Task cancelled before execution"
|
|
sm.mu.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Run tool loop with access to tools
|
|
sm.mu.RLock()
|
|
tools := sm.tools
|
|
maxIter := sm.maxIterations
|
|
maxTokens := sm.maxTokens
|
|
temperature := sm.temperature
|
|
hasMaxTokens := sm.hasMaxTokens
|
|
hasTemperature := sm.hasTemperature
|
|
sm.mu.RUnlock()
|
|
|
|
var llmOptions map[string]any
|
|
if hasMaxTokens || hasTemperature {
|
|
llmOptions = map[string]any{}
|
|
if hasMaxTokens {
|
|
llmOptions["max_tokens"] = maxTokens
|
|
}
|
|
if hasTemperature {
|
|
llmOptions["temperature"] = temperature
|
|
}
|
|
}
|
|
|
|
loopResult, err := RunToolLoop(ctx, ToolLoopConfig{
|
|
Provider: sm.provider,
|
|
Model: sm.defaultModel,
|
|
Tools: tools,
|
|
MaxIterations: maxIter,
|
|
LLMOptions: llmOptions,
|
|
}, messages, task.OriginChannel, task.OriginChatID)
|
|
|
|
sm.mu.Lock()
|
|
var result *ToolResult
|
|
defer func() {
|
|
sm.mu.Unlock()
|
|
// Call callback if provided and result is set
|
|
if callback != nil && result != nil {
|
|
callback(ctx, result)
|
|
}
|
|
}()
|
|
|
|
if err != nil {
|
|
task.Status = "failed"
|
|
task.Result = fmt.Sprintf("Error: %v", err)
|
|
// Check if it was cancelled
|
|
if ctx.Err() != nil {
|
|
task.Status = "cancelled"
|
|
task.Result = "Task cancelled during execution"
|
|
}
|
|
result = &ToolResult{
|
|
ForLLM: task.Result,
|
|
ForUser: "",
|
|
Silent: false,
|
|
IsError: true,
|
|
Async: false,
|
|
Err: err,
|
|
}
|
|
} else {
|
|
task.Status = "completed"
|
|
task.Result = loopResult.Content
|
|
result = &ToolResult{
|
|
ForLLM: fmt.Sprintf(
|
|
"Subagent '%s' completed (iterations: %d): %s",
|
|
task.Label,
|
|
loopResult.Iterations,
|
|
loopResult.Content,
|
|
),
|
|
ForUser: loopResult.Content,
|
|
Silent: false,
|
|
IsError: false,
|
|
Async: false,
|
|
}
|
|
}
|
|
|
|
// Send announce message back to main agent
|
|
if sm.bus != nil {
|
|
announceContent := fmt.Sprintf("Task '%s' completed.\n\nResult:\n%s", task.Label, task.Result)
|
|
sm.bus.PublishInbound(context.TODO(), bus.InboundMessage{
|
|
Channel: "system",
|
|
SenderID: fmt.Sprintf("subagent:%s", task.ID),
|
|
// Format: "original_channel:original_chat_id" for routing back
|
|
ChatID: fmt.Sprintf("%s:%s", task.OriginChannel, task.OriginChatID),
|
|
Content: announceContent,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (sm *SubagentManager) GetTask(taskID string) (*SubagentTask, bool) {
|
|
sm.mu.RLock()
|
|
defer sm.mu.RUnlock()
|
|
task, ok := sm.tasks[taskID]
|
|
return task, ok
|
|
}
|
|
|
|
func (sm *SubagentManager) ListTasks() []*SubagentTask {
|
|
sm.mu.RLock()
|
|
defer sm.mu.RUnlock()
|
|
|
|
tasks := make([]*SubagentTask, 0, len(sm.tasks))
|
|
for _, task := range sm.tasks {
|
|
tasks = append(tasks, task)
|
|
}
|
|
return tasks
|
|
}
|
|
|
|
// SubagentTool executes a subagent task synchronously and returns the result.
|
|
// Unlike SpawnTool which runs tasks asynchronously, SubagentTool waits for completion
|
|
// and returns the result directly in the ToolResult.
|
|
type SubagentTool struct {
|
|
manager *SubagentManager
|
|
originChannel string
|
|
originChatID string
|
|
}
|
|
|
|
func NewSubagentTool(manager *SubagentManager) *SubagentTool {
|
|
return &SubagentTool{
|
|
manager: manager,
|
|
originChannel: "cli",
|
|
originChatID: "direct",
|
|
}
|
|
}
|
|
|
|
func (t *SubagentTool) Name() string {
|
|
return "subagent"
|
|
}
|
|
|
|
func (t *SubagentTool) Description() string {
|
|
return "Execute a subagent task synchronously and return the result. Use this for delegating specific tasks to an independent agent instance. Returns execution summary to user and full details to LLM."
|
|
}
|
|
|
|
func (t *SubagentTool) Parameters() map[string]any {
|
|
return map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"task": map[string]any{
|
|
"type": "string",
|
|
"description": "The task for subagent to complete",
|
|
},
|
|
"label": map[string]any{
|
|
"type": "string",
|
|
"description": "Optional short label for the task (for display)",
|
|
},
|
|
},
|
|
"required": []string{"task"},
|
|
}
|
|
}
|
|
|
|
func (t *SubagentTool) SetContext(channel, chatID string) {
|
|
t.originChannel = channel
|
|
t.originChatID = chatID
|
|
}
|
|
|
|
func (t *SubagentTool) Execute(ctx context.Context, args map[string]any) *ToolResult {
|
|
task, ok := args["task"].(string)
|
|
if !ok {
|
|
return ErrorResult("task is required").WithError(fmt.Errorf("task parameter is required"))
|
|
}
|
|
|
|
label, _ := args["label"].(string)
|
|
|
|
if t.manager == nil {
|
|
return ErrorResult("Subagent manager not configured").WithError(fmt.Errorf("manager is nil"))
|
|
}
|
|
|
|
// Build messages for subagent
|
|
messages := []providers.Message{
|
|
{
|
|
Role: "system",
|
|
Content: "You are a subagent. Complete the given task independently and provide a clear, concise result.",
|
|
},
|
|
{
|
|
Role: "user",
|
|
Content: task,
|
|
},
|
|
}
|
|
|
|
// Use RunToolLoop to execute with tools (same as async SpawnTool)
|
|
sm := t.manager
|
|
sm.mu.RLock()
|
|
tools := sm.tools
|
|
maxIter := sm.maxIterations
|
|
maxTokens := sm.maxTokens
|
|
temperature := sm.temperature
|
|
hasMaxTokens := sm.hasMaxTokens
|
|
hasTemperature := sm.hasTemperature
|
|
sm.mu.RUnlock()
|
|
|
|
var llmOptions map[string]any
|
|
if hasMaxTokens || hasTemperature {
|
|
llmOptions = map[string]any{}
|
|
if hasMaxTokens {
|
|
llmOptions["max_tokens"] = maxTokens
|
|
}
|
|
if hasTemperature {
|
|
llmOptions["temperature"] = temperature
|
|
}
|
|
}
|
|
|
|
loopResult, err := RunToolLoop(ctx, ToolLoopConfig{
|
|
Provider: sm.provider,
|
|
Model: sm.defaultModel,
|
|
Tools: tools,
|
|
MaxIterations: maxIter,
|
|
LLMOptions: llmOptions,
|
|
}, messages, t.originChannel, t.originChatID)
|
|
if err != nil {
|
|
return ErrorResult(fmt.Sprintf("Subagent execution failed: %v", err)).WithError(err)
|
|
}
|
|
|
|
// ForUser: Brief summary for user (truncated if too long)
|
|
userContent := loopResult.Content
|
|
maxUserLen := 500
|
|
if len(userContent) > maxUserLen {
|
|
userContent = userContent[:maxUserLen] + "..."
|
|
}
|
|
|
|
// ForLLM: Full execution details
|
|
labelStr := label
|
|
if labelStr == "" {
|
|
labelStr = "(unnamed)"
|
|
}
|
|
llmContent := fmt.Sprintf("Subagent task completed:\nLabel: %s\nIterations: %d\nResult: %s",
|
|
labelStr, loopResult.Iterations, loopResult.Content)
|
|
|
|
return &ToolResult{
|
|
ForLLM: llmContent,
|
|
ForUser: userContent,
|
|
Silent: false,
|
|
IsError: false,
|
|
Async: false,
|
|
}
|
|
}
|