fix(whatsapp_native,agent): address PR #884 review feedback

- Use c.runCtx for GetQRChannel so the QR producer is canceled on Stop()
- Add atomic stopping guard to prevent wg.Add/wg.Wait race in eventHandler
- Make Stop() context-aware: disconnect client before waiting, respect ctx deadline
- Reduce reasoning publish log noise: use debug level for expected ctx errors
- Add test for handleReasoning when outbound bus is full (timeout path)
This commit is contained in:
Hoshina
2026-02-28 03:05:19 +08:00
parent 1d0220f9fd
commit 9b80fdf885
3 changed files with 98 additions and 11 deletions
+13 -4
View File
@@ -585,10 +585,19 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
ChatID: channelID,
Content: reasoningContent,
}); err != nil {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
// Warn only on unexpected errors; context deadline/cancel are expected when
// the bus is full under load, so those are logged at debug level to avoid noise.
if ctx.Err() == nil {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
}
}
}
+49
View File
@@ -797,4 +797,53 @@ func TestHandleReasoning(t *testing.T) {
t.Fatalf("expected no outbound message, got %+v", msg)
}
})
t.Run("returns promptly when bus is full", func(t *testing.T) {
al, msgBus := newLoop(t)
// Fill the outbound bus buffer (default size is 64) so that the
// next PublishOutbound will block until the context deadline.
for i := 0; i < 64; i++ {
err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
if err != nil {
t.Fatalf("failed to fill bus: %v", err)
}
}
// Use a short-deadline parent context to bound the test.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
start := time.Now()
al.handleReasoning(ctx, "should timeout", "slack", "channel-full")
elapsed := time.Since(start)
// handleReasoning uses a 5s internal timeout, but the parent ctx
// expires in 500ms. It should return within ~500ms, not 5s.
if elapsed > 2*time.Second {
t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed)
}
// Drain the bus and verify the reasoning message was NOT published
// (it should have been dropped due to timeout).
drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer drainCancel()
foundReasoning := false
for {
msg, ok := msgBus.SubscribeOutbound(drainCtx)
if !ok {
break
}
if msg.Content == "should timeout" {
foundReasoning = true
}
}
if foundReasoning {
t.Fatal("expected reasoning message to be dropped when bus is full, but it was published")
}
})
}
@@ -15,6 +15,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/mdp/qrterminal/v3"
@@ -56,6 +57,7 @@ type WhatsAppNativeChannel struct {
runCancel context.CancelFunc
reconnectMu sync.Mutex
reconnecting bool
stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls
wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect)
}
@@ -127,7 +129,7 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
c.mu.Unlock()
if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(ctx)
qrChan, err := client.GetQRChannel(c.runCtx)
if err != nil {
c.runCancel()
_ = container.Close()
@@ -180,24 +182,47 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
logger.InfoC("whatsapp", "Stopping WhatsApp native channel")
// Mark as stopping so no new goroutines are spawned via eventHandler.
c.stopping.Store(true)
if c.runCancel != nil {
c.runCancel()
}
// Wait for background goroutines (QR handler, reconnect) to finish so
// they don't reference the client/container after cleanup.
c.wg.Wait()
// Disconnect the client first so any blocking Connect()/reconnect loops
// can be interrupted before we wait on the goroutines.
c.mu.Lock()
client := c.client
container := c.container
c.client = nil
c.container = nil
c.mu.Unlock()
if client != nil {
client.Disconnect()
}
// Wait for background goroutines (QR handler, reconnect) to finish in a
// context-aware way so Stop can be bounded by ctx.
done := make(chan struct{})
go func() {
c.wg.Wait()
close(done)
}()
select {
case <-done:
// All goroutines have finished.
case <-ctx.Done():
// Context canceled or timed out; log and proceed with best-effort cleanup.
logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err()))
}
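// Clear and close resources. If ctx expired above, goroutines may still be
// running, so this cleanup is best-effort rather than guaranteed safe.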
c.mu.Lock()
c.client = nil
c.container = nil
c.mu.Unlock()
if container != nil {
_ = container.Close()
}
@@ -211,6 +236,10 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) {
c.handleIncoming(evt.(*events.Message))
case *events.Disconnected:
logger.InfoCF("whatsapp", "WhatsApp disconnected, will attempt reconnection", nil)
// Prevent new goroutines once Stop() has begun.
if c.stopping.Load() {
return
}
c.reconnectMu.Lock()
if c.reconnecting {
c.reconnectMu.Unlock()