fix(whatsapp_native,agent): fixes for resource leak and log noise

- WhatsApp Start(): use deferred cleanup to nil out c.client/c.container
  and disconnect/close resources on any error after struct fields are
  assigned, preventing stale references and double-close in Stop()
- handleReasoning: treat bus.ErrBusClosed as an expected condition
  (DEBUG level) alongside context timeout/cancel, avoiding WARN noise
  during normal shutdown
This commit is contained in:
Hoshina
2026-02-28 14:13:24 +08:00
parent 7f425f1d11
commit 871b2d7342
2 changed files with 40 additions and 7 deletions
+4 -1
View File
@@ -590,7 +590,10 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
// (bus full under load, or parent canceled). Check the error
// itself rather than ctx.Err(), because pubCtx may time out
// (5 s) while the parent ctx is still active.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Also treat ErrBusClosed as expected — it occurs during normal
// shutdown when the bus is closed before all goroutines finish.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": channelName,
"error": err.Error(),
@@ -83,6 +83,14 @@ func NewWhatsAppNativeChannel(
func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath})
// Reset lifecycle state from any previous Stop() so a restarted channel
// behaves correctly. Use reconnectMu to be consistent with eventHandler
// and Stop() which coordinate under the same lock.
c.reconnectMu.Lock()
c.stopping.Store(false)
c.reconnecting = false
c.reconnectMu.Unlock()
if err := os.MkdirAll(c.storePath, 0o700); err != nil {
return fmt.Errorf("create session store dir: %w", err)
}
@@ -128,22 +136,45 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
c.client = client
c.mu.Unlock()
// cleanupOnError clears struct references and releases resources when
// Start() fails after fields are already assigned. This prevents
// Stop() from operating on stale references (double-close, disconnect
// of a partially-initialized client, or stray event handler callbacks).
startOK := false
defer func() {
if startOK {
return
}
c.runCancel()
client.Disconnect()
c.mu.Lock()
c.client = nil
c.container = nil
c.mu.Unlock()
_ = container.Close()
}()
if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(c.runCtx)
if err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("get QR channel: %w", err)
}
if err := client.Connect(); err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
// Handle QR events in a background goroutine so Start() returns
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
// Guard wg.Add with reconnectMu + stopping check (same protocol
// as eventHandler) so a concurrent Stop() cannot enter wg.Wait()
// while we call wg.Add(1).
c.reconnectMu.Lock()
if c.stopping.Load() {
c.reconnectMu.Unlock()
return fmt.Errorf("channel stopped during QR setup")
}
c.wg.Add(1)
c.reconnectMu.Unlock()
go func() {
defer c.wg.Done()
for {
@@ -169,12 +200,11 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
}()
} else {
if err := client.Connect(); err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
}
startOK = true
c.SetRunning(true)
logger.InfoC("whatsapp", "WhatsApp native channel connected")
return nil