Files
picoclaw/pkg/bus/bus.go
T
Hoshina 72e897f95a fix(channels): fix memory hazards in channel abstraction layer
Address 7 memory/architecture issues affecting long-running gateway
processes on embedded devices (<10MB RAM):

- Fix dispatcher busy-wait: remove select+default pattern that caused
  CPU spin after context cancellation; SubscribeOutbound handles ctx
  internally
- Add TTL janitor for typingStops/placeholders sync.Map entries to
  prevent unbounded accumulation when outbound paths fail
- Reduce queue buffers from 100 to 16 slots (~84% memory reduction)
- Optimize SplitMessage with index-based rune operations to reduce
  intermediate string/rune allocations
- Replace uuid.New() with atomic counter + random prefix for media
  scope IDs (eliminates per-call crypto/rand syscall)
- Lazy channel worker creation: defer goroutine+buffer allocation
  until channel.Start() succeeds
2026-02-24 22:30:22 +08:00

149 lines
3.0 KiB
Go

package bus
import (
"context"
"errors"
"sync/atomic"
"github.com/sipeed/picoclaw/pkg/logger"
)
// ErrBusClosed is returned when publishing to a closed MessageBus.
var ErrBusClosed = errors.New("message bus closed")
const defaultBusBufferSize = 16
type MessageBus struct {
inbound chan InboundMessage
outbound chan OutboundMessage
outboundMedia chan OutboundMediaMessage
done chan struct{}
closed atomic.Bool
}
func NewMessageBus() *MessageBus {
return &MessageBus{
inbound: make(chan InboundMessage, defaultBusBufferSize),
outbound: make(chan OutboundMessage, defaultBusBufferSize),
outboundMedia: make(chan OutboundMediaMessage, defaultBusBufferSize),
done: make(chan struct{}),
}
}
func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) error {
if mb.closed.Load() {
return ErrBusClosed
}
select {
case mb.inbound <- msg:
return nil
case <-mb.done:
return ErrBusClosed
case <-ctx.Done():
return ctx.Err()
}
}
func (mb *MessageBus) ConsumeInbound(ctx context.Context) (InboundMessage, bool) {
select {
case msg, ok := <-mb.inbound:
return msg, ok
case <-mb.done:
return InboundMessage{}, false
case <-ctx.Done():
return InboundMessage{}, false
}
}
func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage) error {
if mb.closed.Load() {
return ErrBusClosed
}
select {
case mb.outbound <- msg:
return nil
case <-mb.done:
return ErrBusClosed
case <-ctx.Done():
return ctx.Err()
}
}
func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, bool) {
select {
case msg, ok := <-mb.outbound:
return msg, ok
case <-mb.done:
return OutboundMessage{}, false
case <-ctx.Done():
return OutboundMessage{}, false
}
}
func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error {
if mb.closed.Load() {
return ErrBusClosed
}
select {
case mb.outboundMedia <- msg:
return nil
case <-mb.done:
return ErrBusClosed
case <-ctx.Done():
return ctx.Err()
}
}
func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMediaMessage, bool) {
select {
case msg, ok := <-mb.outboundMedia:
return msg, ok
case <-mb.done:
return OutboundMediaMessage{}, false
case <-ctx.Done():
return OutboundMediaMessage{}, false
}
}
func (mb *MessageBus) Close() {
if mb.closed.CompareAndSwap(false, true) {
close(mb.done)
// Drain buffered channels so messages aren't silently lost.
// Channels are NOT closed to avoid send-on-closed panics from concurrent publishers.
drained := 0
for {
select {
case <-mb.inbound:
drained++
default:
goto doneInbound
}
}
doneInbound:
for {
select {
case <-mb.outbound:
drained++
default:
goto doneOutbound
}
}
doneOutbound:
for {
select {
case <-mb.outboundMedia:
drained++
default:
goto doneMedia
}
}
doneMedia:
if drained > 0 {
logger.DebugCF("bus", "Drained buffered messages during close", map[string]any{
"count": drained,
})
}
}
}