diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index f34a6e01a..8fd7328d1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -575,11 +575,36 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan return } - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ + // Use a short timeout so the goroutine does not block indefinitely when + // the outbound bus is full. Reasoning output is best-effort; dropping it + // is acceptable to avoid goroutine accumulation. + pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) + defer pubCancel() + + if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: channelName, ChatID: channelID, Content: reasoningContent, - }) + }); err != nil { + // Treat context.DeadlineExceeded / context.Canceled as expected + // (bus full under load, or parent canceled). Check the error + // itself rather than ctx.Err(), because pubCtx may time out + // (5 s) while the parent ctx is still active. + // Also treat ErrBusClosed as expected — it occurs during normal + // shutdown when the bus is closed before all goroutines finish. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || + errors.Is(err, bus.ErrBusClosed) { + logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } else { + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } + } } // runLLMIteration executes the LLM call loop with tool handling. 
diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 6dfc7ef3e..801b6a46e 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -797,4 +797,57 @@ func TestHandleReasoning(t *testing.T) { t.Fatalf("expected no outbound message, got %+v", msg) } }) + + t.Run("returns promptly when bus is full", func(t *testing.T) { + al, msgBus := newLoop(t) + + // Fill the outbound bus buffer until a publish would block. + // Use a short timeout to detect when the buffer is full, + // rather than hardcoding the buffer size. + for i := 0; ; i++ { + fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{ + Channel: "filler", + ChatID: "filler", + Content: fmt.Sprintf("filler-%d", i), + }) + fillCancel() + if err != nil { + // Buffer is full (timed out trying to send). + break + } + } + + // Use a short-deadline parent context to bound the test. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + start := time.Now() + al.handleReasoning(ctx, "should timeout", "slack", "channel-full") + elapsed := time.Since(start) + + // handleReasoning uses a 5s internal timeout, but the parent ctx + // expires in 500ms. It should return within ~500ms, not 5s. + if elapsed > 2*time.Second { + t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed) + } + + // Drain the bus and verify the reasoning message was NOT published + // (it should have been dropped due to timeout). 
+ drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer drainCancel() + foundReasoning := false + for { + msg, ok := msgBus.SubscribeOutbound(drainCtx) + if !ok { + break + } + if msg.Content == "should timeout" { + foundReasoning = true + } + } + if foundReasoning { + t.Fatal("expected reasoning message to be dropped when bus is full, but it was published") + } + }) } diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 23115bda7..188a7c8fa 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -15,6 +15,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/mdp/qrterminal/v3" @@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct { runCancel context.CancelFunc reconnectMu sync.Mutex reconnecting bool + stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls + wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) } // NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection. @@ -80,6 +83,14 @@ func NewWhatsAppNativeChannel( func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath}) + // Reset lifecycle state from any previous Stop() so a restarted channel + // behaves correctly. Use reconnectMu to be consistent with eventHandler + // and Stop() which coordinate under the same lock. 
+ c.reconnectMu.Lock() + c.stopping.Store(false) + c.reconnecting = false + c.reconnectMu.Unlock() + if err := os.MkdirAll(c.storePath, 0o700); err != nil { return fmt.Errorf("create session store dir: %w", err) } @@ -112,6 +123,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { } client := whatsmeow.NewClient(deviceStore, waLogger) + + // Create runCtx/runCancel BEFORE registering event handler and starting + // goroutines so that Stop() can cancel them at any time, including during + // the QR-login flow. + c.runCtx, c.runCancel = context.WithCancel(ctx) + client.AddEventHandler(c.eventHandler) c.mu.Lock() @@ -119,36 +136,75 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { c.client = client c.mu.Unlock() + // The deferred cleanup below clears struct references and releases resources + // when Start() fails after fields are already assigned. This prevents + // Stop() from operating on stale references (double-close, disconnect + // of a partially-initialized client, or stray event handler callbacks). 
+ startOK := false + defer func() { + if startOK { + return + } + c.runCancel() + client.Disconnect() + c.mu.Lock() + c.client = nil + c.container = nil + c.mu.Unlock() + _ = container.Close() + }() + if client.Store.ID == nil { - qrChan, err := client.GetQRChannel(ctx) + qrChan, err := client.GetQRChannel(c.runCtx) if err != nil { - _ = container.Close() return fmt.Errorf("get QR channel: %w", err) } if err := client.Connect(); err != nil { - _ = container.Close() return fmt.Errorf("connect: %w", err) } - for evt := range qrChan { - if evt.Event == "code" { - logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) - qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ - Level: qrterminal.L, - Writer: os.Stdout, - HalfBlocks: true, - }) - } else { - logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) - } + // Handle QR events in a background goroutine so Start() returns + // promptly. The goroutine is tracked via c.wg and respects + // c.runCtx for cancellation. + // Guard wg.Add with reconnectMu + stopping check (same protocol + // as eventHandler) so a concurrent Stop() cannot enter wg.Wait() + // while we call wg.Add(1). 
+ c.reconnectMu.Lock() + if c.stopping.Load() { + c.reconnectMu.Unlock() + return fmt.Errorf("channel stopped during QR setup") } + c.wg.Add(1) + c.reconnectMu.Unlock() + go func() { + defer c.wg.Done() + for { + select { + case <-c.runCtx.Done(): + return + case evt, ok := <-qrChan: + if !ok { + return + } + if evt.Event == "code" { + logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) + qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ + Level: qrterminal.L, + Writer: os.Stdout, + HalfBlocks: true, + }) + } else { + logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) + } + } + } + }() } else { if err := client.Connect(); err != nil { - _ = container.Close() return fmt.Errorf("connect: %w", err) } } - c.runCtx, c.runCancel = context.WithCancel(ctx) + startOK = true c.SetRunning(true) logger.InfoC("whatsapp", "WhatsApp native channel connected") return nil @@ -156,19 +212,53 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error { logger.InfoC("whatsapp", "Stopping WhatsApp native channel") + + // Mark as stopping under reconnectMu so the flag is visible to + // eventHandler atomically with respect to its wg.Add(1) call. + // This closes the TOCTOU window where eventHandler could check + // stopping (false), then Stop sets it true + enters wg.Wait, + // then eventHandler calls wg.Add(1) — causing a panic. + c.reconnectMu.Lock() + c.stopping.Store(true) + c.reconnectMu.Unlock() + if c.runCancel != nil { c.runCancel() } + + // Disconnect the client first so any blocking Connect()/reconnect loops + // can be interrupted before we wait on the goroutines. 
c.mu.Lock() client := c.client container := c.container - c.client = nil - c.container = nil c.mu.Unlock() if client != nil { client.Disconnect() } + + // Wait for background goroutines (QR handler, reconnect) to finish in a + // context-aware way so Stop can be bounded by ctx. + done := make(chan struct{}) + go func() { + c.wg.Wait() + close(done) + }() + + select { + case <-done: + // All goroutines have finished. + case <-ctx.Done(): + // Context canceled or timed out; log and proceed with best-effort cleanup. + logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err())) + } + + // Now it is safe to clear and close resources. + c.mu.Lock() + c.client = nil + c.container = nil + c.mu.Unlock() + if container != nil { _ = container.Close() } @@ -187,9 +277,20 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) { c.reconnectMu.Unlock() return } + // Check stopping while holding the lock so the check and wg.Add + // are atomic with respect to Stop() setting the flag + calling + // wg.Wait(). This prevents the TOCTOU race. + if c.stopping.Load() { + c.reconnectMu.Unlock() + return + } c.reconnecting = true + c.wg.Add(1) c.reconnectMu.Unlock() - go c.reconnectWithBackoff() + go func() { + defer c.wg.Done() + c.reconnectWithBackoff() + }() } } @@ -313,6 +414,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) } + // Detect unpaired state: the client is connected (to WhatsApp servers) + // but has not completed QR-login yet, so sending would fail. + if client.Store.ID == nil { + return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary) + } + to, err := parseJID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)