diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d7daf3775..53f411452 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -585,10 +585,19 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan ChatID: channelID, Content: reasoningContent, }); err != nil { - logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ - "channel": channelName, - "error": err.Error(), - }) + // Only log unexpected errors; context deadline/cancel are expected when + // the bus is full under load and would be too noisy to warn about. + if ctx.Err() == nil { + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } else { + logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } } } diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 6dfc7ef3e..29293477e 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -797,4 +797,53 @@ func TestHandleReasoning(t *testing.T) { t.Fatalf("expected no outbound message, got %+v", msg) } }) + + t.Run("returns promptly when bus is full", func(t *testing.T) { + al, msgBus := newLoop(t) + + // Fill the outbound bus buffer (default size is 64) so that the + // next PublishOutbound will block until the context deadline. + for i := 0; i < 64; i++ { + err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ + Channel: "filler", + ChatID: "filler", + Content: fmt.Sprintf("filler-%d", i), + }) + if err != nil { + t.Fatalf("failed to fill bus: %v", err) + } + } + + // Use a short-deadline parent context to bound the test. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + start := time.Now() + al.handleReasoning(ctx, "should timeout", "slack", "channel-full") + elapsed := time.Since(start) + + // handleReasoning uses a 5s internal timeout, but the parent ctx + // expires in 500ms. It should return within ~500ms, not 5s. + if elapsed > 2*time.Second { + t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed) + } + + // Drain the bus and verify the reasoning message was NOT published + // (it should have been dropped due to timeout). + drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer drainCancel() + foundReasoning := false + for { + msg, ok := msgBus.SubscribeOutbound(drainCtx) + if !ok { + break + } + if msg.Content == "should timeout" { + foundReasoning = true + } + } + if foundReasoning { + t.Fatal("expected reasoning message to be dropped when bus is full, but it was published") + } + }) } diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 1ac3c4c9c..c475276ba 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -15,6 +15,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/mdp/qrterminal/v3" @@ -56,6 +57,7 @@ type WhatsAppNativeChannel struct { runCancel context.CancelFunc reconnectMu sync.Mutex reconnecting bool + stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) } @@ -127,7 +129,7 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { c.mu.Unlock() if client.Store.ID == nil { - qrChan, err := client.GetQRChannel(ctx) + qrChan, err := client.GetQRChannel(c.runCtx) if err != nil { c.runCancel() _ = container.Close() @@ -180,24 +182,47 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error { logger.InfoC("whatsapp", "Stopping WhatsApp native channel") + + // Mark as stopping so no new goroutines are spawned via eventHandler. + c.stopping.Store(true) + if c.runCancel != nil { c.runCancel() } - // Wait for background goroutines (QR handler, reconnect) to finish so - // they don't reference the client/container after cleanup. - c.wg.Wait() - + // Disconnect the client first so any blocking Connect()/reconnect loops + // can be interrupted before we wait on the goroutines. c.mu.Lock() client := c.client container := c.container - c.client = nil - c.container = nil c.mu.Unlock() if client != nil { client.Disconnect() } + + // Wait for background goroutines (QR handler, reconnect) to finish in a + // context-aware way so Stop can be bounded by ctx. + done := make(chan struct{}) + go func() { + c.wg.Wait() + close(done) + }() + + select { + case <-done: + // All goroutines have finished. + case <-ctx.Done(): + // Context canceled or timed out; log and proceed with best-effort cleanup. + logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err())) + } + + // Now it is safe to clear and close resources. + c.mu.Lock() + c.client = nil + c.container = nil + c.mu.Unlock() + if container != nil { _ = container.Close() } @@ -211,6 +236,10 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) { c.handleIncoming(evt.(*events.Message)) case *events.Disconnected: logger.InfoCF("whatsapp", "WhatsApp disconnected, will attempt reconnection", nil) + // Prevent new goroutines once Stop() has begun. + if c.stopping.Load() { + return + } c.reconnectMu.Lock() if c.reconnecting { c.reconnectMu.Unlock()