Files
picoclaw/pkg/channels/manager.go
T
Hoshina 72e897f95a fix(channels): fix memory hazards in channel abstraction layer
Address 7 memory/architecture issues affecting long-running gateway
processes on embedded devices (<10MB RAM):

- Fix dispatcher busy-wait: remove select+default pattern that caused
  CPU spin after context cancellation; SubscribeOutbound handles ctx
  internally
- Add TTL janitor for typingStops/placeholders sync.Map entries to
  prevent unbounded accumulation when outbound paths fail
- Reduce queue buffers from 100 to 16 slots (~84% memory reduction)
- Optimize SplitMessage with index-based rune operations to reduce
  intermediate string/rune allocations
- Replace uuid.New() with atomic counter + random prefix for media
  scope IDs (eliminates per-call crypto/rand syscall)
- Lazy channel worker creation: defer goroutine+buffer allocation
  until channel.Start() succeeds
2026-02-24 22:30:22 +08:00

785 lines
20 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package channels
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/constants"
"github.com/sipeed/picoclaw/pkg/health"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/media"
)
// Tunables for the per-channel outbound queues, send retries, and TTL cleanup.
const (
// defaultChannelQueueSize is the buffer capacity of each worker's text and media queue.
defaultChannelQueueSize = 16
defaultRateLimit = 10 // default 10 msg/s
// maxRetries is the number of additional send attempts made after the first one fails.
maxRetries = 3
// rateLimitDelay is the fixed wait before retrying after an ErrRateLimit failure.
rateLimitDelay = 1 * time.Second
// baseBackoff is the first delay of the exponential backoff used for other
// retryable errors; it doubles per attempt and is capped at maxBackoff.
baseBackoff = 500 * time.Millisecond
maxBackoff = 8 * time.Second
// janitorInterval is how often the TTL janitor scans for stale entries.
janitorInterval = 10 * time.Second
// typingStopTTL and placeholderTTL are the maximum ages of recorded typing-stop
// and placeholder entries before the janitor evicts them.
typingStopTTL = 5 * time.Minute
placeholderTTL = 10 * time.Minute
)
// typingEntry wraps a typing stop function with a creation timestamp for TTL eviction.
type typingEntry struct {
// stop is invoked when the entry is consumed by preSend or evicted by the janitor.
stop func()
// createdAt is the time the entry was recorded; the janitor compares it against typingStopTTL.
createdAt time.Time
}
// placeholderEntry wraps a placeholder ID with a creation timestamp for TTL eviction.
type placeholderEntry struct {
// id is the placeholder message ID passed to MessageEditor.EditMessage by preSend.
id string
// createdAt is the time the entry was recorded; the janitor compares it against placeholderTTL.
createdAt time.Time
}
// channelRateConfig maps channel name to per-second rate limit.
// Channels without an entry fall back to defaultRateLimit when their worker
// is created by newChannelWorker.
var channelRateConfig = map[string]float64{
"telegram": 20,
"discord": 1,
"slack": 1,
"line": 10,
}
// channelWorker bundles the per-channel state used to deliver outbound
// messages: the channel itself, its buffered queues, completion signals for
// the two worker goroutines, and a rate limiter.
type channelWorker struct {
// ch is the channel that messages are ultimately sent through.
ch Channel
// queue buffers outbound text messages consumed by runWorker.
queue chan bus.OutboundMessage
// mediaQueue buffers outbound media messages consumed by runMediaWorker.
mediaQueue chan bus.OutboundMediaMessage
// done is closed by runWorker when it exits.
done chan struct{}
// mediaDone is closed by runMediaWorker when it exits.
mediaDone chan struct{}
// limiter throttles both text and media sends for this channel.
limiter *rate.Limiter
}
// Manager owns the set of enabled channels, the per-channel outbound workers,
// the optional shared HTTP server, and the bookkeeping for typing indicators
// and placeholder messages.
type Manager struct {
// channels maps channel name to its implementation.
channels map[string]Channel
// workers maps channel name to its outbound worker; entries are created in
// StartAll only for channels whose Start succeeded.
workers map[string]*channelWorker
bus *bus.MessageBus
config *config.Config
// mediaStore, when non-nil, is injected into channels exposing SetMediaStore.
mediaStore media.MediaStore
// dispatchTask cancels the context shared by the dispatchers, workers, and janitor.
dispatchTask *asyncTask
// mux and httpServer are populated by SetupHTTPServer.
mux *http.ServeMux
httpServer *http.Server
// mu is taken by the start/stop, lookup, registration, and dispatch methods
// when they touch channels or workers.
mu sync.RWMutex
placeholders sync.Map // "channel:chatID" → placeholderEntry
typingStops sync.Map // "channel:chatID" → typingEntry
}
// asyncTask holds the cancel function of a background context so the
// goroutines started with that context can be stopped later.
type asyncTask struct {
cancel context.CancelFunc
}
// RecordPlaceholder remembers the placeholder message posted to chatID on
// channel, keyed as "channel:chatID", so a later outbound message can be
// edited into it. The entry carries its creation time for TTL eviction.
// Implements PlaceholderRecorder.
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
	entry := placeholderEntry{
		id:        placeholderID,
		createdAt: time.Now(),
	}
	m.placeholders.Store(channel+":"+chatID, entry)
}
// RecordTypingStop remembers the function that ends the typing indicator for
// chatID on channel, keyed as "channel:chatID". The entry carries its
// creation time for TTL eviction.
// Implements PlaceholderRecorder.
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
	entry := typingEntry{
		stop:      stop,
		createdAt: time.Now(),
	}
	m.typingStops.Store(channel+":"+chatID, entry)
}
// preSend runs the bookkeeping that must happen right before a message goes
// out: it ends any recorded typing indicator for the chat and, if a
// placeholder message was recorded, tries to edit the outgoing content into it.
//
// It reports true only when the placeholder edit succeeded, in which case the
// caller must skip the regular Send. Both recorded entries are consumed
// (removed) regardless of the outcome.
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
	key := name + ":" + msg.ChatID

	// Step 1: end the typing indicator, if one was recorded.
	if raw, found := m.typingStops.LoadAndDelete(key); found {
		if typing, isTyping := raw.(typingEntry); isTyping {
			typing.stop()
		}
	}

	// Step 2: consume the placeholder and attempt to edit it in place.
	raw, found := m.placeholders.LoadAndDelete(key)
	if !found {
		return false
	}
	ph, isPlaceholder := raw.(placeholderEntry)
	if !isPlaceholder || ph.id == "" {
		return false
	}
	editor, canEdit := ch.(MessageEditor)
	if !canEdit {
		return false
	}
	// A failed edit reports false so the caller falls back to a normal Send.
	return editor.EditMessage(ctx, msg.ChatID, ph.id, msg.Content) == nil
}
// NewManager builds a Manager around the given configuration, message bus, and
// media store, then initializes every channel enabled in cfg.
// It returns an error only if initChannels reports one.
func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
	mgr := new(Manager)
	mgr.channels = map[string]Channel{}
	mgr.workers = map[string]*channelWorker{}
	mgr.bus = messageBus
	mgr.config = cfg
	mgr.mediaStore = store

	if err := mgr.initChannels(); err != nil {
		return nil, err
	}
	return mgr, nil
}
// initChannel resolves the factory registered under name, builds the channel,
// wires optional dependencies into it, and stores it in m.channels.
// displayName is used only for log output. Failures are logged, not returned.
func (m *Manager) initChannel(name, displayName string) {
	// Each log call gets its own field map, as the original did.
	fields := func() map[string]any {
		return map[string]any{
			"channel": displayName,
		}
	}

	factory, registered := getFactory(name)
	if !registered {
		logger.WarnCF("channels", "Factory not registered", fields())
		return
	}

	logger.DebugCF("channels", "Attempting to initialize channel", fields())

	created, err := factory(m.config, m.bus)
	if err != nil {
		logger.ErrorCF("channels", "Failed to initialize channel", map[string]any{
			"channel": displayName,
			"error":   err.Error(),
		})
		return
	}

	// Hand over the media store when one is configured and the channel accepts it.
	if m.mediaStore != nil {
		if target, accepts := created.(interface{ SetMediaStore(s media.MediaStore) }); accepts {
			target.SetMediaStore(m.mediaStore)
		}
	}

	// Let the channel report placeholders and typing stops back to the manager.
	if target, accepts := created.(interface{ SetPlaceholderRecorder(r PlaceholderRecorder) }); accepts {
		target.SetPlaceholderRecorder(m)
	}

	m.channels[name] = created
	logger.InfoCF("channels", "Channel enabled successfully", fields())
}
// initChannels walks the channel section of the configuration and initializes
// every channel that is enabled. Most channels additionally require a
// non-empty credential or endpoint; Feishu, MaixCam, and QQ only need the
// Enabled flag. It currently always returns nil.
func (m *Manager) initChannels() error {
	logger.InfoC("channels", "Initializing channel manager")

	// enableIf keeps the "check, then init" pairing on one line per channel;
	// each condition is still evaluated immediately before its init call.
	enableIf := func(wanted bool, name, displayName string) {
		if wanted {
			m.initChannel(name, displayName)
		}
	}

	enableIf(m.config.Channels.Telegram.Enabled && m.config.Channels.Telegram.Token != "", "telegram", "Telegram")
	enableIf(m.config.Channels.WhatsApp.Enabled && m.config.Channels.WhatsApp.BridgeURL != "", "whatsapp", "WhatsApp")
	enableIf(m.config.Channels.Feishu.Enabled, "feishu", "Feishu")
	enableIf(m.config.Channels.Discord.Enabled && m.config.Channels.Discord.Token != "", "discord", "Discord")
	enableIf(m.config.Channels.MaixCam.Enabled, "maixcam", "MaixCam")
	enableIf(m.config.Channels.QQ.Enabled, "qq", "QQ")
	enableIf(m.config.Channels.DingTalk.Enabled && m.config.Channels.DingTalk.ClientID != "", "dingtalk", "DingTalk")
	enableIf(m.config.Channels.Slack.Enabled && m.config.Channels.Slack.BotToken != "", "slack", "Slack")
	enableIf(m.config.Channels.LINE.Enabled && m.config.Channels.LINE.ChannelAccessToken != "", "line", "LINE")
	enableIf(m.config.Channels.OneBot.Enabled && m.config.Channels.OneBot.WSUrl != "", "onebot", "OneBot")
	enableIf(m.config.Channels.WeCom.Enabled && m.config.Channels.WeCom.Token != "", "wecom", "WeCom")
	enableIf(m.config.Channels.WeComApp.Enabled && m.config.Channels.WeComApp.CorpID != "", "wecom_app", "WeCom App")
	enableIf(m.config.Channels.Pico.Enabled && m.config.Channels.Pico.Token != "", "pico", "Pico")

	logger.InfoCF("channels", "Channel initialization completed", map[string]any{
		"enabled_channels": len(m.channels),
	})
	return nil
}
// SetupHTTPServer prepares (but does not start) the shared HTTP server bound
// to addr. The health server, when given, registers its endpoints on the mux.
// Each channel implementing WebhookHandler or HealthChecker then has its
// handler mounted at the path it reports.
func (m *Manager) SetupHTTPServer(addr string, healthServer *health.Server) {
	mux := http.NewServeMux()
	m.mux = mux

	if healthServer != nil {
		healthServer.RegisterOnMux(mux)
	}

	for chName, candidate := range m.channels {
		if hook, isHook := candidate.(WebhookHandler); isHook {
			mux.Handle(hook.WebhookPath(), hook)
			logger.InfoCF("channels", "Webhook handler registered", map[string]any{
				"channel": chName,
				"path":    hook.WebhookPath(),
			})
		}
		if checker, isChecker := candidate.(HealthChecker); isChecker {
			mux.HandleFunc(checker.HealthPath(), checker.HealthHandler)
			logger.InfoCF("channels", "Health endpoint registered", map[string]any{
				"channel": chName,
				"path":    checker.HealthPath(),
			})
		}
	}

	// Read and write share the same 30-second limit.
	const ioTimeout = 30 * time.Second
	m.httpServer = &http.Server{
		Addr:         addr,
		Handler:      mux,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}
}
// StartAll starts every registered channel and the background machinery that
// serves them. For each channel whose Start succeeds it creates a worker and
// launches its text and media goroutines; channels that fail to start are
// logged and skipped. It then launches the two bus dispatchers, the TTL
// janitor, and, if SetupHTTPServer was called, the shared HTTP server.
//
// All background goroutines run under a context derived from ctx that StopAll
// cancels. StartAll returns nil even when individual channels fail to start,
// and returns immediately when no channels are enabled.
func (m *Manager) StartAll(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if len(m.channels) == 0 {
		logger.WarnC("channels", "No channels enabled")
		return nil
	}

	logger.InfoC("channels", "Starting all channels")

	dispatchCtx, cancel := context.WithCancel(ctx)
	m.dispatchTask = &asyncTask{cancel: cancel}

	for name, channel := range m.channels {
		logger.InfoCF("channels", "Starting channel", map[string]any{
			"channel": name,
		})
		if err := channel.Start(ctx); err != nil {
			logger.ErrorCF("channels", "Failed to start channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
			continue
		}
		// Lazily create worker only after channel starts successfully.
		w := newChannelWorker(name, channel)
		m.workers[name] = w
		go m.runWorker(dispatchCtx, name, w)
		go m.runMediaWorker(dispatchCtx, name, w)
	}

	// Start the dispatchers that read from the bus and route to workers.
	go m.dispatchOutbound(dispatchCtx)
	go m.dispatchOutboundMedia(dispatchCtx)

	// Start the TTL janitor that cleans up stale typing/placeholder entries.
	go m.runTTLJanitor(dispatchCtx)

	// Start shared HTTP server if configured. The server pointer is captured
	// here, under the lock, because StopAll sets m.httpServer to nil; reading
	// the field from inside the goroutine would race with that write and
	// could dereference nil.
	if srv := m.httpServer; srv != nil {
		go func() {
			logger.InfoCF("channels", "Shared HTTP server listening", map[string]any{
				"addr": srv.Addr,
			})
			if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
				logger.ErrorCF("channels", "Shared HTTP server error", map[string]any{
					"error": err.Error(),
				})
			}
		}()
	}

	logger.InfoC("channels", "All channels started")
	return nil
}
// StopAll shuts the manager down: it stops the shared HTTP server (with a
// 5-second shutdown timeout), cancels the background context, waits for every
// worker goroutine to exit, and finally calls Stop on each channel. Errors
// from the HTTP shutdown or from individual channels are logged; StopAll
// itself always returns nil. Calling it more than once is safe.
//
// Worker queues are deliberately NOT closed here. The dispatchers and
// SendToChannel are the senders on those queues and may still be blocked in a
// send when shutdown begins; closing a channel from the receiving side would
// make such a send panic. Workers already exit on context cancellation, and
// with the context cancelled any queued message would be dropped at the
// rate-limiter wait anyway, so no delivery is lost by skipping the close.
func (m *Manager) StopAll(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.InfoC("channels", "Stopping all channels")

	// Shutdown shared HTTP server first.
	if m.httpServer != nil {
		shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		if err := m.httpServer.Shutdown(shutdownCtx); err != nil {
			logger.ErrorCF("channels", "Shared HTTP server shutdown error", map[string]any{
				"error": err.Error(),
			})
		}
		m.httpServer = nil
	}

	// Cancel the dispatchers, workers, and janitor.
	cancelled := false
	if m.dispatchTask != nil {
		m.dispatchTask.cancel()
		m.dispatchTask = nil
		cancelled = true
	}

	// Wait for the worker goroutines to observe the cancellation and exit.
	// Only wait when a cancel was actually issued; otherwise nothing would
	// ever wake the workers and the wait could block forever.
	if cancelled {
		for _, w := range m.workers {
			if w != nil {
				<-w.done
				<-w.mediaDone
			}
		}
	}

	// Drop the stopped workers so that a repeated StopAll is a no-op and a
	// later SendToChannel or dispatch does not enqueue to a dead worker.
	clear(m.workers)

	// Stop all channels.
	for name, channel := range m.channels {
		logger.InfoCF("channels", "Stopping channel", map[string]any{
			"channel": name,
		})
		if err := channel.Stop(ctx); err != nil {
			logger.ErrorCF("channels", "Error stopping channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
		}
	}

	logger.InfoC("channels", "All channels stopped")
	return nil
}
// newChannelWorker allocates the queues, completion signals, and rate limiter
// for one channel. The per-second rate comes from channelRateConfig, falling
// back to defaultRateLimit for unlisted channels; the limiter burst is half
// the rate rounded up, with a floor of 1.
func newChannelWorker(name string, ch Channel) *channelWorker {
	perSecond, configured := channelRateConfig[name]
	if !configured {
		perSecond = float64(defaultRateLimit)
	}
	burst := int(math.Max(1, math.Ceil(perSecond/2)))

	worker := new(channelWorker)
	worker.ch = ch
	worker.queue = make(chan bus.OutboundMessage, defaultChannelQueueSize)
	worker.mediaQueue = make(chan bus.OutboundMediaMessage, defaultChannelQueueSize)
	worker.done = make(chan struct{})
	worker.mediaDone = make(chan struct{})
	worker.limiter = rate.NewLimiter(rate.Limit(perSecond), burst)
	return worker
}
// runWorker is the text-delivery loop for one channel. It takes messages off
// w.queue until the queue is closed or ctx is cancelled, and closes w.done on
// exit. When the channel advertises a positive maximum message length and the
// content exceeds it (counted in runes), the message is split and each chunk
// is sent separately.
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
	defer close(w.done)

	for {
		var (
			msg  bus.OutboundMessage
			open bool
		)
		select {
		case <-ctx.Done():
			return
		case msg, open = <-w.queue:
		}
		if !open {
			return
		}

		limit := 0
		if provider, hasLimit := w.ch.(MessageLengthProvider); hasLimit {
			limit = provider.MaxMessageLength()
		}

		// Fits in one message (or the channel has no limit): send as-is.
		if limit <= 0 || len([]rune(msg.Content)) <= limit {
			m.sendWithRetry(ctx, name, w, msg)
			continue
		}

		// Too long: send chunk by chunk, copying everything but the content.
		for _, part := range SplitMessage(msg.Content, limit) {
			piece := msg
			piece.Content = part
			m.sendWithRetry(ctx, name, w, piece)
		}
	}
}
// sendWithRetry delivers one text message through the worker's channel.
// It first waits for a rate-limiter token, then gives preSend the chance to
// satisfy the message by editing a placeholder, and otherwise calls Send up to
// maxRetries+1 times. The error class decides what happens after a failure:
//   - ErrNotRunning / ErrSendFailed: permanent, give up immediately
//   - ErrRateLimit: wait rateLimitDelay, then retry
//   - anything else (including ErrTemporary): exponential backoff, then retry
//
// Cancellation of ctx during any wait returns silently; a final failure is logged.
func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) {
	// Rate limit: an error here means ctx was cancelled, i.e. shutdown.
	if w.limiter.Wait(ctx) != nil {
		return
	}

	// Pre-send: stop typing and try to edit the placeholder instead of sending.
	if edited := m.preSend(ctx, name, msg, w.ch); edited {
		return
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if lastErr = w.ch.Send(ctx, msg); lastErr == nil {
			return
		}

		// Stop on permanent failures, or when no attempts remain (no point sleeping).
		permanent := errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed)
		if permanent || attempt == maxRetries {
			break
		}

		// Fixed delay for rate limiting, capped exponential backoff otherwise.
		wait := rateLimitDelay
		if !errors.Is(lastErr, ErrRateLimit) {
			wait = min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}
	}

	// All retries exhausted or permanent failure.
	logger.ErrorCF("channels", "Send failed", map[string]any{
		"channel": name,
		"chat_id": msg.ChatID,
		"error":   lastErr.Error(),
		"retries": maxRetries,
	})
}
// dispatchOutbound pulls outbound text messages off the bus and hands each one
// to the queue of the worker serving its target channel. Messages for internal
// channels are skipped silently; messages for unknown channels, or for
// channels without a worker, are dropped with a warning. When a worker queue
// is full the dispatcher blocks until space frees up or ctx is cancelled.
// The loop ends when SubscribeOutbound reports no message or ctx is cancelled.
func (m *Manager) dispatchOutbound(ctx context.Context) {
	logger.InfoC("channels", "Outbound dispatcher started")

	for {
		msg, ok := m.bus.SubscribeOutbound(ctx)
		if !ok {
			logger.InfoC("channels", "Outbound dispatcher stopped")
			return
		}

		// Silently skip internal channels.
		if constants.IsInternalChannel(msg.Channel) {
			continue
		}

		m.mu.RLock()
		_, known := m.channels[msg.Channel]
		worker := m.workers[msg.Channel] // nil when no worker is registered
		m.mu.RUnlock()

		switch {
		case !known:
			logger.WarnCF("channels", "Unknown channel for outbound message", map[string]any{
				"channel": msg.Channel,
			})
		case worker == nil:
			logger.WarnCF("channels", "Channel has no active worker, skipping message", map[string]any{
				"channel": msg.Channel,
			})
		default:
			select {
			case worker.queue <- msg:
			case <-ctx.Done():
				return
			}
		}
	}
}
// dispatchOutboundMedia pulls outbound media messages off the bus and hands
// each one to the media queue of the worker serving its target channel.
// Messages for internal channels are skipped silently; messages for unknown
// channels, or for channels without a worker, are dropped with a warning.
// When a media queue is full the dispatcher blocks until space frees up or
// ctx is cancelled. The loop ends when SubscribeOutboundMedia reports no
// message or ctx is cancelled.
func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
	logger.InfoC("channels", "Outbound media dispatcher started")

	for {
		msg, ok := m.bus.SubscribeOutboundMedia(ctx)
		if !ok {
			logger.InfoC("channels", "Outbound media dispatcher stopped")
			return
		}

		// Silently skip internal channels.
		if constants.IsInternalChannel(msg.Channel) {
			continue
		}

		m.mu.RLock()
		_, known := m.channels[msg.Channel]
		worker := m.workers[msg.Channel] // nil when no worker is registered
		m.mu.RUnlock()

		switch {
		case !known:
			logger.WarnCF("channels", "Unknown channel for outbound media message", map[string]any{
				"channel": msg.Channel,
			})
		case worker == nil:
			logger.WarnCF("channels", "Channel has no active worker, skipping media message", map[string]any{
				"channel": msg.Channel,
			})
		default:
			select {
			case worker.mediaQueue <- msg:
			case <-ctx.Done():
				return
			}
		}
	}
}
// runMediaWorker is the media-delivery loop for one channel. It forwards every
// message taken from w.mediaQueue to sendMediaWithRetry, and stops when the
// queue is closed or ctx is cancelled. w.mediaDone is closed on exit.
func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
	defer close(w.mediaDone)

	for {
		select {
		case <-ctx.Done():
			return
		case item, open := <-w.mediaQueue:
			if !open {
				return
			}
			m.sendMediaWithRetry(ctx, name, w, item)
		}
	}
}
// sendMediaWithRetry delivers one media message through the worker's channel.
// Channels that do not implement MediaSender are skipped with a debug log.
// Otherwise it waits for a rate-limiter token and calls SendMedia up to
// maxRetries+1 times, using the same error classification as text sends:
//   - ErrNotRunning / ErrSendFailed: permanent, give up immediately
//   - ErrRateLimit: wait rateLimitDelay, then retry
//   - anything else: exponential backoff, then retry
//
// Cancellation of ctx during any wait returns silently; a final failure is logged.
func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) {
	sender, supported := w.ch.(MediaSender)
	if !supported {
		logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
			"channel": name,
		})
		return
	}

	// Rate limit: an error here means ctx was cancelled.
	if w.limiter.Wait(ctx) != nil {
		return
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if lastErr = sender.SendMedia(ctx, msg); lastErr == nil {
			return
		}

		// Stop on permanent failures, or when no attempts remain (no point sleeping).
		permanent := errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed)
		if permanent || attempt == maxRetries {
			break
		}

		// Fixed delay for rate limiting, capped exponential backoff otherwise.
		wait := rateLimitDelay
		if !errors.Is(lastErr, ErrRateLimit) {
			wait = min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}
	}

	// All retries exhausted or permanent failure.
	logger.ErrorCF("channels", "SendMedia failed", map[string]any{
		"channel": name,
		"chat_id": msg.ChatID,
		"error":   lastErr.Error(),
		"retries": maxRetries,
	})
}
// runTTLJanitor periodically scans the typingStops and placeholders maps
// and evicts entries that have exceeded their TTL. This prevents memory
// accumulation when outbound paths fail to trigger preSend (e.g. LLM errors).
// It runs every janitorInterval until ctx is cancelled.
//
// Both maps can be written concurrently by RecordTypingStop /
// RecordPlaceholder, so the value seen during Range may already have been
// replaced by the time it is removed. The eviction therefore acts on what is
// actually in the map at removal time rather than on the ranged snapshot.
func (m *Manager) runTTLJanitor(ctx context.Context) {
	ticker := time.NewTicker(janitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			m.typingStops.Range(func(key, value any) bool {
				entry, ok := value.(typingEntry)
				if !ok || now.Sub(entry.createdAt) <= typingStopTTL {
					return true
				}
				// typingEntry holds a func and is not comparable, so
				// CompareAndDelete cannot be used here. Instead, stop the
				// entry that was really removed: if the key was re-recorded
				// in the meantime, calling the stale stop would leave the
				// newer indicator running with nothing left to stop it.
				if removed, loaded := m.typingStops.LoadAndDelete(key); loaded {
					if current, ok := removed.(typingEntry); ok && current.stop != nil {
						current.stop()
					}
				}
				return true
			})

			m.placeholders.Range(func(key, value any) bool {
				entry, ok := value.(placeholderEntry)
				if ok && now.Sub(entry.createdAt) > placeholderTTL {
					// Remove only if the expired value is still the stored
					// one, so a placeholder recorded a moment ago under the
					// same key is not thrown away.
					m.placeholders.CompareAndDelete(key, value)
				}
				return true
			})
		}
	}
}
// GetChannel looks up a registered channel by name under the read lock.
// The boolean reports whether the channel exists.
func (m *Manager) GetChannel(name string) (Channel, bool) {
	m.mu.RLock()
	found, present := m.channels[name]
	m.mu.RUnlock()
	return found, present
}
// GetStatus returns a snapshot keyed by channel name. Each value is a
// map[string]any with "enabled" (always true for registered channels) and
// "running" (the channel's own IsRunning result).
func (m *Manager) GetStatus() map[string]any {
	m.mu.RLock()
	defer m.mu.RUnlock()

	report := map[string]any{}
	for chName, ch := range m.channels {
		info := map[string]any{
			"enabled": true,
			"running": ch.IsRunning(),
		}
		report[chName] = info
	}
	return report
}
// GetEnabledChannels returns the names of all registered channels.
// The order follows map iteration and is therefore unspecified; the result is
// never nil.
func (m *Manager) GetEnabledChannels() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	names := make([]string, len(m.channels))
	next := 0
	for chName := range m.channels {
		names[next] = chName
		next++
	}
	return names
}
// RegisterChannel adds channel under name, replacing any channel already
// registered with that name. It does not start the channel or create a worker.
func (m *Manager) RegisterChannel(name string, channel Channel) {
	m.mu.Lock()
	m.channels[name] = channel
	m.mu.Unlock()
}
// UnregisterChannel removes the channel and its worker. If a worker exists,
// its text queue is closed and the text goroutine awaited, then the same is
// done for the media queue, all while holding the write lock. It does not
// call Stop on the channel.
func (m *Manager) UnregisterChannel(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// A missing key yields a nil worker, which is skipped like an explicit nil.
	if worker := m.workers[name]; worker != nil {
		close(worker.queue)
		<-worker.done
		close(worker.mediaQueue)
		<-worker.mediaDone
	}

	delete(m.workers, name)
	delete(m.channels, name)
}
// SendToChannel queues a text message for chatID on the named channel.
//
// When the channel has a worker, the message is enqueued and nil is returned
// once it is in the queue (delivery happens asynchronously); if the queue is
// full the call blocks until space frees up or ctx is done, in which case
// ctx.Err() is returned. When the channel has no worker, the message is sent
// directly and the channel's Send error is returned.
// An error is returned if no channel is registered under channelName.
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
	// Take both the channel and its worker in a single locked lookup. The
	// channel value is reused by the fallback below: re-reading m.channels
	// after unlocking would race with RegisterChannel/UnregisterChannel and
	// could yield a nil channel.
	m.mu.RLock()
	channel, exists := m.channels[channelName]
	w, wExists := m.workers[channelName]
	m.mu.RUnlock()

	if !exists {
		return fmt.Errorf("channel %s not found", channelName)
	}

	msg := bus.OutboundMessage{
		Channel: channelName,
		ChatID:  chatID,
		Content: content,
	}

	if wExists && w != nil {
		select {
		case w.queue <- msg:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Fallback: direct send for a channel without a worker (not started, or
	// its Start failed).
	return channel.Send(ctx, msg)
}