mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
1087 lines
29 KiB
Go
1087 lines
29 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package channels
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/health"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/media"
|
|
)
|
|
|
|
// Tunables for outbound delivery and for housekeeping of per-chat state.
const (
	// defaultChannelQueueSize is the buffer size of each worker's text and media queues.
	defaultChannelQueueSize = 16
	// defaultRateLimit is the messages-per-second budget for channels that have
	// no entry in channelRateConfig.
	defaultRateLimit = 10 // default 10 msg/s

	// maxRetries is the number of extra send attempts after the first one.
	maxRetries = 3
	// rateLimitDelay is the fixed pause after a send fails with ErrRateLimit.
	rateLimitDelay = time.Second
	// baseBackoff and maxBackoff bound the exponential backoff for other errors.
	baseBackoff = 500 * time.Millisecond
	maxBackoff  = 8 * time.Second

	// janitorInterval is how often runTTLJanitor sweeps the per-chat maps.
	janitorInterval = 10 * time.Second
	// typingStopTTL bounds how long typing (and reaction) entries may linger.
	typingStopTTL = 5 * time.Minute
	// placeholderTTL bounds how long an unconsumed placeholder ID is remembered.
	placeholderTTL = 10 * time.Minute
)
|
|
|
|
// typingEntry pairs the function that stops a typing indicator with the time
// it was recorded, so runTTLJanitor can evict entries older than typingStopTTL.
type typingEntry struct {
	stop      func()    // stops the typing indicator
	createdAt time.Time // when the entry was recorded
}
|
|
|
|
// reactionEntry pairs the function that undoes a reaction with the time it was
// recorded, so runTTLJanitor can evict stale entries.
type reactionEntry struct {
	undo      func()    // removes the reaction
	createdAt time.Time // when the entry was recorded
}
|
|
|
|
// placeholderEntry remembers the ID of a placeholder message together with the
// time it was recorded, so runTTLJanitor can evict entries older than
// placeholderTTL.
type placeholderEntry struct {
	id        string    // channel-specific message ID of the placeholder
	createdAt time.Time // when the entry was recorded
}
|
|
|
|
// channelRateConfig holds per-channel send budgets in messages per second.
// Channels not listed here fall back to defaultRateLimit (see newChannelWorker).
var channelRateConfig = map[string]float64{
	"discord":  1,
	"irc":      2,
	"line":     10,
	"matrix":   2,
	"qq":       5,
	"slack":    1,
	"telegram": 20,
}
|
|
|
|
type channelWorker struct {
|
|
ch Channel
|
|
queue chan bus.OutboundMessage
|
|
mediaQueue chan bus.OutboundMediaMessage
|
|
done chan struct{}
|
|
mediaDone chan struct{}
|
|
limiter *rate.Limiter
|
|
}
|
|
|
|
type Manager struct {
|
|
channels map[string]Channel
|
|
workers map[string]*channelWorker
|
|
bus *bus.MessageBus
|
|
config *config.Config
|
|
mediaStore media.MediaStore
|
|
dispatchTask *asyncTask
|
|
mux *http.ServeMux
|
|
httpServer *http.Server
|
|
mu sync.RWMutex
|
|
placeholders sync.Map // "channel:chatID" → placeholderID (string)
|
|
typingStops sync.Map // "channel:chatID" → func()
|
|
reactionUndos sync.Map // "channel:chatID" → reactionEntry
|
|
streamActive sync.Map // "channel:chatID" → true (set when streamer.Finalize sent the message)
|
|
channelHashes map[string]string // channel name → config hash
|
|
}
|
|
|
|
// asyncTask holds the cancel function of the context that StartAll hands to
// the outbound dispatchers, the channel workers and the TTL janitor.
type asyncTask struct {
	cancel context.CancelFunc // stops everything running on that context
}
|
|
|
|
// RecordPlaceholder registers a placeholder message for later editing.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
|
|
key := channel + ":" + chatID
|
|
m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()})
|
|
}
|
|
|
|
// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID
|
|
// and records it for later editing. Returns true if a placeholder was sent.
|
|
func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool {
|
|
m.mu.RLock()
|
|
ch, ok := m.channels[channel]
|
|
m.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
pc, ok := ch.(PlaceholderCapable)
|
|
if !ok {
|
|
return false
|
|
}
|
|
phID, err := pc.SendPlaceholder(ctx, chatID)
|
|
if err != nil || phID == "" {
|
|
return false
|
|
}
|
|
m.RecordPlaceholder(channel, chatID, phID)
|
|
return true
|
|
}
|
|
|
|
// RecordTypingStop registers a typing stop function for later invocation.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
|
|
key := channel + ":" + chatID
|
|
entry := typingEntry{stop: stop, createdAt: time.Now()}
|
|
if previous, loaded := m.typingStops.Swap(key, entry); loaded {
|
|
if oldEntry, ok := previous.(typingEntry); ok && oldEntry.stop != nil {
|
|
oldEntry.stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID.
|
|
// It is safe to call even when no typing indicator is active (no-op).
|
|
// Used by the agent loop to stop typing when processing completes (success, error, or panic),
|
|
// regardless of whether an outbound message is published.
|
|
func (m *Manager) InvokeTypingStop(channel, chatID string) {
|
|
key := channel + ":" + chatID
|
|
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(typingEntry); ok {
|
|
entry.stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecordReactionUndo registers a reaction undo function for later invocation.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
|
|
key := channel + ":" + chatID
|
|
m.reactionUndos.Store(key, reactionEntry{undo: undo, createdAt: time.Now()})
|
|
}
|
|
|
|
// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
|
|
// Returns true if the message was already delivered (skip Send).
|
|
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
|
|
key := name + ":" + msg.ChatID
|
|
|
|
// 1. Stop typing
|
|
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(typingEntry); ok {
|
|
entry.stop() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 2. Undo reaction
|
|
if v, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(reactionEntry); ok {
|
|
entry.undo() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 3. If a stream already finalized this message, delete the placeholder and skip send
|
|
if _, loaded := m.streamActive.LoadAndDelete(key); loaded {
|
|
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
|
|
// Prefer deleting the placeholder (cleaner UX than editing to same content)
|
|
if deleter, ok := ch.(MessageDeleter); ok {
|
|
deleter.DeleteMessage(ctx, msg.ChatID, entry.id) // best effort
|
|
} else if editor, ok := ch.(MessageEditor); ok {
|
|
editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content) // fallback
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// 4. Try editing placeholder
|
|
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
|
|
if editor, ok := ch.(MessageEditor); ok {
|
|
if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil {
|
|
return true // edited successfully, skip Send
|
|
}
|
|
// edit failed → fall through to normal Send
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
|
|
m := &Manager{
|
|
channels: make(map[string]Channel),
|
|
workers: make(map[string]*channelWorker),
|
|
bus: messageBus,
|
|
config: cfg,
|
|
mediaStore: store,
|
|
channelHashes: make(map[string]string),
|
|
}
|
|
|
|
// Register as streaming delegate so the agent loop can obtain streamers
|
|
messageBus.SetStreamDelegate(m)
|
|
|
|
if err := m.initChannels(&cfg.Channels); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Store initial config hashes for all channels
|
|
m.channelHashes = toChannelHashes(cfg)
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// GetStreamer implements bus.StreamDelegate.
|
|
// It checks if the named channel supports streaming and returns a Streamer.
|
|
func (m *Manager) GetStreamer(ctx context.Context, channelName, chatID string) (bus.Streamer, bool) {
|
|
m.mu.RLock()
|
|
ch, exists := m.channels[channelName]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
sc, ok := ch.(StreamingCapable)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
streamer, err := sc.BeginStream(ctx, chatID)
|
|
if err != nil {
|
|
logger.DebugCF("channels", "Streaming unavailable, falling back to placeholder", map[string]any{
|
|
"channel": channelName,
|
|
"error": err.Error(),
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
// Mark streamActive on Finalize so preSend knows to clean up the placeholder
|
|
key := channelName + ":" + chatID
|
|
return &finalizeHookStreamer{
|
|
Streamer: streamer,
|
|
onFinalize: func() { m.streamActive.Store(key, true) },
|
|
}, true
|
|
}
|
|
|
|
// finalizeHookStreamer wraps a Streamer to run a hook on Finalize.
|
|
type finalizeHookStreamer struct {
|
|
Streamer
|
|
onFinalize func()
|
|
}
|
|
|
|
func (s *finalizeHookStreamer) Finalize(ctx context.Context, content string) error {
|
|
if err := s.Streamer.Finalize(ctx, content); err != nil {
|
|
return err
|
|
}
|
|
s.onFinalize()
|
|
return nil
|
|
}
|
|
|
|
// initChannel is a helper that looks up a factory by name and creates the channel.
|
|
func (m *Manager) initChannel(name, displayName string) {
|
|
f, ok := getFactory(name)
|
|
if !ok {
|
|
logger.WarnCF("channels", "Factory not registered", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
return
|
|
}
|
|
logger.DebugCF("channels", "Attempting to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
ch, err := f(m.config, m.bus)
|
|
if err != nil {
|
|
logger.ErrorCF("channels", "Failed to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
"error": err.Error(),
|
|
})
|
|
} else {
|
|
// Inject MediaStore if channel supports it
|
|
if m.mediaStore != nil {
|
|
if setter, ok := ch.(interface{ SetMediaStore(s media.MediaStore) }); ok {
|
|
setter.SetMediaStore(m.mediaStore)
|
|
}
|
|
}
|
|
// Inject PlaceholderRecorder if channel supports it
|
|
if setter, ok := ch.(interface{ SetPlaceholderRecorder(r PlaceholderRecorder) }); ok {
|
|
setter.SetPlaceholderRecorder(m)
|
|
}
|
|
// Inject owner reference so BaseChannel.HandleMessage can auto-trigger typing/reaction
|
|
if setter, ok := ch.(interface{ SetOwner(ch Channel) }); ok {
|
|
setter.SetOwner(ch)
|
|
}
|
|
m.channels[name] = ch
|
|
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (m *Manager) initChannels(channels *config.ChannelsConfig) error {
|
|
logger.InfoC("channels", "Initializing channel manager")
|
|
|
|
if channels.Telegram.Enabled && channels.Telegram.Token != "" {
|
|
m.initChannel("telegram", "Telegram")
|
|
}
|
|
|
|
if channels.WhatsApp.Enabled {
|
|
waCfg := channels.WhatsApp
|
|
if waCfg.UseNative {
|
|
m.initChannel("whatsapp_native", "WhatsApp Native")
|
|
} else if waCfg.BridgeURL != "" {
|
|
m.initChannel("whatsapp", "WhatsApp")
|
|
}
|
|
}
|
|
|
|
if channels.Feishu.Enabled {
|
|
m.initChannel("feishu", "Feishu")
|
|
}
|
|
|
|
if channels.Discord.Enabled && channels.Discord.Token != "" {
|
|
m.initChannel("discord", "Discord")
|
|
}
|
|
|
|
if channels.MaixCam.Enabled {
|
|
m.initChannel("maixcam", "MaixCam")
|
|
}
|
|
|
|
if channels.QQ.Enabled {
|
|
m.initChannel("qq", "QQ")
|
|
}
|
|
|
|
if channels.DingTalk.Enabled && channels.DingTalk.ClientID != "" {
|
|
m.initChannel("dingtalk", "DingTalk")
|
|
}
|
|
|
|
if channels.Slack.Enabled && channels.Slack.BotToken != "" {
|
|
m.initChannel("slack", "Slack")
|
|
}
|
|
|
|
if channels.Matrix.Enabled &&
|
|
m.config.Channels.Matrix.Homeserver != "" &&
|
|
m.config.Channels.Matrix.UserID != "" &&
|
|
m.config.Channels.Matrix.AccessToken != "" {
|
|
m.initChannel("matrix", "Matrix")
|
|
}
|
|
|
|
if channels.LINE.Enabled && channels.LINE.ChannelAccessToken != "" {
|
|
m.initChannel("line", "LINE")
|
|
}
|
|
|
|
if channels.OneBot.Enabled && channels.OneBot.WSUrl != "" {
|
|
m.initChannel("onebot", "OneBot")
|
|
}
|
|
|
|
if channels.WeCom.Enabled && channels.WeCom.Token != "" {
|
|
m.initChannel("wecom", "WeCom")
|
|
}
|
|
|
|
if m.config.Channels.WeComAIBot.Enabled &&
|
|
((m.config.Channels.WeComAIBot.BotID != "" && m.config.Channels.WeComAIBot.Secret != "") ||
|
|
m.config.Channels.WeComAIBot.Token != "") {
|
|
m.initChannel("wecom_aibot", "WeCom AI Bot")
|
|
}
|
|
|
|
if channels.WeComApp.Enabled && channels.WeComApp.CorpID != "" {
|
|
m.initChannel("wecom_app", "WeCom App")
|
|
}
|
|
|
|
if channels.Pico.Enabled && channels.Pico.Token != "" {
|
|
m.initChannel("pico", "Pico")
|
|
}
|
|
|
|
if channels.PicoClient.Enabled && channels.PicoClient.URL != "" {
|
|
m.initChannel("pico_client", "Pico Client")
|
|
}
|
|
|
|
if channels.IRC.Enabled && channels.IRC.Server != "" {
|
|
m.initChannel("irc", "IRC")
|
|
}
|
|
|
|
logger.InfoCF("channels", "Channel initialization completed", map[string]any{
|
|
"enabled_channels": len(m.channels),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetupHTTPServer creates a shared HTTP server with the given listen address.
|
|
// It registers health endpoints from the health server and discovers channels
|
|
// that implement WebhookHandler and/or HealthChecker to register their handlers.
|
|
func (m *Manager) SetupHTTPServer(addr string, healthServer *health.Server) {
|
|
m.mux = http.NewServeMux()
|
|
|
|
// Register health endpoints
|
|
if healthServer != nil {
|
|
healthServer.RegisterOnMux(m.mux)
|
|
}
|
|
|
|
// Discover and register webhook handlers and health checkers
|
|
for name, ch := range m.channels {
|
|
if wh, ok := ch.(WebhookHandler); ok {
|
|
m.mux.Handle(wh.WebhookPath(), wh)
|
|
logger.InfoCF("channels", "Webhook handler registered", map[string]any{
|
|
"channel": name,
|
|
"path": wh.WebhookPath(),
|
|
})
|
|
}
|
|
if hc, ok := ch.(HealthChecker); ok {
|
|
m.mux.HandleFunc(hc.HealthPath(), hc.HealthHandler)
|
|
logger.InfoCF("channels", "Health endpoint registered", map[string]any{
|
|
"channel": name,
|
|
"path": hc.HealthPath(),
|
|
})
|
|
}
|
|
}
|
|
|
|
m.httpServer = &http.Server{
|
|
Addr: addr,
|
|
Handler: m.mux,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
}
|
|
}
|
|
|
|
func (m *Manager) StartAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if len(m.channels) == 0 {
|
|
logger.WarnC("channels", "No channels enabled")
|
|
}
|
|
|
|
logger.InfoC("channels", "Starting all channels")
|
|
|
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
|
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Start(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Failed to start channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
continue
|
|
}
|
|
// Lazily create worker only after channel starts successfully
|
|
w := newChannelWorker(name, channel)
|
|
m.workers[name] = w
|
|
go m.runWorker(dispatchCtx, name, w)
|
|
go m.runMediaWorker(dispatchCtx, name, w)
|
|
}
|
|
|
|
// Start the dispatcher that reads from the bus and routes to workers
|
|
go m.dispatchOutbound(dispatchCtx)
|
|
go m.dispatchOutboundMedia(dispatchCtx)
|
|
|
|
// Start the TTL janitor that cleans up stale typing/placeholder entries
|
|
go m.runTTLJanitor(dispatchCtx)
|
|
|
|
// Start shared HTTP server if configured
|
|
if m.httpServer != nil {
|
|
go func() {
|
|
logger.InfoCF("channels", "Shared HTTP server listening", map[string]any{
|
|
"addr": m.httpServer.Addr,
|
|
})
|
|
if err := m.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.FatalCF("channels", "Shared HTTP server error", map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}()
|
|
}
|
|
|
|
logger.InfoC("channels", "All channels started")
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StopAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
logger.InfoC("channels", "Stopping all channels")
|
|
|
|
// Shutdown shared HTTP server first
|
|
if m.httpServer != nil {
|
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
if err := m.httpServer.Shutdown(shutdownCtx); err != nil {
|
|
logger.ErrorCF("channels", "Shared HTTP server shutdown error", map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
m.httpServer = nil
|
|
}
|
|
|
|
// Cancel dispatcher
|
|
if m.dispatchTask != nil {
|
|
m.dispatchTask.cancel()
|
|
m.dispatchTask = nil
|
|
}
|
|
|
|
// Close all worker queues and wait for them to drain
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
close(w.queue)
|
|
}
|
|
}
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
<-w.done
|
|
}
|
|
}
|
|
// Close all media worker queues and wait for them to drain
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
close(w.mediaQueue)
|
|
}
|
|
}
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
<-w.mediaDone
|
|
}
|
|
}
|
|
|
|
// Stop all channels
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Stop(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Error stopping channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
logger.InfoC("channels", "All channels stopped")
|
|
return nil
|
|
}
|
|
|
|
// newChannelWorker creates a channelWorker with a rate limiter configured
|
|
// for the given channel name.
|
|
func newChannelWorker(name string, ch Channel) *channelWorker {
|
|
rateVal := float64(defaultRateLimit)
|
|
if r, ok := channelRateConfig[name]; ok {
|
|
rateVal = r
|
|
}
|
|
burst := int(math.Max(1, math.Ceil(rateVal/2)))
|
|
|
|
return &channelWorker{
|
|
ch: ch,
|
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
|
mediaQueue: make(chan bus.OutboundMediaMessage, defaultChannelQueueSize),
|
|
done: make(chan struct{}),
|
|
mediaDone: make(chan struct{}),
|
|
limiter: rate.NewLimiter(rate.Limit(rateVal), burst),
|
|
}
|
|
}
|
|
|
|
// runWorker processes outbound messages for a single channel, splitting
|
|
// messages that exceed the channel's maximum message length.
|
|
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
|
|
defer close(w.done)
|
|
for {
|
|
select {
|
|
case msg, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
maxLen := 0
|
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
|
maxLen = mlp.MaxMessageLength()
|
|
}
|
|
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
|
|
chunks := SplitMessage(msg.Content, maxLen)
|
|
for _, chunk := range chunks {
|
|
chunkMsg := msg
|
|
chunkMsg.Content = chunk
|
|
m.sendWithRetry(ctx, name, w, chunkMsg)
|
|
}
|
|
} else {
|
|
m.sendWithRetry(ctx, name, w, msg)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendWithRetry sends a message through the channel with rate limiting and
|
|
// retry logic. It classifies errors to determine the retry strategy:
|
|
// - ErrNotRunning / ErrSendFailed: permanent, no retry
|
|
// - ErrRateLimit: fixed delay retry
|
|
// - ErrTemporary / unknown: exponential backoff retry
|
|
func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) {
|
|
// Rate limit: wait for token
|
|
if err := w.limiter.Wait(ctx); err != nil {
|
|
// ctx canceled, shutting down
|
|
return
|
|
}
|
|
|
|
// Pre-send: stop typing and try to edit placeholder
|
|
if m.preSend(ctx, name, msg, w.ch) {
|
|
return // placeholder was edited successfully, skip Send
|
|
}
|
|
|
|
var lastErr error
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
lastErr = w.ch.Send(ctx, msg)
|
|
if lastErr == nil {
|
|
return
|
|
}
|
|
|
|
// Permanent failures — don't retry
|
|
if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
|
|
break
|
|
}
|
|
|
|
// Last attempt exhausted — don't sleep
|
|
if attempt == maxRetries {
|
|
break
|
|
}
|
|
|
|
// Rate limit error — fixed delay
|
|
if errors.Is(lastErr, ErrRateLimit) {
|
|
select {
|
|
case <-time.After(rateLimitDelay):
|
|
continue
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
// ErrTemporary or unknown error — exponential backoff
|
|
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
// All retries exhausted or permanent failure
|
|
logger.ErrorCF("channels", "Send failed", map[string]any{
|
|
"channel": name,
|
|
"chat_id": msg.ChatID,
|
|
"error": lastErr.Error(),
|
|
"retries": maxRetries,
|
|
})
|
|
}
|
|
|
|
func dispatchLoop[M any](
|
|
ctx context.Context,
|
|
m *Manager,
|
|
ch <-chan M,
|
|
getChannel func(M) string,
|
|
enqueue func(context.Context, *channelWorker, M) bool,
|
|
startMsg, stopMsg, unknownMsg, noWorkerMsg string,
|
|
) {
|
|
logger.InfoC("channels", startMsg)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.InfoC("channels", stopMsg)
|
|
return
|
|
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
logger.InfoC("channels", stopMsg)
|
|
return
|
|
}
|
|
|
|
channel := getChannel(msg)
|
|
|
|
// Silently skip internal channels
|
|
if constants.IsInternalChannel(channel) {
|
|
continue
|
|
}
|
|
|
|
m.mu.RLock()
|
|
_, exists := m.channels[channel]
|
|
w, wExists := m.workers[channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
logger.WarnCF("channels", unknownMsg, map[string]any{"channel": channel})
|
|
continue
|
|
}
|
|
|
|
if wExists && w != nil {
|
|
if !enqueue(ctx, w, msg) {
|
|
return
|
|
}
|
|
} else if exists {
|
|
logger.WarnCF("channels", noWorkerMsg, map[string]any{"channel": channel})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
|
dispatchLoop(
|
|
ctx, m,
|
|
m.bus.OutboundChan(),
|
|
func(msg bus.OutboundMessage) string { return msg.Channel },
|
|
func(ctx context.Context, w *channelWorker, msg bus.OutboundMessage) bool {
|
|
select {
|
|
case w.queue <- msg:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
},
|
|
"Outbound dispatcher started",
|
|
"Outbound dispatcher stopped",
|
|
"Unknown channel for outbound message",
|
|
"Channel has no active worker, skipping message",
|
|
)
|
|
}
|
|
|
|
func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
|
|
dispatchLoop(
|
|
ctx, m,
|
|
m.bus.OutboundMediaChan(),
|
|
func(msg bus.OutboundMediaMessage) string { return msg.Channel },
|
|
func(ctx context.Context, w *channelWorker, msg bus.OutboundMediaMessage) bool {
|
|
select {
|
|
case w.mediaQueue <- msg:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
},
|
|
"Outbound media dispatcher started",
|
|
"Outbound media dispatcher stopped",
|
|
"Unknown channel for outbound media message",
|
|
"Channel has no active worker, skipping media message",
|
|
)
|
|
}
|
|
|
|
// runMediaWorker processes outbound media messages for a single channel.
|
|
func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
|
|
defer close(w.mediaDone)
|
|
for {
|
|
select {
|
|
case msg, ok := <-w.mediaQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
_ = m.sendMediaWithRetry(ctx, name, w, msg)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendMediaWithRetry sends a media message through the channel with rate limiting and
|
|
// retry logic. It returns nil on success, or the last error after retries.
|
|
func (m *Manager) sendMediaWithRetry(
|
|
ctx context.Context,
|
|
name string,
|
|
w *channelWorker,
|
|
msg bus.OutboundMediaMessage,
|
|
) error {
|
|
ms, ok := w.ch.(MediaSender)
|
|
if !ok {
|
|
logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
|
|
"channel": name,
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// Rate limit: wait for token
|
|
if err := w.limiter.Wait(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
var lastErr error
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
lastErr = ms.SendMedia(ctx, msg)
|
|
if lastErr == nil {
|
|
return nil
|
|
}
|
|
|
|
// Permanent failures — don't retry
|
|
if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
|
|
break
|
|
}
|
|
|
|
// Last attempt exhausted — don't sleep
|
|
if attempt == maxRetries {
|
|
break
|
|
}
|
|
|
|
// Rate limit error — fixed delay
|
|
if errors.Is(lastErr, ErrRateLimit) {
|
|
select {
|
|
case <-time.After(rateLimitDelay):
|
|
continue
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// ErrTemporary or unknown error — exponential backoff
|
|
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// All retries exhausted or permanent failure
|
|
logger.ErrorCF("channels", "SendMedia failed", map[string]any{
|
|
"channel": name,
|
|
"chat_id": msg.ChatID,
|
|
"error": lastErr.Error(),
|
|
"retries": maxRetries,
|
|
})
|
|
return lastErr
|
|
}
|
|
|
|
// runTTLJanitor periodically scans the typingStops and placeholders maps
|
|
// and evicts entries that have exceeded their TTL. This prevents memory
|
|
// accumulation when outbound paths fail to trigger preSend (e.g. LLM errors).
|
|
func (m *Manager) runTTLJanitor(ctx context.Context) {
|
|
ticker := time.NewTicker(janitorInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case now := <-ticker.C:
|
|
m.typingStops.Range(func(key, value any) bool {
|
|
if entry, ok := value.(typingEntry); ok {
|
|
if now.Sub(entry.createdAt) > typingStopTTL {
|
|
if _, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
entry.stop() // idempotent, safe
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
m.reactionUndos.Range(func(key, value any) bool {
|
|
if entry, ok := value.(reactionEntry); ok {
|
|
if now.Sub(entry.createdAt) > typingStopTTL {
|
|
if _, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
|
|
entry.undo() // idempotent, safe
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
m.placeholders.Range(func(key, value any) bool {
|
|
if entry, ok := value.(placeholderEntry); ok {
|
|
if now.Sub(entry.createdAt) > placeholderTTL {
|
|
m.placeholders.Delete(key)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) GetChannel(name string) (Channel, bool) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
channel, ok := m.channels[name]
|
|
return channel, ok
|
|
}
|
|
|
|
func (m *Manager) GetStatus() map[string]any {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
status := make(map[string]any)
|
|
for name, channel := range m.channels {
|
|
status[name] = map[string]any{
|
|
"enabled": true,
|
|
"running": channel.IsRunning(),
|
|
}
|
|
}
|
|
return status
|
|
}
|
|
|
|
func (m *Manager) GetEnabledChannels() []string {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(m.channels))
|
|
for name := range m.channels {
|
|
names = append(names, name)
|
|
}
|
|
return names
|
|
}
|
|
|
|
// Reload updates the config reference without restarting channels.
|
|
// This is used when channel config hasn't changed but other parts of the config have.
|
|
func (m *Manager) Reload(ctx context.Context, cfg *config.Config) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
list := toChannelHashes(cfg)
|
|
added, removed := compareChannels(m.channelHashes, list)
|
|
for _, name := range removed {
|
|
// Stop all channels
|
|
channel := m.channels[name]
|
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Stop(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Error stopping channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
go func() {
|
|
m.UnregisterChannel(name)
|
|
}()
|
|
}
|
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
|
cc, err := toChannelConfig(cfg, added)
|
|
if err != nil {
|
|
logger.ErrorC("channels", fmt.Sprintf("toChannelConfig error: %v", err))
|
|
return err
|
|
}
|
|
err = m.initChannels(cc)
|
|
if err != nil {
|
|
logger.ErrorC("channels", fmt.Sprintf("initChannels error: %v", err))
|
|
return err
|
|
}
|
|
for _, name := range added {
|
|
channel := m.channels[name]
|
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Start(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Failed to start channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
continue
|
|
}
|
|
// Lazily create worker only after channel starts successfully
|
|
w := newChannelWorker(name, channel)
|
|
m.workers[name] = w
|
|
go m.runWorker(dispatchCtx, name, w)
|
|
go m.runMediaWorker(dispatchCtx, name, w)
|
|
go func() {
|
|
m.RegisterChannel(name, channel)
|
|
}()
|
|
}
|
|
|
|
m.config = cfg
|
|
m.channelHashes = toChannelHashes(cfg)
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) RegisterChannel(name string, channel Channel) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.channels[name] = channel
|
|
}
|
|
|
|
func (m *Manager) UnregisterChannel(name string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if w, ok := m.workers[name]; ok && w != nil {
|
|
close(w.queue)
|
|
<-w.done
|
|
close(w.mediaQueue)
|
|
<-w.mediaDone
|
|
}
|
|
delete(m.workers, name)
|
|
delete(m.channels, name)
|
|
}
|
|
|
|
// SendMessage sends an outbound message synchronously through the channel
|
|
// worker's rate limiter and retry logic. It blocks until the message is
|
|
// delivered (or all retries are exhausted), which preserves ordering when
|
|
// a subsequent operation depends on the message having been sent.
|
|
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[msg.Channel]
|
|
w, wExists := m.workers[msg.Channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", msg.Channel)
|
|
}
|
|
if !wExists || w == nil {
|
|
return fmt.Errorf("channel %s has no active worker", msg.Channel)
|
|
}
|
|
|
|
maxLen := 0
|
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
|
maxLen = mlp.MaxMessageLength()
|
|
}
|
|
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
|
|
for _, chunk := range SplitMessage(msg.Content, maxLen) {
|
|
chunkMsg := msg
|
|
chunkMsg.Content = chunk
|
|
m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
|
|
}
|
|
} else {
|
|
m.sendWithRetry(ctx, msg.Channel, w, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SendMedia sends outbound media synchronously through the channel worker's
|
|
// rate limiter and retry logic. It blocks until the media is delivered (or all
|
|
// retries are exhausted), which preserves ordering when later agent behavior
|
|
// depends on actual media delivery.
|
|
func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[msg.Channel]
|
|
w, wExists := m.workers[msg.Channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", msg.Channel)
|
|
}
|
|
if !wExists || w == nil {
|
|
return fmt.Errorf("channel %s has no active worker", msg.Channel)
|
|
}
|
|
|
|
return m.sendMediaWithRetry(ctx, msg.Channel, w, msg)
|
|
}
|
|
|
|
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[channelName]
|
|
w, wExists := m.workers[channelName]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", channelName)
|
|
}
|
|
|
|
msg := bus.OutboundMessage{
|
|
Channel: channelName,
|
|
ChatID: chatID,
|
|
Content: content,
|
|
}
|
|
|
|
if wExists && w != nil {
|
|
select {
|
|
case w.queue <- msg:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Fallback: direct send (should not happen)
|
|
channel, _ := m.channels[channelName]
|
|
return channel.Send(ctx, msg)
|
|
}
|