From 7276a2d651cd62806336d84d0c20e7680c2aa2b0 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 27 Feb 2026 20:15:14 +0800 Subject: [PATCH 1/8] Fix lint errors --- pkg/config/config.go | 10 +++++----- pkg/utils/string.go | 2 +- pkg/utils/string_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 46ffa2ecf..2e0215278 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -224,11 +224,11 @@ type PlaceholderConfig struct { } type WhatsAppConfig struct { - Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_WHATSAPP_ENABLED"` - BridgeURL string `json:"bridge_url" env:"PICOCLAW_CHANNELS_WHATSAPP_BRIDGE_URL"` - UseNative bool `json:"use_native" env:"PICOCLAW_CHANNELS_WHATSAPP_USE_NATIVE"` - SessionStorePath string `json:"session_store_path" env:"PICOCLAW_CHANNELS_WHATSAPP_SESSION_STORE_PATH"` - AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_WHATSAPP_ALLOW_FROM"` + Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_WHATSAPP_ENABLED"` + BridgeURL string `json:"bridge_url" env:"PICOCLAW_CHANNELS_WHATSAPP_BRIDGE_URL"` + UseNative bool `json:"use_native" env:"PICOCLAW_CHANNELS_WHATSAPP_USE_NATIVE"` + SessionStorePath string `json:"session_store_path" env:"PICOCLAW_CHANNELS_WHATSAPP_SESSION_STORE_PATH"` + AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_WHATSAPP_ALLOW_FROM"` ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_WHATSAPP_REASONING_CHANNEL_ID"` } diff --git a/pkg/utils/string.go b/pkg/utils/string.go index 6a78f59fe..02f346db4 100644 --- a/pkg/utils/string.go +++ b/pkg/utils/string.go @@ -5,7 +5,7 @@ import ( "unicode" ) -// SanitizeMessage removes Unicode control characters, format characters (RTL overrides, +// SanitizeMessageContent removes Unicode control characters, format characters (RTL overrides, // zero-width characters), and other non-graphic characters that could confuse an LLM // or cause display issues in the agent UI. func SanitizeMessageContent(input string) string { diff --git a/pkg/utils/string_test.go b/pkg/utils/string_test.go index fffa0cff3..e3b5af052 100644 --- a/pkg/utils/string_test.go +++ b/pkg/utils/string_test.go @@ -117,7 +117,7 @@ func TestSanitizeMessageContent(t *testing.T) { {"strip RTL override", "Hi\u202eevil", "Hievil"}, {"strip BOM", "\uFEFFcontent", "content"}, {"strip multiple", "a\u200c\u202ab\u202cc", "abc"}, - {"unicode letters preserved", "café 日本語", "café 日本語"}, + {"unicode letters preserved", "café \u65e5\u672c\u8a9e", "café \u65e5\u672c\u8a9e"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From c7d75a18f89f443f5bf1e1a3de214680c0957773 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 01:39:06 +0800 Subject: [PATCH 2/8] fix(whatsapp_native): fix goroutine and resource leak in Start/Stop lifecycle - Move runCtx/runCancel creation before event handler registration and QR loop so Stop() can cancel at any point during startup - Replace blocking QR event loop in Start() with a background goroutine that selects on runCtx.Done(), preventing Start() from hanging indefinitely when waiting for QR scan - Track all background goroutines (QR handler, reconnect) with sync.WaitGroup; Stop() waits for them to finish before releasing client/container resources - Cancel runCtx on error paths in Start() to avoid leaked contexts Fixes resource leak introduced in #655. --- .../whatsapp_native/whatsapp_native.go | 59 +++++++++++++++---- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 23115bda7..1ac3c4c9c 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -56,6 +56,7 @@ type WhatsAppNativeChannel struct { runCancel context.CancelFunc reconnectMu sync.Mutex reconnecting bool + wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) } // NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection. @@ -112,6 +113,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { } client := whatsmeow.NewClient(deviceStore, waLogger) + + // Create runCtx/runCancel BEFORE registering event handler and starting + // goroutines so that Stop() can cancel them at any time, including during + // the QR-login flow. + c.runCtx, c.runCancel = context.WithCancel(ctx) + client.AddEventHandler(c.eventHandler) c.mu.Lock() @@ -122,33 +129,50 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { if client.Store.ID == nil { qrChan, err := client.GetQRChannel(ctx) if err != nil { + c.runCancel() _ = container.Close() return fmt.Errorf("get QR channel: %w", err) } if err := client.Connect(); err != nil { + c.runCancel() _ = container.Close() return fmt.Errorf("connect: %w", err) } - for evt := range qrChan { - if evt.Event == "code" { - logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) - qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ - Level: qrterminal.L, - Writer: os.Stdout, - HalfBlocks: true, - }) - } else { - logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) + // Handle QR events in a background goroutine so Start() returns + // promptly. The goroutine is tracked via c.wg and respects + // c.runCtx for cancellation. + c.wg.Add(1) + go func() { + defer c.wg.Done() + for { + select { + case <-c.runCtx.Done(): + return + case evt, ok := <-qrChan: + if !ok { + return + } + if evt.Event == "code" { + logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) + qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ + Level: qrterminal.L, + Writer: os.Stdout, + HalfBlocks: true, + }) + } else { + logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) + } + } } - } + }() } else { if err := client.Connect(); err != nil { + c.runCancel() _ = container.Close() return fmt.Errorf("connect: %w", err) } } - c.runCtx, c.runCancel = context.WithCancel(ctx) c.SetRunning(true) logger.InfoC("whatsapp", "WhatsApp native channel connected") return nil @@ -159,6 +183,11 @@ func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error { if c.runCancel != nil { c.runCancel() } + + // Wait for background goroutines (QR handler, reconnect) to finish so + // they don't reference the client/container after cleanup. + c.wg.Wait() + c.mu.Lock() client := c.client container := c.container @@ -189,7 +218,11 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) { } c.reconnecting = true c.reconnectMu.Unlock() - go c.reconnectWithBackoff() + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.reconnectWithBackoff() + }() } } From 1d0220f9fdc0af1b5c18bef6f5b79be7f4aedb48 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 01:39:17 +0800 Subject: [PATCH 3/8] fix(agent): prevent reasoning goroutine accumulation on full bus Add a 5-second timeout to handleReasoning's PublishOutbound call so fire-and-forget goroutines do not block indefinitely when the outbound bus channel is full. Reasoning output is best-effort; on timeout the publish is abandoned with a warning log instead of holding the goroutine alive. Fixes goroutine leak introduced in #802. --- pkg/agent/loop.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 29827d0b2..d7daf3775 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -574,11 +574,22 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan return } - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ + // Use a short timeout so the goroutine does not block indefinitely when + // the outbound bus is full. Reasoning output is best-effort; dropping it + // is acceptable to avoid goroutine accumulation. + pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) + defer pubCancel() + + if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: channelName, ChatID: channelID, Content: reasoningContent, - }) + }); err != nil { + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } } // runLLMIteration executes the LLM call loop with tool handling. From 9b80fdf885facf0d1b6067446a8fa67a18393762 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 03:05:19 +0800 Subject: [PATCH 4/8] fix(whatsapp_native,agent): address PR #884 review feedback - Use c.runCtx for GetQRChannel so the QR producer is canceled on Stop() - Add atomic stopping guard to prevent wg.Add/wg.Wait race in eventHandler - Make Stop() context-aware: disconnect client before waiting, respect ctx deadline - Reduce reasoning publish log noise: use debug level for expected ctx errors - Add test for handleReasoning when outbound bus is full (timeout path) --- pkg/agent/loop.go | 17 +++++-- pkg/agent/loop_test.go | 49 +++++++++++++++++++ .../whatsapp_native/whatsapp_native.go | 43 +++++++++++++--- 3 files changed, 98 insertions(+), 11 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d7daf3775..53f411452 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -585,10 +585,19 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan ChatID: channelID, Content: reasoningContent, }); err != nil { - logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ - "channel": channelName, - "error": err.Error(), - }) + // Only log unexpected errors; context deadline/cancel are expected when + // the bus is full under load and would be too noisy to warn about. + if ctx.Err() == nil { + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } else { + logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{ + "channel": channelName, + "error": err.Error(), + }) + } } } diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 6dfc7ef3e..29293477e 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -797,4 +797,53 @@ func TestHandleReasoning(t *testing.T) { t.Fatalf("expected no outbound message, got %+v", msg) } }) + + t.Run("returns promptly when bus is full", func(t *testing.T) { + al, msgBus := newLoop(t) + + // Fill the outbound bus buffer (default size is 64) so that the + // next PublishOutbound will block until the context deadline. + for i := 0; i < 64; i++ { + err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ + Channel: "filler", + ChatID: "filler", + Content: fmt.Sprintf("filler-%d", i), + }) + if err != nil { + t.Fatalf("failed to fill bus: %v", err) + } + } + + // Use a short-deadline parent context to bound the test. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + start := time.Now() + al.handleReasoning(ctx, "should timeout", "slack", "channel-full") + elapsed := time.Since(start) + + // handleReasoning uses a 5s internal timeout, but the parent ctx + // expires in 500ms. It should return within ~500ms, not 5s. + if elapsed > 2*time.Second { + t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed) + } + + // Drain the bus and verify the reasoning message was NOT published + // (it should have been dropped due to timeout). + drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer drainCancel() + foundReasoning := false + for { + msg, ok := msgBus.SubscribeOutbound(drainCtx) + if !ok { + break + } + if msg.Content == "should timeout" { + foundReasoning = true + } + } + if foundReasoning { + t.Fatal("expected reasoning message to be dropped when bus is full, but it was published") + } + }) } diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 1ac3c4c9c..c475276ba 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -15,6 +15,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/mdp/qrterminal/v3" @@ -56,6 +57,7 @@ type WhatsAppNativeChannel struct { runCancel context.CancelFunc reconnectMu sync.Mutex reconnecting bool + stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) } @@ -127,7 +129,7 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { c.mu.Unlock() if client.Store.ID == nil { - qrChan, err := client.GetQRChannel(ctx) + qrChan, err := client.GetQRChannel(c.runCtx) if err != nil { c.runCancel() _ = container.Close() @@ -180,24 +182,47 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error { logger.InfoC("whatsapp", "Stopping WhatsApp native channel") + + // Mark as stopping so no new goroutines are spawned via eventHandler. + c.stopping.Store(true) + if c.runCancel != nil { c.runCancel() } - // Wait for background goroutines (QR handler, reconnect) to finish so - // they don't reference the client/container after cleanup. - c.wg.Wait() - + // Disconnect the client first so any blocking Connect()/reconnect loops + // can be interrupted before we wait on the goroutines. c.mu.Lock() client := c.client container := c.container - c.client = nil - c.container = nil c.mu.Unlock() if client != nil { client.Disconnect() } + + // Wait for background goroutines (QR handler, reconnect) to finish in a + // context-aware way so Stop can be bounded by ctx. + done := make(chan struct{}) + go func() { + c.wg.Wait() + close(done) + }() + + select { + case <-done: + // All goroutines have finished. + case <-ctx.Done(): + // Context canceled or timed out; log and proceed with best-effort cleanup. + logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err())) + } + + // Now it is safe to clear and close resources. + c.mu.Lock() + c.client = nil + c.container = nil + c.mu.Unlock() + if container != nil { _ = container.Close() } @@ -211,6 +236,10 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) { c.handleIncoming(evt.(*events.Message)) case *events.Disconnected: logger.InfoCF("whatsapp", "WhatsApp disconnected, will attempt reconnection", nil) + // Prevent new goroutines once Stop() has begun. + if c.stopping.Load() { + return + } c.reconnectMu.Lock() if c.reconnecting { c.reconnectMu.Unlock() From fc28c2660a5f709b7f6c763a9d9b1328dd4acaf0 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 12:37:49 +0800 Subject: [PATCH 5/8] fix(whatsapp_native): close TOCTOU race between eventHandler and Stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the stopping check and wg.Add(1) inside reconnectMu in eventHandler, and set the stopping flag under the same lock in Stop(). This makes the two operations atomic with respect to each other, preventing the race where: 1. eventHandler checks stopping (false) 2. Stop() sets stopping=true and enters wg.Wait() (wg is 0) 3. eventHandler calls wg.Add(1) → panic or goroutine leak --- .../whatsapp_native/whatsapp_native.go | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index c475276ba..748ee29d7 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -183,8 +183,14 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error { logger.InfoC("whatsapp", "Stopping WhatsApp native channel") - // Mark as stopping so no new goroutines are spawned via eventHandler. + // Mark as stopping under reconnectMu so the flag is visible to + // eventHandler atomically with respect to its wg.Add(1) call. + // This closes the TOCTOU window where eventHandler could check + // stopping (false), then Stop sets it true + enters wg.Wait, + // then eventHandler calls wg.Add(1) — causing a panic. + c.reconnectMu.Lock() c.stopping.Store(true) + c.reconnectMu.Unlock() if c.runCancel != nil { c.runCancel() @@ -236,18 +242,21 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) { c.handleIncoming(evt.(*events.Message)) case *events.Disconnected: logger.InfoCF("whatsapp", "WhatsApp disconnected, will attempt reconnection", nil) - // Prevent new goroutines once Stop() has begun. - if c.stopping.Load() { - return - } c.reconnectMu.Lock() if c.reconnecting { c.reconnectMu.Unlock() return } + // Check stopping while holding the lock so the check and wg.Add + // are atomic with respect to Stop() setting the flag + calling + // wg.Wait(). This prevents the TOCTOU race. + if c.stopping.Load() { + c.reconnectMu.Unlock() + return + } c.reconnecting = true - c.reconnectMu.Unlock() c.wg.Add(1) + c.reconnectMu.Unlock() go func() { defer c.wg.Done() c.reconnectWithBackoff() From d1b10a0004e87d6f3857cbb22287b16786c49279 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 12:54:05 +0800 Subject: [PATCH 6/8] fix(whatsapp_native,agent): address second round of review feedback - WhatsApp Send(): detect unpaired state (Store.ID == nil) and return ErrTemporary instead of attempting to send while QR login is pending - handleReasoning: check the returned error type (DeadlineExceeded / Canceled) instead of ctx.Err() to decide log level, so pubCtx timeouts on a full bus are correctly classified as expected - Test: fill bus with a short-timeout loop instead of hardcoding the buffer size (64), making the test resilient to buffer size changes --- pkg/agent/loop.go | 13 ++++++++----- pkg/agent/loop_test.go | 14 +++++++++----- pkg/channels/whatsapp_native/whatsapp_native.go | 6 ++++++ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 53f411452..ba0254076 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -9,6 +9,7 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" "path/filepath" "strings" @@ -585,15 +586,17 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan ChatID: channelID, Content: reasoningContent, }); err != nil { - // Only log unexpected errors; context deadline/cancel are expected when - // the bus is full under load and would be too noisy to warn about. - if ctx.Err() == nil { - logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ + // Treat context.DeadlineExceeded / context.Canceled as expected + // (bus full under load, or parent cancelled). Check the error + // itself rather than ctx.Err(), because pubCtx may time out + // (5 s) while the parent ctx is still active. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{ "channel": channelName, "error": err.Error(), }) } else { - logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{ + logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ "channel": channelName, "error": err.Error(), }) diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 29293477e..801b6a46e 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -801,16 +801,20 @@ func TestHandleReasoning(t *testing.T) { t.Run("returns promptly when bus is full", func(t *testing.T) { al, msgBus := newLoop(t) - // Fill the outbound bus buffer (default size is 64) so that the - // next PublishOutbound will block until the context deadline. - for i := 0; i < 64; i++ { - err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ + // Fill the outbound bus buffer until a publish would block. + // Use a short timeout to detect when the buffer is full, + // rather than hardcoding the buffer size. + for i := 0; ; i++ { + fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{ Channel: "filler", ChatID: "filler", Content: fmt.Sprintf("filler-%d", i), }) + fillCancel() if err != nil { - t.Fatalf("failed to fill bus: %v", err) + // Buffer is full (timed out trying to send). + break } } diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 748ee29d7..6b8ddbffd 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -384,6 +384,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) } + // Detect unpaired state: the client is connected (to WhatsApp servers) + // but has not completed QR-login yet, so sending would fail. + if client.Store.ID == nil { + return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary) + } + to, err := parseJID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err) From 7f425f1d11fef11607c9abbf2469f9bdbe8bf546 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 13:00:21 +0800 Subject: [PATCH 7/8] fix(agent): correct misspelling of 'canceled' --- pkg/agent/loop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ba0254076..6d581465e 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -587,7 +587,7 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan Content: reasoningContent, }); err != nil { // Treat context.DeadlineExceeded / context.Canceled as expected - // (bus full under load, or parent cancelled). Check the error + // (bus full under load, or parent canceled). Check the error // itself rather than ctx.Err(), because pubCtx may time out // (5 s) while the parent ctx is still active. if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { From 871b2d7342bfcfec07c0723ad6614c23ff24c24c Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sat, 28 Feb 2026 14:13:24 +0800 Subject: [PATCH 8/8] fix(whatsapp_native,agent): fixes for resource leak and log noise - WhatsApp Start(): use deferred cleanup to nil out c.client/c.container and disconnect/close resources on any error after struct fields are assigned, preventing stale references and double-close in Stop() - handleReasoning: treat bus.ErrBusClosed as an expected condition (DEBUG level) alongside context timeout/cancel, avoiding WARN noise during normal shutdown --- pkg/agent/loop.go | 5 ++- .../whatsapp_native/whatsapp_native.go | 42 ++++++++++++++++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6d581465e..eae5bdc15 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -590,7 +590,10 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan // (bus full under load, or parent canceled). Check the error // itself rather than ctx.Err(), because pubCtx may time out // (5 s) while the parent ctx is still active. - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // Also treat ErrBusClosed as expected — it occurs during normal + // shutdown when the bus is closed before all goroutines finish. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || + errors.Is(err, bus.ErrBusClosed) { logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{ "channel": channelName, "error": err.Error(), diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 6b8ddbffd..188a7c8fa 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -83,6 +83,14 @@ func NewWhatsAppNativeChannel( func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath}) + // Reset lifecycle state from any previous Stop() so a restarted channel + // behaves correctly. Use reconnectMu to be consistent with eventHandler + // and Stop() which coordinate under the same lock. + c.reconnectMu.Lock() + c.stopping.Store(false) + c.reconnecting = false + c.reconnectMu.Unlock() + if err := os.MkdirAll(c.storePath, 0o700); err != nil { return fmt.Errorf("create session store dir: %w", err) } @@ -128,22 +136,45 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { c.client = client c.mu.Unlock() + // cleanupOnError clears struct references and releases resources when + // Start() fails after fields are already assigned. This prevents + // Stop() from operating on stale references (double-close, disconnect + // of a partially-initialized client, or stray event handler callbacks). + startOK := false + defer func() { + if startOK { + return + } + c.runCancel() + client.Disconnect() + c.mu.Lock() + c.client = nil + c.container = nil + c.mu.Unlock() + _ = container.Close() + }() + if client.Store.ID == nil { qrChan, err := client.GetQRChannel(c.runCtx) if err != nil { - c.runCancel() - _ = container.Close() return fmt.Errorf("get QR channel: %w", err) } if err := client.Connect(); err != nil { - c.runCancel() - _ = container.Close() return fmt.Errorf("connect: %w", err) } // Handle QR events in a background goroutine so Start() returns // promptly. The goroutine is tracked via c.wg and respects // c.runCtx for cancellation. + // Guard wg.Add with reconnectMu + stopping check (same protocol + // as eventHandler) so a concurrent Stop() cannot enter wg.Wait() + // while we call wg.Add(1). + c.reconnectMu.Lock() + if c.stopping.Load() { + c.reconnectMu.Unlock() + return fmt.Errorf("channel stopped during QR setup") + } c.wg.Add(1) + c.reconnectMu.Unlock() go func() { defer c.wg.Done() for { @@ -169,12 +200,11 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { }() } else { if err := client.Connect(); err != nil { - c.runCancel() - _ = container.Close() return fmt.Errorf("connect: %w", err) } } + startOK = true c.SetRunning(true) logger.InfoC("whatsapp", "WhatsApp native channel connected") return nil