mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
refactor: make agent loop support parallel and update docs
This commit is contained in:
@@ -825,7 +825,8 @@ This keeps the runtime lightweight while making new OpenAI-compatible backends m
|
||||
"model": "glm-4.7",
|
||||
"max_tokens": 8192,
|
||||
"temperature": 0.7,
|
||||
"max_tool_iterations": 20
|
||||
"max_tool_iterations": 20,
|
||||
"max_parallel_turns": 1
|
||||
}
|
||||
},
|
||||
"providers": {
|
||||
@@ -838,6 +839,8 @@ This keeps the runtime lightweight while making new OpenAI-compatible backends m
|
||||
```
|
||||
|
||||
> **Note**: The `providers` format is deprecated. Use the new `model_list` format with `.security.yml` for better security.
|
||||
>
|
||||
> **`max_parallel_turns`**: Controls concurrent processing of messages from different sessions. `1` (default) = sequential; `>1` = parallel. Messages from the same session are always serialized. See [Steering docs](../steering.md) for details.
|
||||
|
||||
</details>
|
||||
|
||||
|
||||
@@ -26,7 +26,8 @@ graph TD
|
||||
|
||||
subgraph AgentLoop
|
||||
BUS[MessageBus]
|
||||
DRAIN[drainBusToSteering goroutine]
|
||||
ROUTE{Session Routing}
|
||||
WP[Worker Pool]
|
||||
SQ[steeringQueue]
|
||||
RLI[runLLMIteration]
|
||||
TE[Tool Execution Loop]
|
||||
@@ -37,8 +38,11 @@ graph TD
|
||||
DC -->|PublishInbound| BUS
|
||||
SL -->|PublishInbound| BUS
|
||||
|
||||
BUS -->|ConsumeInbound while busy| DRAIN
|
||||
DRAIN -->|Steer| SQ
|
||||
BUS -->|ConsumeInbound| ROUTE
|
||||
ROUTE -->|no active turn| WP
|
||||
ROUTE -->|active turn exists| SQ
|
||||
WP -->|Steer| SQ
|
||||
WP -->|process| RLI
|
||||
|
||||
RLI -->|1. initial poll| SQ
|
||||
TE -->|2. poll after each tool| SQ
|
||||
@@ -47,32 +51,34 @@ graph TD
|
||||
RLI -->|inject into context| LLM
|
||||
```
|
||||
|
||||
### Bus drain mechanism
|
||||
### Message routing and worker pool
|
||||
|
||||
Channels (Telegram, Discord, etc.) publish messages to the `MessageBus` via `PublishInbound`. Without additional wiring, these messages would sit in the bus buffer until the current `processMessage` finishes — meaning steering would never work for real users.
|
||||
Channels (Telegram, Discord, etc.) publish messages to the `MessageBus` via `PublishInbound`. The `Run()` loop consumes messages from the bus and routes each one based on its **session key**:
|
||||
|
||||
The solution: when `Run()` starts processing a message, it spawns a **drain goroutine** (`drainBusToSteering`) that keeps consuming from the bus and calling `Steer()`. When `processMessage` returns, the drain is canceled and normal consumption resumes.
|
||||
- **No active turn for the session**: The session key is atomically reserved via `LoadOrStore(sessionKey, struct{}{})`, and a **worker goroutine** is spawned to process the full turn lifecycle.
|
||||
- **Active turn exists for the session**: The message is enqueued directly into the steering queue via `enqueueSteeringMessage`. It will be picked up by the existing worker's steering drain loop.
|
||||
- **Non-routable (system)**: Processed synchronously in the main loop.
|
||||
|
||||
This enables **parallel processing of messages from different sessions** (up to `max_parallel_turns`) while keeping same-session messages strictly sequential.
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Bus
|
||||
participant Run
|
||||
participant Drain
|
||||
participant AgentLoop
|
||||
participant Worker
|
||||
participant SQ
|
||||
|
||||
Run->>Bus: ConsumeInbound() → msg
|
||||
Run->>Drain: spawn drainBusToSteering(ctx)
|
||||
Run->>Run: processMessage(msg)
|
||||
Run->>Run: resolveSteeringTarget(msg) → sessionKey
|
||||
|
||||
Note over Drain: running concurrently
|
||||
|
||||
Bus-->>Drain: ConsumeInbound() → newMsg
|
||||
Drain->>AgentLoop: al.transcribeAudioInMessage(ctx, newMsg)
|
||||
Drain->>AgentLoop: Steer(providers.Message{Content: newMsg.Content})
|
||||
|
||||
Run->>Run: processMessage returns
|
||||
Run->>Drain: cancel context
|
||||
Note over Drain: exits
|
||||
alt no active turn
|
||||
Run->>Run: LoadOrStore(sessionKey, sentinel)
|
||||
Run->>Worker: spawn worker goroutine
|
||||
Worker->>Worker: processMessage(msg)
|
||||
Worker->>SQ: drain steering after turn
|
||||
else active turn exists
|
||||
Run->>SQ: enqueueSteeringMessage(msg)
|
||||
end
|
||||
```
|
||||
|
||||
## Data Structures
|
||||
@@ -121,7 +127,7 @@ A new field was added to `processOptions`:
|
||||
| `Steer` | `Steer(msg providers.Message) error` | Enqueues a steering message. Returns an error if the queue is full or not initialized. Thread-safe, can be called from any goroutine. |
|
||||
| `SteeringMode` | `SteeringMode() SteeringMode` | Returns the current dequeue mode. |
|
||||
| `SetSteeringMode` | `SetSteeringMode(mode SteeringMode)` | Changes the dequeue mode at runtime. |
|
||||
| `Continue` | `Continue(ctx, sessionKey, channel, chatID) (string, error)` | Resumes an idle agent using pending steering messages. Returns `""` if queue is empty. |
|
||||
| `Continue` | `Continue(ctx, sessionKey, channel, chatID) (string, error)` | Resumes an idle agent using pending steering messages for the given session. Returns `""` if queue is empty. Uses session-aware active turn checking (won't block on unrelated sessions). |
|
||||
|
||||
## Integration into the Agent Loop
|
||||
|
||||
@@ -280,15 +286,17 @@ flowchart TD
|
||||
{
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"steering_mode": "one-at-a-time"
|
||||
"steering_mode": "one-at-a-time",
|
||||
"max_parallel_turns": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Type | Default | Env var |
|
||||
|-------|------|---------|---------|
|
||||
| `steering_mode` | `string` | `"one-at-a-time"` | `PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE` |
|
||||
| Field | Type | Default | Env var | Description |
|
||||
|-------|------|---------|---------|-------------|
|
||||
| `steering_mode` | `string` | `"one-at-a-time"` | `PICOCLAW_AGENTS_DEFAULTS_STEERING_MODE` | How the steering queue is drained per poll |
|
||||
| `max_parallel_turns` | `int` | `1` | `PICOCLAW_AGENTS_DEFAULTS_MAX_PARALLEL_TURNS` | Max concurrent turns. `0` or `1` = sequential; `>1` = parallel across sessions |
|
||||
|
||||
|
||||
## Design decisions and trade-offs
|
||||
@@ -300,7 +308,8 @@ flowchart TD
|
||||
| `one-at-a-time` as default | Gives the model a chance to react to each steering message individually. More predictable behavior than dumping all messages at once. |
|
||||
| Skipped tools get explicit error results | The LLM protocol requires a tool result for every tool call in the assistant message. Omitting them would cause API errors. The skip message also informs the model about what was not done. |
|
||||
| `Continue()` uses `SkipInitialSteeringPoll` | Prevents race conditions and double-dequeuing when resuming an idle agent. |
|
||||
| Queue stored on `AgentLoop`, not `AgentInstance` | Steering is a loop-level concern (it affects the iteration flow), not a per-agent concern. All agents share the same steering queue since `processMessage` is sequential. |
|
||||
| Bus drain goroutine in `Run()` | Channels (Telegram, Discord, etc.) publish to the bus via `PublishInbound`. Without the drain, messages would queue in the bus channel buffer and only be consumed after `processMessage` returns — defeating the purpose of steering. The drain goroutine bridges the gap by consuming new bus messages and calling `Steer()` while the agent is busy. |
|
||||
| Audio transcription before steering | The drain goroutine calls `al.transcribeAudioInMessage(ctx, msg)` before steering, so voice messages are converted to text before the agent sees them. If transcription fails, the error is silently discarded and the original message is steered as-is. |
|
||||
| Queue stored on `AgentLoop`, not `AgentInstance` | Steering is a loop-level concern (it affects the iteration flow), not a per-agent concern. All agents share the steering queue since `processMessage` is sequential. |
|
||||
| Worker pool dispatch in `Run()` | Messages are dispatched to a worker pool instead of a single sequential loop. The session key is atomically reserved via `LoadOrStore` before the worker starts, preventing TOCTOU races. Messages from the same session are serialized; different sessions are processed in parallel (up to `max_parallel_turns`). |
|
||||
| No bus drain goroutine | The old `drainBusToSteering` goroutine has been removed. The main `Run()` loop now checks `activeTurnStates` for each inbound message: if a turn is active for the session, the message is enqueued directly to the steering queue; otherwise a new worker is spawned. This eliminates the complexity of drain cancellation and requeuing. |
|
||||
| Audio transcription in worker | Audio is transcribed within the worker that processes the turn, not in a separate drain goroutine. |
|
||||
| `MaxQueueSize = 10` | Prevents unbounded memory growth if a user sends many messages while the agent is busy. Excess messages are dropped with a warning. |
|
||||
|
||||
+12
-6
@@ -170,13 +170,19 @@ This is saved to the session via `AddFullMessage` and sent to the model, so it i
|
||||
|
||||
## Automatic bus drain
|
||||
|
||||
When the agent loop (`Run()`) starts processing a message, it spawns a background goroutine that keeps consuming new inbound messages from the bus. These messages are automatically redirected into the steering queue via `Steer()`. This means:
|
||||
When the agent loop (`Run()`) starts, it reads inbound messages from a shared message bus. The routing logic determines how each message is handled:
|
||||
|
||||
- Users on any channel (Telegram, Discord, etc.) don't need to do anything special — their messages are automatically captured as steering when the agent is busy
|
||||
- Audio messages are transcribed before being steered, so the agent receives text. If transcription fails, the original (non-transcribed) message is steered as-is
|
||||
- Only messages that resolve to the **same steering scope** as the active turn are redirected. Messages for other chats/sessions are requeued onto the inbound bus so they can be processed normally
|
||||
- `system` inbound messages are not treated as steering input
|
||||
- When `processMessage` finishes, the drain goroutine is canceled and normal message consumption resumes
|
||||
1. **No active turn for the message's session** — the message is dispatched to a **worker goroutine** that processes the full turn (LLM calls, tool execution, steering drain)
|
||||
2. **An active turn already exists for the same session** — the message is enqueued directly into that session's **steering queue** via `enqueueSteeringMessage`. No background drain goroutine is needed
|
||||
3. **Non-routable message** (e.g. `system`) — processed synchronously in the main loop
|
||||
|
||||
This design enables **parallel processing of messages from different sessions** while keeping same-session messages strictly sequential. Key implications:
|
||||
|
||||
- Messages from different users/channels are processed **concurrently** (up to `max_parallel_turns`)
|
||||
- Messages from the same session are **serialized** — subsequent messages go to the steering queue
|
||||
- Users don't need to do anything special — their messages are automatically captured as steering when the agent is busy for their session
|
||||
- Audio messages are transcribed within the worker that processes the turn, so the agent receives text
|
||||
- `system` inbound messages are processed immediately and do not trigger steering
|
||||
|
||||
## Steering with media
|
||||
|
||||
|
||||
+11
-7
@@ -112,13 +112,17 @@ When the parent task is forcefully aborted (e.g., user interrupts with `/stop`):
|
||||
|
||||
## Agent Loop Integration
|
||||
|
||||
### Bus Draining During Processing
|
||||
### Message Routing and Steering
|
||||
|
||||
When a message enters the `Run()` loop, the agent starts a `drainBusToSteering` goroutine before calling `processMessage`. This goroutine runs concurrently with the entire processing lifecycle and continuously consumes any new inbound messages from the bus, redirecting them into the **steering queue** instead of dropping them.
|
||||
When a message enters the `Run()` loop, the agent determines whether to start a new worker or enqueue to steering:
|
||||
|
||||
This ensures that if a user sends a follow-up message while the agent is processing (including during SubTurn execution), the message is not lost — it will be picked up between tool call iterations via `dequeueSteeringMessages`.
|
||||
- If **no active turn** exists for the message's session key, the session is atomically reserved and a **worker goroutine** is spawned. The worker processes the full turn lifecycle: `processMessage` → tool execution → steering drain → `Continue` for queued messages.
|
||||
- If an **active turn already exists** for the same session, the message is enqueued directly into that session's steering queue. It will be picked up by the existing worker's steering drain loop.
|
||||
|
||||
The drain goroutine stops automatically when `processMessage` returns (via a cancellable context).
|
||||
This ensures that:
|
||||
- Messages from **different sessions** are processed **in parallel** (up to `max_parallel_turns` concurrent workers)
|
||||
- Messages from the **same session** are strictly **serialized** — they go to the steering queue and are processed sequentially within the active turn
|
||||
- No background drain goroutine is needed; steering is handled by the worker itself after processing
|
||||
|
||||
### Pending Result Polling
|
||||
|
||||
@@ -129,7 +133,7 @@ The agent loop polls for async SubTurn results at two points per iteration:
|
||||
|
||||
### Turn State Tracking
|
||||
|
||||
All active root turns are registered in `AgentLoop.activeTurnStates` (`sync.Map`, keyed by session key). This allows `HardAbort` and `/subagents` observability commands to find and operate on active turns.
|
||||
All active turns are registered in `AgentLoop.activeTurnStates` (`sync.Map`, keyed by session key). A reservation sentinel is stored atomically via `LoadOrStore` before the worker starts, then replaced with the real `*turnState` when `runTurn` registers. This prevents a TOCTOU race where multiple messages for the same session could spawn concurrent workers. The sentinel is cleaned up by the worker's deferred cleanup. This allows `HardAbort` and `/subagents` observability commands to find and operate on active turns.
|
||||
|
||||
## Event Bus Integration
|
||||
|
||||
@@ -181,10 +185,10 @@ Creates a new spawner instance for the given AgentLoop. Pass the returned value
|
||||
### Continue
|
||||
|
||||
```go
|
||||
func (al *AgentLoop) Continue(ctx context.Context, sessionKey string) error
|
||||
func (al *AgentLoop) Continue(ctx context.Context, sessionKey, channel, chatID string) (string, error)
|
||||
```
|
||||
|
||||
Resumes an idle agent turn by injecting any queued steering messages as a new LLM iteration. Used when the agent is waiting and a deferred steering message needs to be processed without a new inbound message arriving.
|
||||
Resumes an idle agent turn by dequeuing steering messages for the given session and running them through the agent loop. Returns the response string if processing occurred, or empty string if no steering messages were pending. Uses session-aware active turn checking — it only blocks if a turn is active for the *same* session, not for unrelated sessions.
|
||||
|
||||
## Context Propagation
|
||||
|
||||
|
||||
Reference in New Issue
Block a user