mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
Merge pull request #884 from alexhoshina/fix/memory-leak-whatsapp-reasoning
Fix/memory leak whatsapp reasoning
This commit is contained in:
+27
-2
@@ -575,11 +575,36 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
|
||||
return
|
||||
}
|
||||
|
||||
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
|
||||
// Use a short timeout so the goroutine does not block indefinitely when
|
||||
// the outbound bus is full. Reasoning output is best-effort; dropping it
|
||||
// is acceptable to avoid goroutine accumulation.
|
||||
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer pubCancel()
|
||||
|
||||
if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
|
||||
Channel: channelName,
|
||||
ChatID: channelID,
|
||||
Content: reasoningContent,
|
||||
})
|
||||
}); err != nil {
|
||||
// Treat context.DeadlineExceeded / context.Canceled as expected
|
||||
// (bus full under load, or parent canceled). Check the error
|
||||
// itself rather than ctx.Err(), because pubCtx may time out
|
||||
// (5 s) while the parent ctx is still active.
|
||||
// Also treat ErrBusClosed as expected — it occurs during normal
|
||||
// shutdown when the bus is closed before all goroutines finish.
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
|
||||
errors.Is(err, bus.ErrBusClosed) {
|
||||
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
|
||||
"channel": channelName,
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
|
||||
"channel": channelName,
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runLLMIteration executes the LLM call loop with tool handling.
|
||||
|
||||
@@ -797,4 +797,57 @@ func TestHandleReasoning(t *testing.T) {
|
||||
t.Fatalf("expected no outbound message, got %+v", msg)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("returns promptly when bus is full", func(t *testing.T) {
|
||||
al, msgBus := newLoop(t)
|
||||
|
||||
// Fill the outbound bus buffer until a publish would block.
|
||||
// Use a short timeout to detect when the buffer is full,
|
||||
// rather than hardcoding the buffer size.
|
||||
for i := 0; ; i++ {
|
||||
fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
|
||||
Channel: "filler",
|
||||
ChatID: "filler",
|
||||
Content: fmt.Sprintf("filler-%d", i),
|
||||
})
|
||||
fillCancel()
|
||||
if err != nil {
|
||||
// Buffer is full (timed out trying to send).
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Use a short-deadline parent context to bound the test.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
al.handleReasoning(ctx, "should timeout", "slack", "channel-full")
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// handleReasoning uses a 5s internal timeout, but the parent ctx
|
||||
// expires in 500ms. It should return within ~500ms, not 5s.
|
||||
if elapsed > 2*time.Second {
|
||||
t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed)
|
||||
}
|
||||
|
||||
// Drain the bus and verify the reasoning message was NOT published
|
||||
// (it should have been dropped due to timeout).
|
||||
drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer drainCancel()
|
||||
foundReasoning := false
|
||||
for {
|
||||
msg, ok := msgBus.SubscribeOutbound(drainCtx)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if msg.Content == "should timeout" {
|
||||
foundReasoning = true
|
||||
}
|
||||
}
|
||||
if foundReasoning {
|
||||
t.Fatal("expected reasoning message to be dropped when bus is full, but it was published")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/mdp/qrterminal/v3"
|
||||
@@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct {
|
||||
runCancel context.CancelFunc
|
||||
reconnectMu sync.Mutex
|
||||
reconnecting bool
|
||||
stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls
|
||||
wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect)
|
||||
}
|
||||
|
||||
// NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection.
|
||||
@@ -80,6 +83,14 @@ func NewWhatsAppNativeChannel(
|
||||
func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
|
||||
logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath})
|
||||
|
||||
// Reset lifecycle state from any previous Stop() so a restarted channel
|
||||
// behaves correctly. Use reconnectMu to be consistent with eventHandler
|
||||
// and Stop() which coordinate under the same lock.
|
||||
c.reconnectMu.Lock()
|
||||
c.stopping.Store(false)
|
||||
c.reconnecting = false
|
||||
c.reconnectMu.Unlock()
|
||||
|
||||
if err := os.MkdirAll(c.storePath, 0o700); err != nil {
|
||||
return fmt.Errorf("create session store dir: %w", err)
|
||||
}
|
||||
@@ -112,6 +123,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
client := whatsmeow.NewClient(deviceStore, waLogger)
|
||||
|
||||
// Create runCtx/runCancel BEFORE registering event handler and starting
|
||||
// goroutines so that Stop() can cancel them at any time, including during
|
||||
// the QR-login flow.
|
||||
c.runCtx, c.runCancel = context.WithCancel(ctx)
|
||||
|
||||
client.AddEventHandler(c.eventHandler)
|
||||
|
||||
c.mu.Lock()
|
||||
@@ -119,36 +136,75 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
|
||||
c.client = client
|
||||
c.mu.Unlock()
|
||||
|
||||
// cleanupOnError clears struct references and releases resources when
|
||||
// Start() fails after fields are already assigned. This prevents
|
||||
// Stop() from operating on stale references (double-close, disconnect
|
||||
// of a partially-initialized client, or stray event handler callbacks).
|
||||
startOK := false
|
||||
defer func() {
|
||||
if startOK {
|
||||
return
|
||||
}
|
||||
c.runCancel()
|
||||
client.Disconnect()
|
||||
c.mu.Lock()
|
||||
c.client = nil
|
||||
c.container = nil
|
||||
c.mu.Unlock()
|
||||
_ = container.Close()
|
||||
}()
|
||||
|
||||
if client.Store.ID == nil {
|
||||
qrChan, err := client.GetQRChannel(ctx)
|
||||
qrChan, err := client.GetQRChannel(c.runCtx)
|
||||
if err != nil {
|
||||
_ = container.Close()
|
||||
return fmt.Errorf("get QR channel: %w", err)
|
||||
}
|
||||
if err := client.Connect(); err != nil {
|
||||
_ = container.Close()
|
||||
return fmt.Errorf("connect: %w", err)
|
||||
}
|
||||
for evt := range qrChan {
|
||||
if evt.Event == "code" {
|
||||
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
|
||||
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
|
||||
Level: qrterminal.L,
|
||||
Writer: os.Stdout,
|
||||
HalfBlocks: true,
|
||||
})
|
||||
} else {
|
||||
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
|
||||
}
|
||||
// Handle QR events in a background goroutine so Start() returns
|
||||
// promptly. The goroutine is tracked via c.wg and respects
|
||||
// c.runCtx for cancellation.
|
||||
// Guard wg.Add with reconnectMu + stopping check (same protocol
|
||||
// as eventHandler) so a concurrent Stop() cannot enter wg.Wait()
|
||||
// while we call wg.Add(1).
|
||||
c.reconnectMu.Lock()
|
||||
if c.stopping.Load() {
|
||||
c.reconnectMu.Unlock()
|
||||
return fmt.Errorf("channel stopped during QR setup")
|
||||
}
|
||||
c.wg.Add(1)
|
||||
c.reconnectMu.Unlock()
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-c.runCtx.Done():
|
||||
return
|
||||
case evt, ok := <-qrChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if evt.Event == "code" {
|
||||
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
|
||||
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
|
||||
Level: qrterminal.L,
|
||||
Writer: os.Stdout,
|
||||
HalfBlocks: true,
|
||||
})
|
||||
} else {
|
||||
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
if err := client.Connect(); err != nil {
|
||||
_ = container.Close()
|
||||
return fmt.Errorf("connect: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.runCtx, c.runCancel = context.WithCancel(ctx)
|
||||
startOK = true
|
||||
c.SetRunning(true)
|
||||
logger.InfoC("whatsapp", "WhatsApp native channel connected")
|
||||
return nil
|
||||
@@ -156,19 +212,53 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
|
||||
|
||||
func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("whatsapp", "Stopping WhatsApp native channel")
|
||||
|
||||
// Mark as stopping under reconnectMu so the flag is visible to
|
||||
// eventHandler atomically with respect to its wg.Add(1) call.
|
||||
// This closes the TOCTOU window where eventHandler could check
|
||||
// stopping (false), then Stop sets it true + enters wg.Wait,
|
||||
// then eventHandler calls wg.Add(1) — causing a panic.
|
||||
c.reconnectMu.Lock()
|
||||
c.stopping.Store(true)
|
||||
c.reconnectMu.Unlock()
|
||||
|
||||
if c.runCancel != nil {
|
||||
c.runCancel()
|
||||
}
|
||||
|
||||
// Disconnect the client first so any blocking Connect()/reconnect loops
|
||||
// can be interrupted before we wait on the goroutines.
|
||||
c.mu.Lock()
|
||||
client := c.client
|
||||
container := c.container
|
||||
c.client = nil
|
||||
c.container = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
if client != nil {
|
||||
client.Disconnect()
|
||||
}
|
||||
|
||||
// Wait for background goroutines (QR handler, reconnect) to finish in a
|
||||
// context-aware way so Stop can be bounded by ctx.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
c.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// All goroutines have finished.
|
||||
case <-ctx.Done():
|
||||
// Context canceled or timed out; log and proceed with best-effort cleanup.
|
||||
logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err()))
|
||||
}
|
||||
|
||||
// Now it is safe to clear and close resources.
|
||||
c.mu.Lock()
|
||||
c.client = nil
|
||||
c.container = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
if container != nil {
|
||||
_ = container.Close()
|
||||
}
|
||||
@@ -187,9 +277,20 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) {
|
||||
c.reconnectMu.Unlock()
|
||||
return
|
||||
}
|
||||
// Check stopping while holding the lock so the check and wg.Add
|
||||
// are atomic with respect to Stop() setting the flag + calling
|
||||
// wg.Wait(). This prevents the TOCTOU race.
|
||||
if c.stopping.Load() {
|
||||
c.reconnectMu.Unlock()
|
||||
return
|
||||
}
|
||||
c.reconnecting = true
|
||||
c.wg.Add(1)
|
||||
c.reconnectMu.Unlock()
|
||||
go c.reconnectWithBackoff()
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.reconnectWithBackoff()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -313,6 +414,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
|
||||
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
|
||||
}
|
||||
|
||||
// Detect unpaired state: the client is connected (to WhatsApp servers)
|
||||
// but has not completed QR-login yet, so sending would fail.
|
||||
if client.Store.ID == nil {
|
||||
return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary)
|
||||
}
|
||||
|
||||
to, err := parseJID(msg.ChatID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)
|
||||
|
||||
Reference in New Issue
Block a user