mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
0e075f7300
Refactor agent loop execution around runTurn, add explicit turn state and interrupt semantics, and automatically continue queued steering that misses the current turn boundary.
262 lines
6.6 KiB
Go
262 lines
6.6 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/providers"
|
|
)
|
|
|
|
// SteeringMode controls how queued steering messages are dequeued.
|
|
type SteeringMode string
|
|
|
|
const (
|
|
// SteeringOneAtATime dequeues only the first queued message per poll.
|
|
SteeringOneAtATime SteeringMode = "one-at-a-time"
|
|
// SteeringAll drains the entire queue in a single poll.
|
|
SteeringAll SteeringMode = "all"
|
|
// MaxQueueSize number of possible messages in the Steering Queue
|
|
MaxQueueSize = 10
|
|
)
|
|
|
|
// parseSteeringMode normalizes a config string into a SteeringMode.
|
|
func parseSteeringMode(s string) SteeringMode {
|
|
switch s {
|
|
case "all":
|
|
return SteeringAll
|
|
default:
|
|
return SteeringOneAtATime
|
|
}
|
|
}
|
|
|
|
// steeringQueue is a thread-safe queue of user messages that can be injected
|
|
// into a running agent loop to interrupt it between tool calls.
|
|
type steeringQueue struct {
|
|
mu sync.Mutex
|
|
queue []providers.Message
|
|
mode SteeringMode
|
|
}
|
|
|
|
func newSteeringQueue(mode SteeringMode) *steeringQueue {
|
|
return &steeringQueue{
|
|
mode: mode,
|
|
}
|
|
}
|
|
|
|
// push enqueues a steering message.
|
|
func (sq *steeringQueue) push(msg providers.Message) error {
|
|
sq.mu.Lock()
|
|
defer sq.mu.Unlock()
|
|
if len(sq.queue) >= MaxQueueSize {
|
|
return fmt.Errorf("steering queue is full")
|
|
}
|
|
sq.queue = append(sq.queue, msg)
|
|
return nil
|
|
}
|
|
|
|
// dequeue removes and returns pending steering messages according to the
|
|
// configured mode. Returns nil when the queue is empty.
|
|
func (sq *steeringQueue) dequeue() []providers.Message {
|
|
sq.mu.Lock()
|
|
defer sq.mu.Unlock()
|
|
|
|
if len(sq.queue) == 0 {
|
|
return nil
|
|
}
|
|
|
|
switch sq.mode {
|
|
case SteeringAll:
|
|
msgs := sq.queue
|
|
sq.queue = nil
|
|
return msgs
|
|
default: // one-at-a-time
|
|
msg := sq.queue[0]
|
|
sq.queue[0] = providers.Message{} // Clear reference for GC
|
|
sq.queue = sq.queue[1:]
|
|
return []providers.Message{msg}
|
|
}
|
|
}
|
|
|
|
// len returns the number of queued messages.
|
|
func (sq *steeringQueue) len() int {
|
|
sq.mu.Lock()
|
|
defer sq.mu.Unlock()
|
|
return len(sq.queue)
|
|
}
|
|
|
|
// setMode updates the steering mode.
|
|
func (sq *steeringQueue) setMode(mode SteeringMode) {
|
|
sq.mu.Lock()
|
|
defer sq.mu.Unlock()
|
|
sq.mode = mode
|
|
}
|
|
|
|
// getMode returns the current steering mode.
|
|
func (sq *steeringQueue) getMode() SteeringMode {
|
|
sq.mu.Lock()
|
|
defer sq.mu.Unlock()
|
|
return sq.mode
|
|
}
|
|
|
|
// --- AgentLoop steering API ---
|
|
|
|
// Steer enqueues a user message to be injected into the currently running
|
|
// agent loop. The message will be picked up after the current tool finishes
|
|
// executing, causing any remaining tool calls in the batch to be skipped.
|
|
func (al *AgentLoop) Steer(msg providers.Message) error {
|
|
if al.steering == nil {
|
|
return fmt.Errorf("steering queue is not initialized")
|
|
}
|
|
if err := al.steering.push(msg); err != nil {
|
|
logger.WarnCF("agent", "Failed to enqueue steering message", map[string]any{
|
|
"error": err.Error(),
|
|
"role": msg.Role,
|
|
})
|
|
return err
|
|
}
|
|
logger.DebugCF("agent", "Steering message enqueued", map[string]any{
|
|
"role": msg.Role,
|
|
"content_len": len(msg.Content),
|
|
"queue_len": al.steering.len(),
|
|
})
|
|
|
|
meta := EventMeta{
|
|
Source: "Steer",
|
|
TracePath: "turn.interrupt.received",
|
|
}
|
|
if ts := al.getActiveTurnState(); ts != nil {
|
|
meta = ts.eventMeta("Steer", "turn.interrupt.received")
|
|
} else if registry := al.GetRegistry(); registry != nil {
|
|
if agent := registry.GetDefaultAgent(); agent != nil {
|
|
meta.AgentID = agent.ID
|
|
}
|
|
}
|
|
al.emitEvent(
|
|
EventKindInterruptReceived,
|
|
meta,
|
|
InterruptReceivedPayload{
|
|
Kind: InterruptKindSteering,
|
|
Role: msg.Role,
|
|
ContentLen: len(msg.Content),
|
|
QueueDepth: al.steering.len(),
|
|
},
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// SteeringMode returns the current steering mode.
|
|
func (al *AgentLoop) SteeringMode() SteeringMode {
|
|
if al.steering == nil {
|
|
return SteeringOneAtATime
|
|
}
|
|
return al.steering.getMode()
|
|
}
|
|
|
|
// SetSteeringMode updates the steering mode.
|
|
func (al *AgentLoop) SetSteeringMode(mode SteeringMode) {
|
|
if al.steering == nil {
|
|
return
|
|
}
|
|
al.steering.setMode(mode)
|
|
}
|
|
|
|
// dequeueSteeringMessages is the internal method called by the agent loop
|
|
// to poll for steering messages. Returns nil when no messages are pending.
|
|
func (al *AgentLoop) dequeueSteeringMessages() []providers.Message {
|
|
if al.steering == nil {
|
|
return nil
|
|
}
|
|
return al.steering.dequeue()
|
|
}
|
|
|
|
// Continue resumes an idle agent by dequeuing any pending steering messages
|
|
// and running them through the agent loop. This is used when the agent's last
|
|
// message was from the assistant (i.e., it has stopped processing) and the
|
|
// user has since enqueued steering messages.
|
|
//
|
|
// If no steering messages are pending, it returns an empty string.
|
|
func (al *AgentLoop) Continue(ctx context.Context, sessionKey, channel, chatID string) (string, error) {
|
|
if active := al.GetActiveTurn(); active != nil {
|
|
return "", fmt.Errorf("turn %s is still active", active.TurnID)
|
|
}
|
|
|
|
steeringMsgs := al.dequeueSteeringMessages()
|
|
if len(steeringMsgs) == 0 {
|
|
return "", nil
|
|
}
|
|
|
|
agent := al.GetRegistry().GetDefaultAgent()
|
|
if agent == nil {
|
|
return "", fmt.Errorf("no default agent available")
|
|
}
|
|
|
|
if tool, ok := agent.Tools.Get("message"); ok {
|
|
if resetter, ok := tool.(interface{ ResetSentInRound() }); ok {
|
|
resetter.ResetSentInRound()
|
|
}
|
|
}
|
|
|
|
// Build a combined user message from the steering messages.
|
|
var contents []string
|
|
for _, msg := range steeringMsgs {
|
|
contents = append(contents, msg.Content)
|
|
}
|
|
combinedContent := strings.Join(contents, "\n")
|
|
|
|
return al.runAgentLoop(ctx, agent, processOptions{
|
|
SessionKey: sessionKey,
|
|
Channel: channel,
|
|
ChatID: chatID,
|
|
UserMessage: combinedContent,
|
|
DefaultResponse: defaultResponse,
|
|
EnableSummary: true,
|
|
SendResponse: false,
|
|
SkipInitialSteeringPoll: true,
|
|
})
|
|
}
|
|
|
|
func (al *AgentLoop) InterruptGraceful(hint string) error {
|
|
ts := al.getActiveTurnState()
|
|
if ts == nil {
|
|
return fmt.Errorf("no active turn")
|
|
}
|
|
if !ts.requestGracefulInterrupt(hint) {
|
|
return fmt.Errorf("turn %s cannot accept graceful interrupt", ts.turnID)
|
|
}
|
|
|
|
al.emitEvent(
|
|
EventKindInterruptReceived,
|
|
ts.eventMeta("InterruptGraceful", "turn.interrupt.received"),
|
|
InterruptReceivedPayload{
|
|
Kind: InterruptKindGraceful,
|
|
HintLen: len(hint),
|
|
},
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (al *AgentLoop) InterruptHard() error {
|
|
ts := al.getActiveTurnState()
|
|
if ts == nil {
|
|
return fmt.Errorf("no active turn")
|
|
}
|
|
if !ts.requestHardAbort() {
|
|
return fmt.Errorf("turn %s is already aborting", ts.turnID)
|
|
}
|
|
|
|
al.emitEvent(
|
|
EventKindInterruptReceived,
|
|
ts.eventMeta("InterruptHard", "turn.interrupt.received"),
|
|
InterruptReceivedPayload{
|
|
Kind: InterruptKindHard,
|
|
},
|
|
)
|
|
|
|
return nil
|
|
}
|