fix(whatsapp_native,agent): address second round of review feedback

- WhatsApp Send(): detect unpaired state (Store.ID == nil) and return
  ErrTemporary instead of attempting to send while QR login is pending
- handleReasoning: check the returned error type (DeadlineExceeded /
  Canceled) instead of ctx.Err() to decide log level, so pubCtx
  timeouts on a full bus are correctly classified as expected
- Test: fill bus with a short-timeout loop instead of hardcoding the
  buffer size (64), making the test resilient to buffer size changes
This commit is contained in:
Hoshina
2026-02-28 12:54:05 +08:00
parent fc28c2660a
commit d1b10a0004
3 changed files with 23 additions and 10 deletions
+8 -5
View File
@@ -9,6 +9,7 @@ package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
@@ -585,15 +586,17 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
ChatID: channelID,
Content: reasoningContent,
}); err != nil {
// Only log unexpected errors; context deadline/cancel are expected when
// the bus is full under load and would be too noisy to warn about.
if ctx.Err() == nil {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
// Treat context.DeadlineExceeded / context.Canceled as expected
// (bus full under load, or parent cancelled). Check the error
// itself rather than ctx.Err(), because pubCtx may time out
// (5 s) while the parent ctx is still active.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
+9 -5
View File
@@ -801,16 +801,20 @@ func TestHandleReasoning(t *testing.T) {
t.Run("returns promptly when bus is full", func(t *testing.T) {
al, msgBus := newLoop(t)
// Fill the outbound bus buffer (default size is 64) so that the
// next PublishOutbound will block until the context deadline.
for i := 0; i < 64; i++ {
err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
// Fill the outbound bus buffer until a publish would block.
// Use a short timeout to detect when the buffer is full,
// rather than hardcoding the buffer size.
for i := 0; ; i++ {
fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
fillCancel()
if err != nil {
t.Fatalf("failed to fill bus: %v", err)
// Buffer is full (timed out trying to send).
break
}
}
@@ -384,6 +384,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
}
// Detect unpaired state: the client is connected (to WhatsApp servers)
// but has not completed QR-login yet, so sending would fail.
if client.Store.ID == nil {
return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary)
}
to, err := parseJID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)