mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
6c0798ca3f
* feat(channels): Channel.Send and MediaSender.SendMedia return delivered message IDs Change Channel.Send signature from (ctx, msg) error to (ctx, msg) ([]string, error) and MediaSender.SendMedia similarly, so callers can capture platform message IDs for threading, reactions, and history annotation. Adapters that return real IDs: Telegram (per-chunk MessageID), Discord (Message.ID), Slack Send (ts), QQ (sentMsg.ID), Matrix (EventID). Slack SendMedia returns nil because UploadFileV2 does not expose the posted message timestamp in its response. All other adapters return nil IDs. preSend and sendWithRetry in manager.go updated to propagate ([]string, bool). README examples updated for both English and Chinese docs. * style: apply golangci-lint fixes (golines) * docs: fix Send migration guide — restore old error-only signature in before/after example
1212 lines
33 KiB
Go
1212 lines
33 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package channels
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/health"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/media"
|
|
)
|
|
|
|
// Outbound delivery tuning: queue capacity, fallback rate limit and retry policy.
const (
	// defaultChannelQueueSize is the buffer size used for each worker queue.
	defaultChannelQueueSize = 16
	// defaultRateLimit is the msg/s limit applied to channels that have no
	// entry in channelRateConfig.
	defaultRateLimit = 10
	// maxRetries is the number of re-sends attempted after the first failure.
	maxRetries = 3
	// rateLimitDelay is the fixed pause before retrying a rate-limited send.
	rateLimitDelay = time.Second
	// baseBackoff and maxBackoff bound the exponential backoff between retries.
	baseBackoff = 500 * time.Millisecond
	maxBackoff  = 8 * time.Second
)

// TTL janitor tuning: sweep period and per-kind entry lifetimes.
const (
	janitorInterval = 10 * time.Second
	typingStopTTL   = 5 * time.Minute
	placeholderTTL  = 10 * time.Minute
)
|
|
|
|
// typingEntry wraps a typing stop function with a creation timestamp for TTL eviction.
// Values of this type are stored in Manager.typingStops.
type typingEntry struct {
	stop      func()    // callback that ends the typing indicator; may be nil (RecordTypingStop stores it unchecked)
	createdAt time.Time // time the entry was recorded; compared against typingStopTTL by the janitor
}
|
|
|
|
// reactionEntry wraps a reaction undo function with a creation timestamp for TTL eviction.
// Values of this type are stored in Manager.reactionUndos.
type reactionEntry struct {
	undo      func()    // callback that removes the reaction; may be nil (RecordReactionUndo stores it unchecked)
	createdAt time.Time // time the entry was recorded; the janitor evicts it using typingStopTTL
}
|
|
|
|
// placeholderEntry wraps a placeholder ID with a creation timestamp for TTL eviction.
// Values of this type are stored in Manager.placeholders.
type placeholderEntry struct {
	id        string    // platform message ID of the placeholder; an empty ID is treated as "no placeholder"
	createdAt time.Time // time the entry was recorded; compared against placeholderTTL by the janitor
}
|
|
|
|
// channelRateConfig holds the outbound rate limit, in messages per second,
// for each channel that overrides defaultRateLimit. Keys are channel names.
var channelRateConfig = map[string]float64{
	"discord":  1,
	"irc":      2,
	"line":     10,
	"matrix":   2,
	"qq":       5,
	"slack":    1,
	"telegram": 20,
}
|
|
|
|
// channelWorker bundles the per-channel outbound state: the channel itself,
// its text and media queues, completion signals for the two worker
// goroutines, and the rate limiter both send paths wait on.
type channelWorker struct {
	ch         Channel                       // channel that messages are delivered through
	queue      chan bus.OutboundMessage      // text messages awaiting delivery (consumed by runWorker)
	mediaQueue chan bus.OutboundMediaMessage // media messages awaiting delivery (consumed by runMediaWorker)
	done       chan struct{}                 // closed by runWorker when it exits
	mediaDone  chan struct{}                 // closed by runMediaWorker when it exits
	limiter    *rate.Limiter                 // waited on before every text and media send
}
|
|
|
|
// Manager owns the set of initialized channels, their outbound workers, the
// optional shared HTTP server, and the per-chat bookkeeping (placeholders,
// typing indicators, reactions, stream markers) used around message delivery.
// mu is held when channels and workers are read or modified (see GetChannel,
// StartAll, StopAll, dispatchLoop); the sync.Map fields are accessed without it.
type Manager struct {
	channels      map[string]Channel
	workers       map[string]*channelWorker
	bus           *bus.MessageBus
	config        *config.Config
	mediaStore    media.MediaStore
	dispatchTask  *asyncTask
	mux           *dynamicServeMux
	httpServer    *http.Server
	mu            sync.RWMutex
	placeholders  sync.Map          // "channel:chatID" → placeholderEntry
	typingStops   sync.Map          // "channel:chatID" → typingEntry
	reactionUndos sync.Map          // "channel:chatID" → reactionEntry
	streamActive  sync.Map          // "channel:chatID" → true (set when streamer.Finalize sent the message)
	channelHashes map[string]string // channel name → config hash
}
|
|
|
|
// asyncTask holds the cancel function for a group of background goroutines.
// Manager.dispatchTask uses it to stop the dispatchers, workers and janitor
// that were started with the derived dispatch context.
type asyncTask struct {
	cancel context.CancelFunc // cancels the context the background goroutines run under
}
|
|
|
|
// RecordPlaceholder registers a placeholder message for later editing.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
|
|
key := channel + ":" + chatID
|
|
m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()})
|
|
}
|
|
|
|
// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID
|
|
// and records it for later editing. Returns true if a placeholder was sent.
|
|
func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool {
|
|
m.mu.RLock()
|
|
ch, ok := m.channels[channel]
|
|
m.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
pc, ok := ch.(PlaceholderCapable)
|
|
if !ok {
|
|
return false
|
|
}
|
|
phID, err := pc.SendPlaceholder(ctx, chatID)
|
|
if err != nil || phID == "" {
|
|
return false
|
|
}
|
|
m.RecordPlaceholder(channel, chatID, phID)
|
|
return true
|
|
}
|
|
|
|
// RecordTypingStop registers a typing stop function for later invocation.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
|
|
key := channel + ":" + chatID
|
|
entry := typingEntry{stop: stop, createdAt: time.Now()}
|
|
if previous, loaded := m.typingStops.Swap(key, entry); loaded {
|
|
if oldEntry, ok := previous.(typingEntry); ok && oldEntry.stop != nil {
|
|
oldEntry.stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID.
|
|
// It is safe to call even when no typing indicator is active (no-op).
|
|
// Used by the agent loop to stop typing when processing completes (success, error, or panic),
|
|
// regardless of whether an outbound message is published.
|
|
func (m *Manager) InvokeTypingStop(channel, chatID string) {
|
|
key := channel + ":" + chatID
|
|
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(typingEntry); ok {
|
|
entry.stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecordReactionUndo registers a reaction undo function for later invocation.
|
|
// Implements PlaceholderRecorder.
|
|
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
|
|
key := channel + ":" + chatID
|
|
m.reactionUndos.Store(key, reactionEntry{undo: undo, createdAt: time.Now()})
|
|
}
|
|
|
|
// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
|
|
// Returns the delivered message IDs and true when delivery completed before a normal Send.
|
|
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) ([]string, bool) {
|
|
key := name + ":" + msg.ChatID
|
|
|
|
// 1. Stop typing
|
|
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(typingEntry); ok {
|
|
entry.stop() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 2. Undo reaction
|
|
if v, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(reactionEntry); ok {
|
|
entry.undo() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 3. If a stream already finalized this message, delete the placeholder and skip send
|
|
if _, loaded := m.streamActive.LoadAndDelete(key); loaded {
|
|
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
|
|
// Prefer deleting the placeholder (cleaner UX than editing to same content)
|
|
if deleter, ok := ch.(MessageDeleter); ok {
|
|
deleter.DeleteMessage(ctx, msg.ChatID, entry.id) // best effort
|
|
} else if editor, ok := ch.(MessageEditor); ok {
|
|
editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content) // fallback
|
|
}
|
|
}
|
|
}
|
|
return nil, true
|
|
}
|
|
|
|
// 4. Try editing placeholder
|
|
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
|
|
if editor, ok := ch.(MessageEditor); ok {
|
|
if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil {
|
|
return []string{entry.id}, true
|
|
}
|
|
// edit failed → fall through to normal Send
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, false
|
|
}
|
|
|
|
// preSendMedia handles typing stop, reaction undo, and placeholder cleanup
|
|
// before sending media attachments. Unlike preSend for text messages, media
|
|
// delivery never edits the placeholder because there is no text payload to
|
|
// replace it with; it only attempts to delete the placeholder when possible.
|
|
func (m *Manager) preSendMedia(ctx context.Context, name string, msg bus.OutboundMediaMessage, ch Channel) {
|
|
key := name + ":" + msg.ChatID
|
|
|
|
// 1. Stop typing
|
|
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(typingEntry); ok {
|
|
entry.stop() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 2. Undo reaction
|
|
if v, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(reactionEntry); ok {
|
|
entry.undo() // idempotent, safe
|
|
}
|
|
}
|
|
|
|
// 3. Clear any finalized stream marker for this chat before media delivery.
|
|
m.streamActive.LoadAndDelete(key)
|
|
|
|
// 4. Delete placeholder if present.
|
|
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
|
|
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
|
|
if deleter, ok := ch.(MessageDeleter); ok {
|
|
deleter.DeleteMessage(ctx, msg.ChatID, entry.id) // best effort
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
|
|
m := &Manager{
|
|
channels: make(map[string]Channel),
|
|
workers: make(map[string]*channelWorker),
|
|
bus: messageBus,
|
|
config: cfg,
|
|
mediaStore: store,
|
|
channelHashes: make(map[string]string),
|
|
}
|
|
|
|
// Register as streaming delegate so the agent loop can obtain streamers
|
|
messageBus.SetStreamDelegate(m)
|
|
|
|
if err := m.initChannels(&cfg.Channels); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Store initial config hashes for all channels
|
|
m.channelHashes = toChannelHashes(cfg)
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// GetStreamer implements bus.StreamDelegate.
|
|
// It checks if the named channel supports streaming and returns a Streamer.
|
|
func (m *Manager) GetStreamer(ctx context.Context, channelName, chatID string) (bus.Streamer, bool) {
|
|
m.mu.RLock()
|
|
ch, exists := m.channels[channelName]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
sc, ok := ch.(StreamingCapable)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
|
|
streamer, err := sc.BeginStream(ctx, chatID)
|
|
if err != nil {
|
|
logger.DebugCF("channels", "Streaming unavailable, falling back to placeholder", map[string]any{
|
|
"channel": channelName,
|
|
"error": err.Error(),
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
// Mark streamActive on Finalize so preSend knows to clean up the placeholder
|
|
key := channelName + ":" + chatID
|
|
return &finalizeHookStreamer{
|
|
Streamer: streamer,
|
|
onFinalize: func() { m.streamActive.Store(key, true) },
|
|
}, true
|
|
}
|
|
|
|
// finalizeHookStreamer wraps a Streamer to run a hook on Finalize.
// All other Streamer methods are promoted unchanged from the embedded value.
type finalizeHookStreamer struct {
	Streamer          // wrapped streamer that performs the actual work
	onFinalize func() // invoked after the wrapped Finalize returns without error
}
|
|
|
|
func (s *finalizeHookStreamer) Finalize(ctx context.Context, content string) error {
|
|
if err := s.Streamer.Finalize(ctx, content); err != nil {
|
|
return err
|
|
}
|
|
s.onFinalize()
|
|
return nil
|
|
}
|
|
|
|
// initChannel is a helper that looks up a factory by name and creates the channel.
|
|
func (m *Manager) initChannel(name, displayName string) {
|
|
f, ok := getFactory(name)
|
|
if !ok {
|
|
logger.WarnCF("channels", "Factory not registered", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
return
|
|
}
|
|
logger.DebugCF("channels", "Attempting to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
ch, err := f(m.config, m.bus)
|
|
if err != nil {
|
|
logger.ErrorCF("channels", "Failed to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
"error": err.Error(),
|
|
})
|
|
} else {
|
|
// Inject MediaStore if channel supports it
|
|
if m.mediaStore != nil {
|
|
if setter, ok := ch.(interface{ SetMediaStore(s media.MediaStore) }); ok {
|
|
setter.SetMediaStore(m.mediaStore)
|
|
}
|
|
}
|
|
// Inject PlaceholderRecorder if channel supports it
|
|
if setter, ok := ch.(interface{ SetPlaceholderRecorder(r PlaceholderRecorder) }); ok {
|
|
setter.SetPlaceholderRecorder(m)
|
|
}
|
|
// Inject owner reference so BaseChannel.HandleMessage can auto-trigger typing/reaction
|
|
if setter, ok := ch.(interface{ SetOwner(ch Channel) }); ok {
|
|
setter.SetOwner(ch)
|
|
}
|
|
m.channels[name] = ch
|
|
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (m *Manager) initChannels(channels *config.ChannelsConfig) error {
|
|
logger.InfoC("channels", "Initializing channel manager")
|
|
|
|
if channels.Telegram.Enabled && channels.Telegram.Token.String() != "" {
|
|
m.initChannel("telegram", "Telegram")
|
|
}
|
|
|
|
if channels.WhatsApp.Enabled {
|
|
waCfg := channels.WhatsApp
|
|
if waCfg.UseNative {
|
|
m.initChannel("whatsapp_native", "WhatsApp Native")
|
|
} else if waCfg.BridgeURL != "" {
|
|
m.initChannel("whatsapp", "WhatsApp")
|
|
}
|
|
}
|
|
|
|
if channels.Feishu.Enabled {
|
|
m.initChannel("feishu", "Feishu")
|
|
}
|
|
|
|
if channels.Discord.Enabled && channels.Discord.Token.String() != "" {
|
|
m.initChannel("discord", "Discord")
|
|
}
|
|
|
|
if channels.MaixCam.Enabled {
|
|
m.initChannel("maixcam", "MaixCam")
|
|
}
|
|
|
|
if channels.QQ.Enabled {
|
|
m.initChannel("qq", "QQ")
|
|
}
|
|
|
|
if channels.DingTalk.Enabled && channels.DingTalk.ClientID != "" {
|
|
m.initChannel("dingtalk", "DingTalk")
|
|
}
|
|
|
|
if channels.Slack.Enabled && channels.Slack.BotToken.String() != "" {
|
|
m.initChannel("slack", "Slack")
|
|
}
|
|
|
|
if channels.Matrix.Enabled &&
|
|
m.config.Channels.Matrix.Homeserver != "" &&
|
|
m.config.Channels.Matrix.UserID != "" &&
|
|
m.config.Channels.Matrix.AccessToken.String() != "" {
|
|
m.initChannel("matrix", "Matrix")
|
|
}
|
|
|
|
if channels.LINE.Enabled && channels.LINE.ChannelAccessToken.String() != "" {
|
|
m.initChannel("line", "LINE")
|
|
}
|
|
|
|
if channels.OneBot.Enabled && channels.OneBot.WSUrl != "" {
|
|
m.initChannel("onebot", "OneBot")
|
|
}
|
|
|
|
if channels.WeCom.Enabled && channels.WeCom.BotID != "" && channels.WeCom.Secret.String() != "" {
|
|
m.initChannel("wecom", "WeCom")
|
|
}
|
|
|
|
if channels.Weixin.Enabled && channels.Weixin.Token.String() != "" {
|
|
m.initChannel("weixin", "Weixin")
|
|
}
|
|
|
|
if channels.Pico.Enabled && channels.Pico.Token.String() != "" {
|
|
m.initChannel("pico", "Pico")
|
|
}
|
|
|
|
if channels.PicoClient.Enabled && channels.PicoClient.URL != "" {
|
|
m.initChannel("pico_client", "Pico Client")
|
|
}
|
|
|
|
if channels.IRC.Enabled && channels.IRC.Server != "" {
|
|
m.initChannel("irc", "IRC")
|
|
}
|
|
|
|
logger.InfoCF("channels", "Channel initialization completed", map[string]any{
|
|
"enabled_channels": len(m.channels),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetupHTTPServer creates a shared HTTP server with the given listen address.
|
|
// It registers health endpoints from the health server and discovers channels
|
|
// that implement WebhookHandler and/or HealthChecker to register their handlers.
|
|
func (m *Manager) SetupHTTPServer(addr string, healthServer *health.Server) {
|
|
m.mux = newDynamicServeMux()
|
|
|
|
// Register health endpoints
|
|
if healthServer != nil {
|
|
healthServer.RegisterOnMux(m.mux)
|
|
}
|
|
|
|
// Discover and register webhook handlers and health checkers
|
|
m.registerHTTPHandlersLocked()
|
|
|
|
m.httpServer = &http.Server{
|
|
Addr: addr,
|
|
Handler: m.mux,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
}
|
|
}
|
|
|
|
// registerHTTPHandlersLocked registers webhook and health-check handlers for
|
|
// all channels currently in m.channels. Caller must hold m.mu (or ensure
|
|
// exclusive access).
|
|
func (m *Manager) registerHTTPHandlersLocked() {
|
|
for name, ch := range m.channels {
|
|
m.registerChannelHTTPHandler(name, ch)
|
|
}
|
|
}
|
|
|
|
// registerChannelHTTPHandler registers the webhook/health handlers for a
|
|
// single channel onto m.mux.
|
|
func (m *Manager) registerChannelHTTPHandler(name string, ch Channel) {
|
|
if wh, ok := ch.(WebhookHandler); ok {
|
|
m.mux.Handle(wh.WebhookPath(), wh)
|
|
logger.InfoCF("channels", "Webhook handler registered", map[string]any{
|
|
"channel": name,
|
|
"path": wh.WebhookPath(),
|
|
})
|
|
}
|
|
if hc, ok := ch.(HealthChecker); ok {
|
|
m.mux.HandleFunc(hc.HealthPath(), hc.HealthHandler)
|
|
logger.InfoCF("channels", "Health endpoint registered", map[string]any{
|
|
"channel": name,
|
|
"path": hc.HealthPath(),
|
|
})
|
|
}
|
|
}
|
|
|
|
// unregisterChannelHTTPHandler removes the webhook/health handlers for a
|
|
// single channel from m.mux.
|
|
func (m *Manager) unregisterChannelHTTPHandler(name string, ch Channel) {
|
|
if wh, ok := ch.(WebhookHandler); ok {
|
|
m.mux.Unhandle(wh.WebhookPath())
|
|
logger.InfoCF("channels", "Webhook handler unregistered", map[string]any{
|
|
"channel": name,
|
|
"path": wh.WebhookPath(),
|
|
})
|
|
}
|
|
if hc, ok := ch.(HealthChecker); ok {
|
|
m.mux.Unhandle(hc.HealthPath())
|
|
logger.InfoCF("channels", "Health endpoint unregistered", map[string]any{
|
|
"channel": name,
|
|
"path": hc.HealthPath(),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (m *Manager) StartAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if len(m.channels) == 0 {
|
|
logger.WarnC("channels", "No channels enabled")
|
|
}
|
|
|
|
logger.InfoC("channels", "Starting all channels")
|
|
|
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
|
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Start(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Failed to start channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
continue
|
|
}
|
|
// Lazily create worker only after channel starts successfully
|
|
w := newChannelWorker(name, channel)
|
|
m.workers[name] = w
|
|
go m.runWorker(dispatchCtx, name, w)
|
|
go m.runMediaWorker(dispatchCtx, name, w)
|
|
}
|
|
|
|
// Start the dispatcher that reads from the bus and routes to workers
|
|
go m.dispatchOutbound(dispatchCtx)
|
|
go m.dispatchOutboundMedia(dispatchCtx)
|
|
|
|
// Start the TTL janitor that cleans up stale typing/placeholder entries
|
|
go m.runTTLJanitor(dispatchCtx)
|
|
|
|
// Start shared HTTP server if configured
|
|
if m.httpServer != nil {
|
|
go func() {
|
|
logger.InfoCF("channels", "Shared HTTP server listening", map[string]any{
|
|
"addr": m.httpServer.Addr,
|
|
})
|
|
if err := m.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.FatalCF("channels", "Shared HTTP server error", map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}()
|
|
}
|
|
|
|
logger.InfoC("channels", "All channels started")
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StopAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
logger.InfoC("channels", "Stopping all channels")
|
|
|
|
// Shutdown shared HTTP server first
|
|
if m.httpServer != nil {
|
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
if err := m.httpServer.Shutdown(shutdownCtx); err != nil {
|
|
logger.ErrorCF("channels", "Shared HTTP server shutdown error", map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
m.httpServer = nil
|
|
}
|
|
|
|
// Cancel dispatcher
|
|
if m.dispatchTask != nil {
|
|
m.dispatchTask.cancel()
|
|
m.dispatchTask = nil
|
|
}
|
|
|
|
// Close all worker queues and wait for them to drain
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
close(w.queue)
|
|
}
|
|
}
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
<-w.done
|
|
}
|
|
}
|
|
// Close all media worker queues and wait for them to drain
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
close(w.mediaQueue)
|
|
}
|
|
}
|
|
for _, w := range m.workers {
|
|
if w != nil {
|
|
<-w.mediaDone
|
|
}
|
|
}
|
|
|
|
// Stop all channels
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Stop(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Error stopping channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
logger.InfoC("channels", "All channels stopped")
|
|
return nil
|
|
}
|
|
|
|
// newChannelWorker creates a channelWorker with a rate limiter configured
|
|
// for the given channel name.
|
|
func newChannelWorker(name string, ch Channel) *channelWorker {
|
|
rateVal := float64(defaultRateLimit)
|
|
if r, ok := channelRateConfig[name]; ok {
|
|
rateVal = r
|
|
}
|
|
burst := int(math.Max(1, math.Ceil(rateVal/2)))
|
|
|
|
return &channelWorker{
|
|
ch: ch,
|
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
|
mediaQueue: make(chan bus.OutboundMediaMessage, defaultChannelQueueSize),
|
|
done: make(chan struct{}),
|
|
mediaDone: make(chan struct{}),
|
|
limiter: rate.NewLimiter(rate.Limit(rateVal), burst),
|
|
}
|
|
}
|
|
|
|
// runWorker processes outbound messages for a single channel.
|
|
// Message processing follows this order:
|
|
// 1. SplitByMarker (if enabled in config) - LLM semantic marker-based splitting
|
|
// 2. SplitMessage - channel-specific length-based splitting (MaxMessageLength)
|
|
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
|
|
defer close(w.done)
|
|
for {
|
|
select {
|
|
case msg, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
maxLen := 0
|
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
|
maxLen = mlp.MaxMessageLength()
|
|
}
|
|
|
|
// Collect all message chunks to send
|
|
var chunks []string
|
|
|
|
// Step 1: Try marker-based splitting if enabled
|
|
if m.config != nil && m.config.Agents.Defaults.SplitOnMarker {
|
|
if markerChunks := SplitByMarker(msg.Content); len(markerChunks) > 1 {
|
|
for _, chunk := range markerChunks {
|
|
chunks = append(chunks, splitByLength(chunk, maxLen)...)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Fallback to length-based splitting if no chunks from marker
|
|
if len(chunks) == 0 {
|
|
chunks = splitByLength(msg.Content, maxLen)
|
|
}
|
|
|
|
// Step 3: Send all chunks
|
|
for _, chunk := range chunks {
|
|
chunkMsg := msg
|
|
chunkMsg.Content = chunk
|
|
m.sendWithRetry(ctx, name, w, chunkMsg)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// splitByLength splits content by maxLen if needed, otherwise returns single chunk.
|
|
func splitByLength(content string, maxLen int) []string {
|
|
if maxLen > 0 && len([]rune(content)) > maxLen {
|
|
return SplitMessage(content, maxLen)
|
|
}
|
|
return []string{content}
|
|
}
|
|
|
|
// sendWithRetry sends a message through the channel with rate limiting and
|
|
// retry logic. It classifies errors to determine the retry strategy:
|
|
// - ErrNotRunning / ErrSendFailed: permanent, no retry
|
|
// - ErrRateLimit: fixed delay retry
|
|
// - ErrTemporary / unknown: exponential backoff retry
|
|
func (m *Manager) sendWithRetry(
|
|
ctx context.Context,
|
|
name string,
|
|
w *channelWorker,
|
|
msg bus.OutboundMessage,
|
|
) ([]string, bool) {
|
|
// Rate limit: wait for token
|
|
if err := w.limiter.Wait(ctx); err != nil {
|
|
// ctx canceled, shutting down
|
|
return nil, false
|
|
}
|
|
|
|
// Pre-send: stop typing and try to edit placeholder
|
|
if msgIDs, handled := m.preSend(ctx, name, msg, w.ch); handled {
|
|
return msgIDs, true
|
|
}
|
|
|
|
var lastErr error
|
|
var msgIDs []string
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
msgIDs, lastErr = w.ch.Send(ctx, msg)
|
|
if lastErr == nil {
|
|
return msgIDs, true
|
|
}
|
|
|
|
// Permanent failures — don't retry
|
|
if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
|
|
break
|
|
}
|
|
|
|
// Last attempt exhausted — don't sleep
|
|
if attempt == maxRetries {
|
|
break
|
|
}
|
|
|
|
// Rate limit error — fixed delay
|
|
if errors.Is(lastErr, ErrRateLimit) {
|
|
select {
|
|
case <-time.After(rateLimitDelay):
|
|
continue
|
|
case <-ctx.Done():
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// ErrTemporary or unknown error — exponential backoff
|
|
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// All retries exhausted or permanent failure
|
|
logger.ErrorCF("channels", "Send failed", map[string]any{
|
|
"channel": name,
|
|
"chat_id": msg.ChatID,
|
|
"error": lastErr.Error(),
|
|
"retries": maxRetries,
|
|
})
|
|
|
|
return nil, false
|
|
}
|
|
|
|
func dispatchLoop[M any](
|
|
ctx context.Context,
|
|
m *Manager,
|
|
ch <-chan M,
|
|
getChannel func(M) string,
|
|
enqueue func(context.Context, *channelWorker, M) bool,
|
|
startMsg, stopMsg, unknownMsg, noWorkerMsg string,
|
|
) {
|
|
logger.InfoC("channels", startMsg)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.InfoC("channels", stopMsg)
|
|
return
|
|
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
logger.InfoC("channels", stopMsg)
|
|
return
|
|
}
|
|
|
|
channel := getChannel(msg)
|
|
|
|
// Silently skip internal channels
|
|
if constants.IsInternalChannel(channel) {
|
|
continue
|
|
}
|
|
|
|
m.mu.RLock()
|
|
_, exists := m.channels[channel]
|
|
w, wExists := m.workers[channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
logger.WarnCF("channels", unknownMsg, map[string]any{"channel": channel})
|
|
continue
|
|
}
|
|
|
|
if wExists && w != nil {
|
|
if !enqueue(ctx, w, msg) {
|
|
return
|
|
}
|
|
} else if exists {
|
|
logger.WarnCF("channels", noWorkerMsg, map[string]any{"channel": channel})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
|
dispatchLoop(
|
|
ctx, m,
|
|
m.bus.OutboundChan(),
|
|
func(msg bus.OutboundMessage) string { return msg.Channel },
|
|
func(ctx context.Context, w *channelWorker, msg bus.OutboundMessage) bool {
|
|
select {
|
|
case w.queue <- msg:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
},
|
|
"Outbound dispatcher started",
|
|
"Outbound dispatcher stopped",
|
|
"Unknown channel for outbound message",
|
|
"Channel has no active worker, skipping message",
|
|
)
|
|
}
|
|
|
|
func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
|
|
dispatchLoop(
|
|
ctx, m,
|
|
m.bus.OutboundMediaChan(),
|
|
func(msg bus.OutboundMediaMessage) string { return msg.Channel },
|
|
func(ctx context.Context, w *channelWorker, msg bus.OutboundMediaMessage) bool {
|
|
select {
|
|
case w.mediaQueue <- msg:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
},
|
|
"Outbound media dispatcher started",
|
|
"Outbound media dispatcher stopped",
|
|
"Unknown channel for outbound media message",
|
|
"Channel has no active worker, skipping media message",
|
|
)
|
|
}
|
|
|
|
// runMediaWorker processes outbound media messages for a single channel.
|
|
func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
|
|
defer close(w.mediaDone)
|
|
for {
|
|
select {
|
|
case msg, ok := <-w.mediaQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
_, _ = m.sendMediaWithRetry(ctx, name, w, msg)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendMediaWithRetry sends a media message through the channel with rate limiting and
|
|
// retry logic. It returns the message IDs and nil on success, or nil and the last error
|
|
// after retries, including when the channel does not support MediaSender.
|
|
func (m *Manager) sendMediaWithRetry(
|
|
ctx context.Context,
|
|
name string,
|
|
w *channelWorker,
|
|
msg bus.OutboundMediaMessage,
|
|
) ([]string, error) {
|
|
ms, ok := w.ch.(MediaSender)
|
|
if !ok {
|
|
err := fmt.Errorf("channel %q does not support media sending", name)
|
|
logger.WarnCF("channels", "Channel does not support MediaSender", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
return nil, err
|
|
}
|
|
|
|
// Rate limit: wait for token
|
|
if err := w.limiter.Wait(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Pre-send: stop typing and clean up any placeholder before sending media.
|
|
m.preSendMedia(ctx, name, msg, w.ch)
|
|
|
|
var lastErr error
|
|
var msgIDs []string
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
msgIDs, lastErr = ms.SendMedia(ctx, msg)
|
|
if lastErr == nil {
|
|
return msgIDs, nil
|
|
}
|
|
|
|
// Permanent failures — don't retry
|
|
if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
|
|
break
|
|
}
|
|
|
|
// Last attempt exhausted — don't sleep
|
|
if attempt == maxRetries {
|
|
break
|
|
}
|
|
|
|
// Rate limit error — fixed delay
|
|
if errors.Is(lastErr, ErrRateLimit) {
|
|
select {
|
|
case <-time.After(rateLimitDelay):
|
|
continue
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// ErrTemporary or unknown error — exponential backoff
|
|
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// All retries exhausted or permanent failure
|
|
logger.ErrorCF("channels", "SendMedia failed", map[string]any{
|
|
"channel": name,
|
|
"chat_id": msg.ChatID,
|
|
"error": lastErr.Error(),
|
|
"retries": maxRetries,
|
|
})
|
|
return nil, lastErr
|
|
}
|
|
|
|
// runTTLJanitor periodically scans the typingStops and placeholders maps
|
|
// and evicts entries that have exceeded their TTL. This prevents memory
|
|
// accumulation when outbound paths fail to trigger preSend (e.g. LLM errors).
|
|
func (m *Manager) runTTLJanitor(ctx context.Context) {
|
|
ticker := time.NewTicker(janitorInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case now := <-ticker.C:
|
|
m.typingStops.Range(func(key, value any) bool {
|
|
if entry, ok := value.(typingEntry); ok {
|
|
if now.Sub(entry.createdAt) > typingStopTTL {
|
|
if _, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
|
entry.stop() // idempotent, safe
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
m.reactionUndos.Range(func(key, value any) bool {
|
|
if entry, ok := value.(reactionEntry); ok {
|
|
if now.Sub(entry.createdAt) > typingStopTTL {
|
|
if _, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
|
|
entry.undo() // idempotent, safe
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
m.placeholders.Range(func(key, value any) bool {
|
|
if entry, ok := value.(placeholderEntry); ok {
|
|
if now.Sub(entry.createdAt) > placeholderTTL {
|
|
m.placeholders.Delete(key)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) GetChannel(name string) (Channel, bool) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
channel, ok := m.channels[name]
|
|
return channel, ok
|
|
}
|
|
|
|
func (m *Manager) GetStatus() map[string]any {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
status := make(map[string]any)
|
|
for name, channel := range m.channels {
|
|
status[name] = map[string]any{
|
|
"enabled": true,
|
|
"running": channel.IsRunning(),
|
|
}
|
|
}
|
|
return status
|
|
}
|
|
|
|
func (m *Manager) GetEnabledChannels() []string {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(m.channels))
|
|
for name := range m.channels {
|
|
names = append(names, name)
|
|
}
|
|
return names
|
|
}
|
|
|
|
// Reload updates the config reference without restarting channels.
|
|
// This is used when channel config hasn't changed but other parts of the config have.
|
|
func (m *Manager) Reload(ctx context.Context, cfg *config.Config) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Save old config so we can revert on error.
|
|
oldConfig := m.config
|
|
|
|
// Update config early: initChannel uses m.config via factory(m.config, m.bus).
|
|
m.config = cfg
|
|
|
|
list := toChannelHashes(cfg)
|
|
added, removed := compareChannels(m.channelHashes, list)
|
|
|
|
deferFuncs := make([]func(), 0, len(removed)+len(added))
|
|
for _, name := range removed {
|
|
// Stop all channels
|
|
channel := m.channels[name]
|
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Stop(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Error stopping channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
deferFuncs = append(deferFuncs, func() {
|
|
m.UnregisterChannel(name)
|
|
})
|
|
}
|
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
|
cc, err := toChannelConfig(cfg, added)
|
|
if err != nil {
|
|
logger.ErrorC("channels", fmt.Sprintf("toChannelConfig error: %v", err))
|
|
m.config = oldConfig
|
|
cancel()
|
|
return err
|
|
}
|
|
err = m.initChannels(cc)
|
|
if err != nil {
|
|
logger.ErrorC("channels", fmt.Sprintf("initChannels error: %v", err))
|
|
m.config = oldConfig
|
|
cancel()
|
|
return err
|
|
}
|
|
for _, name := range added {
|
|
channel := m.channels[name]
|
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Start(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Failed to start channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
continue
|
|
}
|
|
// Lazily create worker only after channel starts successfully
|
|
w := newChannelWorker(name, channel)
|
|
m.workers[name] = w
|
|
go m.runWorker(dispatchCtx, name, w)
|
|
go m.runMediaWorker(dispatchCtx, name, w)
|
|
deferFuncs = append(deferFuncs, func() {
|
|
m.RegisterChannel(name, channel)
|
|
})
|
|
}
|
|
|
|
// Commit hashes only on full success.
|
|
m.channelHashes = list
|
|
go func() {
|
|
for _, f := range deferFuncs {
|
|
f()
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) RegisterChannel(name string, channel Channel) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.channels[name] = channel
|
|
if m.mux != nil {
|
|
m.registerChannelHTTPHandler(name, channel)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) UnregisterChannel(name string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if ch, ok := m.channels[name]; ok && m.mux != nil {
|
|
m.unregisterChannelHTTPHandler(name, ch)
|
|
}
|
|
if w, ok := m.workers[name]; ok && w != nil {
|
|
close(w.queue)
|
|
<-w.done
|
|
close(w.mediaQueue)
|
|
<-w.mediaDone
|
|
}
|
|
delete(m.workers, name)
|
|
delete(m.channels, name)
|
|
}
|
|
|
|
// SendMessage sends an outbound message synchronously through the channel
|
|
// worker's rate limiter and retry logic. It blocks until the message is
|
|
// delivered (or all retries are exhausted), which preserves ordering when
|
|
// a subsequent operation depends on the message having been sent.
|
|
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[msg.Channel]
|
|
w, wExists := m.workers[msg.Channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", msg.Channel)
|
|
}
|
|
if !wExists || w == nil {
|
|
return fmt.Errorf("channel %s has no active worker", msg.Channel)
|
|
}
|
|
|
|
maxLen := 0
|
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
|
maxLen = mlp.MaxMessageLength()
|
|
}
|
|
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
|
|
for _, chunk := range SplitMessage(msg.Content, maxLen) {
|
|
chunkMsg := msg
|
|
chunkMsg.Content = chunk
|
|
m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
|
|
}
|
|
} else {
|
|
m.sendWithRetry(ctx, msg.Channel, w, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SendMedia sends outbound media synchronously through the channel worker's
|
|
// rate limiter and retry logic. It blocks until the media is delivered (or all
|
|
// retries are exhausted), which preserves ordering when later agent behavior
|
|
// depends on actual media delivery.
|
|
func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[msg.Channel]
|
|
w, wExists := m.workers[msg.Channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", msg.Channel)
|
|
}
|
|
if !wExists || w == nil {
|
|
return fmt.Errorf("channel %s has no active worker", msg.Channel)
|
|
}
|
|
|
|
_, err := m.sendMediaWithRetry(ctx, msg.Channel, w, msg)
|
|
return err
|
|
}
|
|
|
|
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[channelName]
|
|
w, wExists := m.workers[channelName]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", channelName)
|
|
}
|
|
|
|
msg := bus.OutboundMessage{
|
|
Channel: channelName,
|
|
ChatID: chatID,
|
|
Content: content,
|
|
}
|
|
|
|
if wExists && w != nil {
|
|
select {
|
|
case w.queue <- msg:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Fallback: direct send (should not happen)
|
|
channel, _ := m.channels[channelName]
|
|
_, err := channel.Send(ctx, msg)
|
|
return err
|
|
}
|