fix: address PR #662 review comments (bus drain, context timeouts, onebot leak)

- Drain buffered messages in MessageBus.Close() so they aren't silently lost
- Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites
- Fix OneBot pending channel leak: send nil sentinel in Stop() and handle
  nil response in sendAPIRequest() to unblock waiting goroutines
This commit is contained in:
Hoshina
2026-02-23 21:34:37 +08:00
parent 26bee0b791
commit 6852f24025
7 changed files with 65 additions and 8 deletions
+3 -2
View File
@@ -122,12 +122,13 @@ func registerSharedTools(
// Message tool
messageTool := tools.NewMessageTool()
messageTool.SetSendCallback(func(channel, chatID, content string) error {
msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
return msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: content,
})
return nil
})
agent.Tools.Register(messageTool)
+38
View File
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"sync/atomic"
"github.com/sipeed/picoclaw/pkg/logger"
)
// ErrBusClosed is returned when publishing to a closed MessageBus.
@@ -104,5 +106,41 @@ func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMedia
func (mb *MessageBus) Close() {
if mb.closed.CompareAndSwap(false, true) {
close(mb.done)
// Drain buffered channels so messages aren't silently lost.
// Channels are NOT closed to avoid send-on-closed panics from concurrent publishers.
drained := 0
for {
select {
case <-mb.inbound:
drained++
default:
goto doneInbound
}
}
doneInbound:
for {
select {
case <-mb.outbound:
drained++
default:
goto doneOutbound
}
}
doneOutbound:
for {
select {
case <-mb.outboundMedia:
drained++
default:
goto doneMedia
}
}
doneMedia:
if drained > 0 {
logger.DebugCF("bus", "Drained buffered messages during close", map[string]any{
"count": drained,
})
}
}
}
+8 -1
View File
@@ -306,6 +306,9 @@ func (c *OneBotChannel) sendAPIRequest(action string, params any, timeout time.D
select {
case resp := <-ch:
if resp == nil {
return nil, fmt.Errorf("API request %s: channel stopped", action)
}
return resp, nil
case <-time.After(timeout):
return nil, fmt.Errorf("API request %s timed out after %v", action, timeout)
@@ -353,7 +356,11 @@ func (c *OneBotChannel) Stop(ctx context.Context) error {
}
c.pendingMu.Lock()
for echo := range c.pending {
for echo, ch := range c.pending {
select {
case ch <- nil: // non-blocking wake for blocked sendAPIRequest goroutines
default:
}
delete(c.pending, echo)
}
c.pendingMu.Unlock()
+4 -1
View File
@@ -4,6 +4,7 @@ import (
"context"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/constants"
@@ -127,7 +128,9 @@ func (s *Service) sendNotification(ev *events.DeviceEvent) {
}
msg := ev.FormatMessage()
msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: platform,
ChatID: userID,
Content: msg,
+3 -1
View File
@@ -308,7 +308,9 @@ func (hs *HeartbeatService) sendResponse(response string) {
return
}
msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: platform,
ChatID: userID,
Content: response,
+6 -2
View File
@@ -294,7 +294,9 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
output = fmt.Sprintf("Scheduled command '%s' executed:\n%s", job.Payload.Command, result.ForLLM)
}
t.msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: output,
@@ -304,7 +306,9 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
// If deliver=true, send message directly without agent processing
if job.Payload.Deliver {
t.msgBus.PublishOutbound(context.TODO(), bus.OutboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: job.Payload.Message,
+3 -1
View File
@@ -218,7 +218,9 @@ After completing the task, provide a clear summary of what was done.`
// Send announce message back to main agent
if sm.bus != nil {
announceContent := fmt.Sprintf("Task '%s' completed.\n\nResult:\n%s", task.Label, task.Result)
sm.bus.PublishInbound(context.TODO(), bus.InboundMessage{
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
sm.bus.PublishInbound(pubCtx, bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
// Format: "original_channel:original_chat_id" for routing back