diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 53f411452..ba0254076 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -9,6 +9,7 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" "path/filepath" "strings" @@ -585,15 +586,17 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan ChatID: channelID, Content: reasoningContent, }); err != nil { - // Only log unexpected errors; context deadline/cancel are expected when - // the bus is full under load and would be too noisy to warn about. - if ctx.Err() == nil { - logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + // Treat context.DeadlineExceeded / context.Canceled as expected + // (bus full under load, or parent cancelled). Check the error + // itself rather than ctx.Err(), because pubCtx may time out + // (5 s) while the parent ctx is still active. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{ "channel": channelName, "error": err.Error(), }) } else { - logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{ + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ "channel": channelName, "error": err.Error(), }) diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 29293477e..801b6a46e 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -801,16 +801,20 @@ func TestHandleReasoning(t *testing.T) { t.Run("returns promptly when bus is full", func(t *testing.T) { al, msgBus := newLoop(t) - // Fill the outbound bus buffer (default size is 64) so that the - // next PublishOutbound will block until the context deadline. - for i := 0; i < 64; i++ { - err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ + // Fill the outbound bus buffer until a publish would block. + // Use a short timeout to detect when the buffer is full, + // rather than hardcoding the buffer size. + for i := 0; ; i++ { + fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{ Channel: "filler", ChatID: "filler", Content: fmt.Sprintf("filler-%d", i), }) + fillCancel() if err != nil { - t.Fatalf("failed to fill bus: %v", err) + // Buffer is full (timed out trying to send). + break } } diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 748ee29d7..6b8ddbffd 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -384,6 +384,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) } + // Detect unpaired state: the client is connected (to WhatsApp servers) + // but has not completed QR-login yet, so sending would fail. + if client.Store.ID == nil { + return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary) + } + to, err := parseJID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)