diff --git a/config/config.example.json b/config/config.example.json index 4205b8e8a..4267b7b20 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -11,6 +11,8 @@ "summarize_message_threshold": 20, "summarize_token_percent": 75, "split_on_marker": false, + "max_llm_retries": 2, + "llm_retry_backoff_secs": 2, "tool_feedback": { "enabled": false, "max_args_length": 300, diff --git a/pkg/agent/pipeline_llm.go b/pkg/agent/pipeline_llm.go index 6bf55fa39..d0b7ad8b4 100644 --- a/pkg/agent/pipeline_llm.go +++ b/pkg/agent/pipeline_llm.go @@ -184,7 +184,14 @@ func (p *Pipeline) CallLLM( // Retry loop var err error - maxRetries := 2 + maxRetries := p.Cfg.Agents.Defaults.MaxLLMRetries + if maxRetries <= 0 { + maxRetries = 2 + } + backoffSecs := p.Cfg.Agents.Defaults.LLMRetryBackoffSecs + if backoffSecs <= 0 { + backoffSecs = 2 + } for retry := 0; retry <= maxRetries; retry++ { exec.response, err = callLLM(exec.callMessages, exec.providerToolDefs) if err == nil { @@ -232,6 +239,15 @@ func (p *Pipeline) CallLLM( strings.Contains(errMsg, "timed out") || strings.Contains(errMsg, "timeout exceeded") + isNetworkError := !isTimeoutError && (strings.Contains(errMsg, "connection reset") || + strings.Contains(errMsg, "connection refused") || + strings.Contains(errMsg, "broken pipe") || + strings.Contains(errMsg, "no such host") || + strings.Contains(errMsg, "network is unreachable") || + strings.Contains(errMsg, "read tcp") || + strings.Contains(errMsg, "write tcp") || + strings.Contains(errMsg, "eof")) + isContextError := !isTimeoutError && (strings.Contains(errMsg, "context_length_exceeded") || strings.Contains(errMsg, "context window") || strings.Contains(errMsg, "context_window") || @@ -244,7 +260,7 @@ func (p *Pipeline) CallLLM( strings.Contains(errMsg, "request too large")) if isTimeoutError && retry < maxRetries { - backoff := time.Duration(retry+1) * 5 * time.Second + backoff := time.Duration(retry+1) * time.Duration(backoffSecs) * time.Second al.emitEvent( EventKindLLMRetry, ts.eventMeta("runTurn", "turn.llm.retry"), @@ -272,6 +288,35 @@ func (p *Pipeline) CallLLM( continue } + if isNetworkError && retry < maxRetries { + backoff := time.Duration(retry+1) * time.Duration(backoffSecs) * time.Second + al.emitEvent( + EventKindLLMRetry, + ts.eventMeta("runTurn", "turn.llm.retry"), + LLMRetryPayload{ + Attempt: retry + 1, + MaxRetries: maxRetries, + Reason: "network", + Error: err.Error(), + Backoff: backoff, + }, + ) + logger.WarnCF("agent", "Network error, retrying after backoff", map[string]any{ + "error": err.Error(), + "retry": retry, + "backoff": backoff.String(), + }) + if sleepErr := sleepWithContext(turnCtx, backoff); sleepErr != nil { + if ts.hardAbortRequested() { + _ = ts.requestHardAbort() + return ControlBreak, nil + } + err = sleepErr + break + } + continue + } + if isContextError && retry < maxRetries && !ts.opts.NoHistory { al.emitEvent( EventKindLLMRetry, diff --git a/pkg/agent/turn_coord_test.go b/pkg/agent/turn_coord_test.go index c059d0a39..898ae3931 100644 --- a/pkg/agent/turn_coord_test.go +++ b/pkg/agent/turn_coord_test.go @@ -135,6 +135,16 @@ func (p *errorProvider) Chat( return nil, errors.New("context_length_exceeded") case "vision": return nil, errors.New("vision_unsupported") + case "connection_reset": + return nil, errors.New("connection reset by peer") + case "broken_pipe": + return nil, errors.New("broken pipe") + case "read_tcp": + return nil, errors.New("read tcp 127.0.0.1:8080: connection reset") + case "eof": + return nil, errors.New("EOF") + case "connection_refused": + return nil, errors.New("connection refused") default: return nil, errors.New("unknown error") } @@ -366,6 +376,163 @@ func TestPipeline_CallLLM_ContextLengthError(t *testing.T) { t.Logf("CallLLM result after context error: err=%v", err) } +func TestPipeline_CallLLM_NetworkErrorRetry(t *testing.T) { + testCases := []struct { + name string + errType string + }{ + {"connection_reset", "connection_reset"}, + {"broken_pipe", "broken_pipe"}, + {"read_tcp", "read_tcp"}, + {"eof", "eof"}, + {"connection_refused", "connection_refused"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + errorPrv := &errorProvider{errType: tc.errType} + al, agent, cleanup := newTurnCoordTestLoop(t, errorPrv) + defer cleanup() + + pipeline := NewPipeline(al) + ts := newTurnState(agent, makeTestProcessOpts("test-session"), turnEventScope{ + turnID: "turn-1", + context: newTurnContext(nil, nil, nil), + }) + + exec, err := pipeline.SetupTurn(context.Background(), ts) + if err != nil { + t.Fatalf("SetupTurn failed: %v", err) + } + + _, err = pipeline.CallLLM(context.Background(), context.Background(), ts, exec, 1) + if err == nil { + t.Error("expected error after network error retries") + } + }) + } +} + +func TestPipeline_CallLLM_RetryConfigRespected(t *testing.T) { + tmpDir := t.TempDir() + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + ModelName: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + MaxLLMRetries: 3, + LLMRetryBackoffSecs: 1, + }, + }, + } + + msgBus := bus.NewMessageBus() + provider := &errorProvider{errType: "connection_reset"} + al := NewAgentLoop(cfg, msgBus, provider) + defer al.Close() + agent := al.registry.GetDefaultAgent() + if agent == nil { + t.Fatal("expected default agent") + } + + pipeline := NewPipeline(al) + ts := newTurnState(agent, makeTestProcessOpts("test-session"), turnEventScope{ + turnID: "turn-1", + context: newTurnContext(nil, nil, nil), + }) + + exec, err := pipeline.SetupTurn(context.Background(), ts) + if err != nil { + t.Fatalf("SetupTurn failed: %v", err) + } + + start := time.Now() + _, err = pipeline.CallLLM(context.Background(), context.Background(), ts, exec, 1) + elapsed := time.Since(start) + + if err == nil { + t.Error("expected error after retries") + } + + expectedMinTime := 3 * time.Second + if elapsed < expectedMinTime { + t.Errorf("expected at least %v of backoff, got %v", expectedMinTime, elapsed) + } +} + +func TestPipeline_CallLLM_RetryCountLimit(t *testing.T) { + tmpDir := t.TempDir() + + counterPrv := &countingErrorProvider{errType: "connection_reset", targetCalls: 5} + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + ModelName: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + MaxLLMRetries: 2, + LLMRetryBackoffSecs: 0, + }, + }, + } + + msgBus := bus.NewMessageBus() + al := NewAgentLoop(cfg, msgBus, counterPrv) + defer al.Close() + agent := al.registry.GetDefaultAgent() + if agent == nil { + t.Fatal("expected default agent") + } + + pipeline := NewPipeline(al) + ts := newTurnState(agent, makeTestProcessOpts("test-session"), turnEventScope{ + turnID: "turn-1", + context: newTurnContext(nil, nil, nil), + }) + + exec, err := pipeline.SetupTurn(context.Background(), ts) + if err != nil { + t.Fatalf("SetupTurn failed: %v", err) + } + + _, err = pipeline.CallLLM(context.Background(), context.Background(), ts, exec, 1) + if err == nil { + t.Error("expected error after retries") + } + + if counterPrv.callCount != 3 { + t.Errorf("expected exactly 3 calls (1 initial + 2 retries), got %d", counterPrv.callCount) + } +} + +type countingErrorProvider struct { + errType string + targetCalls int + callCount int + mu sync.Mutex +} + +func (p *countingErrorProvider) Chat( + ctx context.Context, + messages []providers.Message, + tools []providers.ToolDefinition, + model string, + opts map[string]any, +) (*providers.LLMResponse, error) { + p.mu.Lock() + p.callCount++ + p.mu.Unlock() + return nil, errors.New("connection reset by peer") +} + +func (p *countingErrorProvider) GetDefaultModel() string { + return "counting-error-model" +} + // ============================================================================= // Pipeline Method Tests: ExecuteTools // ============================================================================= diff --git a/pkg/config/config.go b/pkg/config/config.go index dc9e88949..bd336bf81 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -276,6 +276,8 @@ type AgentDefaults struct { SplitOnMarker bool `json:"split_on_marker" env:"PICOCLAW_AGENTS_DEFAULTS_SPLIT_ON_MARKER"` // split messages on <|[SPLIT]|> marker ContextManager string `json:"context_manager,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_CONTEXT_MANAGER"` ContextManagerConfig json.RawMessage `json:"context_manager_config,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_CONTEXT_MANAGER_CONFIG"` + MaxLLMRetries int `json:"max_llm_retries,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_MAX_LLM_RETRIES"` + LLMRetryBackoffSecs int `json:"llm_retry_backoff_secs,omitempty" env:"PICOCLAW_AGENTS_DEFAULTS_LLM_RETRY_BACKOFF_SECS"` } const DefaultMaxMediaSize = 20 * 1024 * 1024 // 20 MB diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index be8c32495..da97e8634 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -39,7 +39,9 @@ func DefaultConfig() *Config { MaxArgsLength: 300, SeparateMessages: false, }, - SplitOnMarker: false, + SplitOnMarker: false, + MaxLLMRetries: 2, + LLMRetryBackoffSecs: 2, }, }, Session: SessionConfig{