From 6852f240251a3d410d89d1b352acf2666dae3d04 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Mon, 23 Feb 2026 21:34:37 +0800 Subject: [PATCH] fix: address PR #662 review comments (bus drain, context timeouts, onebot leak) - Drain buffered messages in MessageBus.Close() and log how many were discarded so the loss isn't silent - Replace all context.TODO() with context.WithTimeout(5s) across 6 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines --- pkg/agent/loop.go | 5 +++-- pkg/bus/bus.go | 38 +++++++++++++++++++++++++++++++++++ pkg/channels/onebot/onebot.go | 9 ++++++++- pkg/devices/service.go | 5 ++++- pkg/heartbeat/service.go | 4 +++- pkg/tools/cron.go | 8 ++++++-- pkg/tools/subagent.go | 4 +++- 7 files changed, 65 insertions(+), 8 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e88136343..7a4e9077f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -122,12 +122,13 @@ func registerSharedTools( // Message tool messageTool := tools.NewMessageTool() messageTool.SetSendCallback(func(channel, chatID, content string) error { - msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + return msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: content, }) - return nil }) agent.Tools.Register(messageTool) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6a1c987b7..d2b6838c5 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -4,6 +4,8 @@ import ( "context" "errors" "sync/atomic" + + "github.com/sipeed/picoclaw/pkg/logger" ) // ErrBusClosed is returned when publishing to a closed MessageBus. 
@@ -104,5 +106,41 @@ func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMedia func (mb *MessageBus) Close() { if mb.closed.CompareAndSwap(false, true) { close(mb.done) + + // Drain buffered channels so messages aren't silently lost. + // Channels are NOT closed to avoid send-on-closed panics from concurrent publishers. + drained := 0 + for { + select { + case <-mb.inbound: + drained++ + default: + goto doneInbound + } + } + doneInbound: + for { + select { + case <-mb.outbound: + drained++ + default: + goto doneOutbound + } + } + doneOutbound: + for { + select { + case <-mb.outboundMedia: + drained++ + default: + goto doneMedia + } + } + doneMedia: + if drained > 0 { + logger.DebugCF("bus", "Drained buffered messages during close", map[string]any{ + "count": drained, + }) + } } } diff --git a/pkg/channels/onebot/onebot.go b/pkg/channels/onebot/onebot.go index a748acaa0..feb198d7d 100644 --- a/pkg/channels/onebot/onebot.go +++ b/pkg/channels/onebot/onebot.go @@ -306,6 +306,9 @@ func (c *OneBotChannel) sendAPIRequest(action string, params any, timeout time.D select { case resp := <-ch: + if resp == nil { + return nil, fmt.Errorf("API request %s: channel stopped", action) + } return resp, nil case <-time.After(timeout): return nil, fmt.Errorf("API request %s timed out after %v", action, timeout) @@ -353,7 +356,11 @@ func (c *OneBotChannel) Stop(ctx context.Context) error { } c.pendingMu.Lock() - for echo := range c.pending { + for echo, ch := range c.pending { + select { + case ch <- nil: // non-blocking wake for blocked sendAPIRequest goroutines + default: + } delete(c.pending, echo) } c.pendingMu.Unlock() diff --git a/pkg/devices/service.go b/pkg/devices/service.go index 408e1c8aa..1bafe6085 100644 --- a/pkg/devices/service.go +++ b/pkg/devices/service.go @@ -4,6 +4,7 @@ import ( "context" "strings" "sync" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/constants" @@ -127,7 +128,9 @@ func (s *Service) 
sendNotification(ev *events.DeviceEvent) { } msg := ev.FormatMessage() - msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: platform, ChatID: userID, Content: msg, diff --git a/pkg/heartbeat/service.go b/pkg/heartbeat/service.go index 3e58dbc7a..ce14ed77c 100644 --- a/pkg/heartbeat/service.go +++ b/pkg/heartbeat/service.go @@ -308,7 +308,9 @@ func (hs *HeartbeatService) sendResponse(response string) { return } - msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: platform, ChatID: userID, Content: response, diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 3c13f5968..52f914622 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -294,7 +294,9 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM) } - t.msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: output, @@ -304,7 +306,9 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { // If deliver=true, send message directly without agent processing if job.Payload.Deliver { - t.msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{ Channel: channel, ChatID: chatID, Content: job.Payload.Message, diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 
081a02872..69f1a49a2 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -218,7 +218,9 @@ After completing the task, provide a clear summary of what was done.` // Send announce message back to main agent if sm.bus != nil { announceContent := fmt.Sprintf("Task '%s' completed.\n\nResult:\n%s", task.Label, task.Result) - sm.bus.PublishInbound(context.TODO(), bus.InboundMessage{ + pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer pubCancel() + sm.bus.PublishInbound(pubCtx, bus.InboundMessage{ Channel: "system", SenderID: fmt.Sprintf("subagent:%s", task.ID), // Format: "original_channel:original_chat_id" for routing back