picoclaw/pkg/channels/manager.go
Amir Mamaghani 71134babb9 feat(telegram): stream LLM responses via sendMessageDraft (#1101)
* feat(telegram): stream LLM responses in real-time via sendMessageDraft

Implements real-time token streaming to Telegram using the sendMessageDraft
API (telego v1.6.0). Instead of showing only a "Thinking..." placeholder
until the full response arrives, users now see partial LLM output appear
in the chat as it's generated.

The streaming pipeline threads through all layers:

- StreamingProvider interface (providers/types.go): opt-in ChatStream()
  method that receives an onChunk callback with accumulated text
- OpenAI-compatible SSE streaming (openai_compat/provider.go): parses
  SSE events with stream:true, handles text deltas and tool call assembly
- Anthropic native streaming (anthropic/provider.go): uses SDK's
  NewStreaming() for direct Anthropic API connections
- HTTPProvider delegation (http_provider.go): delegates ChatStream to
  the underlying openai_compat provider
- StreamingCapable + Streamer interfaces (channels/interfaces.go):
  opt-in channel capability like TypingCapable/PlaceholderCapable
- Telegram streamer (telegram/telegram.go): BeginStream returns a
  telegramStreamer that throttles sendMessageDraft calls (3s/200 chars)
  with graceful degradation on API errors
- StreamDelegate bridge (bus/bus.go): decouples agent loop from channel
  manager without tight imports
- Manager integration (manager.go): implements StreamDelegate, tracks
  streamActive state, coordinates with placeholder editing
- Agent loop (loop.go): uses ChatStream when both provider and channel
  support streaming, cancels stream on tool calls, skips PublishOutbound
  when Finalize already delivered the message

Graceful degradation:
- Bots without forum/topics mode: first sendMessageDraft error sets
  failed=true, subsequent Updates become no-ops, Finalize still delivers
  via SendMessage. User sees normal non-streaming behavior.
- Non-streaming providers: type assertion fails, falls back to Chat()
- Config opt-out: streaming.enabled (default true) in telegram config

Closes #1098

* fix(telegram): delete placeholder message when streaming delivers response

When streaming was active, the "Thinking..." placeholder message stayed
in the chat because preSend only deleted the tracking entry without
removing the actual Telegram message. Now preSend deletes the placeholder
via the new MessageDeleter interface when streamActive is set.

* refactor(streaming): remove dead code and simplify streaming wiring

- Delete unused Anthropic ChatStream/parseStream (-131 lines) — factory
  creates HTTPProvider for all OpenAI-compat providers including OpenRouter
- Simplify runLLMIteration from 4 to 3 return values (remove unused
  streamed bool)
- Replace managerStreamer struct with finalizeHookStreamer using embedding
  (Update/Cancel promoted, only Finalize overridden)

* fix(streaming): skip streamer acquisition when SendResponse is false

Heartbeat messages set SendResponse=false but the streaming path
was unconditionally acquiring a streamer, causing HEARTBEAT_OK to
leak to Telegram via streamer.Finalize().

* fix(streaming): guard streamer for non-sendable messages, add streaming config

Skip streamer acquisition for heartbeat (NoHistory=true), preventing
HEARTBEAT_OK from leaking to Telegram via streamer.Finalize().

Add streaming.enabled to Telegram defaults and example config.
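Both heartbeat fixes reduce to one guard evaluated before a streamer is acquired. In this hedged sketch, `inboundMeta` and `shouldStream` are hypothetical names, while `SendResponse` and `NoHistory` are the flags named in the two commits above.

```go
package main

import "fmt"

// inboundMeta is a hypothetical subset of the agent's per-message options.
type inboundMeta struct {
	SendResponse bool // false for internal runs whose reply must not be sent
	NoHistory    bool // true for heartbeat runs
}

// shouldStream reports whether the loop may acquire a streamer. Finalize
// delivers text to the user, so runs whose reply must stay internal
// (e.g. a heartbeat answering HEARTBEAT_OK) never get one.
func shouldStream(m inboundMeta, providerStreams, channelStreams bool) bool {
	if !m.SendResponse || m.NoHistory {
		return false
	}
	return providerStreams && channelStreams
}

func main() {
	heartbeat := inboundMeta{SendResponse: false, NoHistory: true}
	user := inboundMeta{SendResponse: true}
	fmt.Println(shouldStream(heartbeat, true, true)) // false
	fmt.Println(shouldStream(user, true, true))      // true
}
```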

* fix(picoclaw): add missing closing brace for StreamingProvider interface

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: resolve golangci-lint formatting issues

Fix gci import ordering in telegram and anthropic provider, and break
long function signature in openai_compat provider to satisfy golines.

* fix: address code review feedback on streaming PR

- Deduplicate Streamer interface: alias channels.Streamer to bus.Streamer
  to prevent type drift across packages
- Increase SSE scanner buffer to 10MB max to handle large single-line
  responses that exceed bufio.Scanner's 64KB default
- Switch draftID generation from math/rand to crypto/rand for
  collision-resistant random IDs
- Add context cancellation check in SSE parsing loop so cancelled
  streams stop processing immediately
- Log Finalize failures with chat_id and content length for debugging
  silent message delivery failures

* feat: make streaming throttle interval and min growth configurable

Move hardcoded streamThrottleInterval (3s) and streamMinGrowth (200)
into StreamingConfig so they can be tuned per deployment via config
or environment variables.

* fix(telegram): use parseTelegramChatID in DeleteMessage and BeginStream

These two functions called undefined parseChatID. Use
parseTelegramChatID with _ for the unused threadID instead of adding
a wrapper function. Fixes all three CI checks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(streaming): set streamActive only after successful Finalize

Move onFinalize hook to run after Streamer.Finalize succeeds, so that
if Finalize fails the streamActive flag stays false and the regular
placeholder fallback path remains available.

Addresses review feedback from @alexhoshina.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 21:04:14 +08:00

1061 lines
28 KiB
Go

// PicoClaw - Ultra-lightweight personal AI agent
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors

package channels

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net/http"
	"sync"
	"time"

	"golang.org/x/time/rate"

	"github.com/sipeed/picoclaw/pkg/bus"
	"github.com/sipeed/picoclaw/pkg/config"
	"github.com/sipeed/picoclaw/pkg/constants"
	"github.com/sipeed/picoclaw/pkg/health"
	"github.com/sipeed/picoclaw/pkg/logger"
	"github.com/sipeed/picoclaw/pkg/media"
)

const (
	defaultChannelQueueSize = 16
	defaultRateLimit        = 10 // default 10 msg/s
	maxRetries              = 3
	rateLimitDelay          = 1 * time.Second
	baseBackoff             = 500 * time.Millisecond
	maxBackoff              = 8 * time.Second
	janitorInterval         = 10 * time.Second
	typingStopTTL           = 5 * time.Minute
	placeholderTTL          = 10 * time.Minute
)

// typingEntry wraps a typing stop function with a creation timestamp for TTL eviction.
type typingEntry struct {
	stop      func()
	createdAt time.Time
}

// reactionEntry wraps a reaction undo function with a creation timestamp for TTL eviction.
type reactionEntry struct {
	undo      func()
	createdAt time.Time
}

// placeholderEntry wraps a placeholder ID with a creation timestamp for TTL eviction.
type placeholderEntry struct {
	id        string
	createdAt time.Time
}

// channelRateConfig maps channel name to per-second rate limit.
var channelRateConfig = map[string]float64{
	"telegram": 20,
	"discord":  1,
	"slack":    1,
	"matrix":   2,
	"line":     10,
	"qq":       5,
	"irc":      2,
}

// channelWorker owns the outbound text and media queues of one started
// channel, plus the rate limiter shared by both.
type channelWorker struct {
	ch         Channel
	queue      chan bus.OutboundMessage
	mediaQueue chan bus.OutboundMediaMessage
	done       chan struct{}
	mediaDone  chan struct{}
	limiter    *rate.Limiter
}

// Manager owns all enabled channels, routes outbound bus messages to
// per-channel workers, and tracks typing, reaction, placeholder and
// streaming state keyed by "channel:chatID".
type Manager struct {
	channels      map[string]Channel
	workers       map[string]*channelWorker
	bus           *bus.MessageBus
	config        *config.Config
	mediaStore    media.MediaStore
	dispatchTask  *asyncTask
	mux           *http.ServeMux
	httpServer    *http.Server
	mu            sync.RWMutex
	placeholders  sync.Map          // "channel:chatID" → placeholderEntry
	typingStops   sync.Map          // "channel:chatID" → typingEntry
	reactionUndos sync.Map          // "channel:chatID" → reactionEntry
	streamActive  sync.Map          // "channel:chatID" → true (set after streamer.Finalize delivered the message)
	channelHashes map[string]string // channel name → config hash
}

type asyncTask struct {
	cancel context.CancelFunc
}

// RecordPlaceholder registers a placeholder message for later editing.
// Implements PlaceholderRecorder.
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) {
	key := channel + ":" + chatID
	m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()})
}

// SendPlaceholder sends a "Thinking…" placeholder for the given channel/chatID
// and records it for later editing. Returns true if a placeholder was sent.
func (m *Manager) SendPlaceholder(ctx context.Context, channel, chatID string) bool {
	m.mu.RLock()
	ch, ok := m.channels[channel]
	m.mu.RUnlock()
	if !ok {
		return false
	}

	pc, ok := ch.(PlaceholderCapable)
	if !ok {
		return false
	}

	phID, err := pc.SendPlaceholder(ctx, chatID)
	if err != nil || phID == "" {
		return false
	}

	m.RecordPlaceholder(channel, chatID, phID)
	return true
}

// RecordTypingStop registers a typing stop function for later invocation.
// Implements PlaceholderRecorder.
func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
	key := channel + ":" + chatID
	entry := typingEntry{stop: stop, createdAt: time.Now()}
	if previous, loaded := m.typingStops.Swap(key, entry); loaded {
		if oldEntry, ok := previous.(typingEntry); ok && oldEntry.stop != nil {
			oldEntry.stop()
		}
	}
}

// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID.
// It is safe to call even when no typing indicator is active (no-op).
// Used by the agent loop to stop typing when processing completes (success, error, or panic),
// regardless of whether an outbound message is published.
func (m *Manager) InvokeTypingStop(channel, chatID string) {
	key := channel + ":" + chatID
	if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
		if entry, ok := v.(typingEntry); ok {
			entry.stop()
		}
	}
}

// RecordReactionUndo registers a reaction undo function for later invocation.
// Implements PlaceholderRecorder.
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
	key := channel + ":" + chatID
	m.reactionUndos.Store(key, reactionEntry{undo: undo, createdAt: time.Now()})
}

// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
// Returns true if the message was already delivered (skip Send).
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
	key := name + ":" + msg.ChatID

	// 1. Stop typing
	if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
		if entry, ok := v.(typingEntry); ok {
			entry.stop() // idempotent, safe
		}
	}

	// 2. Undo reaction
	if v, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
		if entry, ok := v.(reactionEntry); ok {
			entry.undo() // idempotent, safe
		}
	}

	// 3. If a stream already finalized this message, delete the placeholder and skip send
	if _, loaded := m.streamActive.LoadAndDelete(key); loaded {
		if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
			if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
				// Prefer deleting the placeholder (cleaner UX than editing to same content)
				if deleter, ok := ch.(MessageDeleter); ok {
					deleter.DeleteMessage(ctx, msg.ChatID, entry.id) // best effort
				} else if editor, ok := ch.(MessageEditor); ok {
					editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content) // fallback
				}
			}
		}
		return true
	}

	// 4. Try editing placeholder
	if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
		if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
			if editor, ok := ch.(MessageEditor); ok {
				if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil {
					return true // edited successfully, skip Send
				}
				// edit failed → fall through to normal Send
			}
		}
	}

	return false
}

// NewManager creates a Manager, registers it as the bus's stream delegate and
// initializes every channel enabled in cfg. Channels are not started until StartAll.
func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
	m := &Manager{
		channels:      make(map[string]Channel),
		workers:       make(map[string]*channelWorker),
		bus:           messageBus,
		config:        cfg,
		mediaStore:    store,
		channelHashes: make(map[string]string),
	}

	// Register as streaming delegate so the agent loop can obtain streamers
	messageBus.SetStreamDelegate(m)

	if err := m.initChannels(&cfg.Channels); err != nil {
		return nil, err
	}

	// Store initial config hashes for all channels
	m.channelHashes = toChannelHashes(cfg)

	return m, nil
}

// GetStreamer implements bus.StreamDelegate.
// It checks if the named channel supports streaming and returns a Streamer.
// It returns false when the channel is unknown, is not StreamingCapable, or
// BeginStream fails; the caller then falls back to the placeholder path.
func (m *Manager) GetStreamer(ctx context.Context, channelName, chatID string) (bus.Streamer, bool) {
	m.mu.RLock()
	ch, exists := m.channels[channelName]
	m.mu.RUnlock()
	if !exists {
		return nil, false
	}

	sc, ok := ch.(StreamingCapable)
	if !ok {
		return nil, false
	}

	streamer, err := sc.BeginStream(ctx, chatID)
	if err != nil {
		logger.DebugCF("channels", "Streaming unavailable, falling back to placeholder", map[string]any{
			"channel": channelName,
			"error":   err.Error(),
		})
		return nil, false
	}

	// Mark streamActive only after a successful Finalize so preSend knows to
	// clean up the placeholder instead of sending the message again
	key := channelName + ":" + chatID
	return &finalizeHookStreamer{
		Streamer:   streamer,
		onFinalize: func() { m.streamActive.Store(key, true) },
	}, true
}

// finalizeHookStreamer wraps a Streamer to run a hook after a successful Finalize.
// Update and Cancel are promoted unchanged from the embedded Streamer.
type finalizeHookStreamer struct {
	Streamer
	onFinalize func()
}

func (s *finalizeHookStreamer) Finalize(ctx context.Context, content string) error {
	if err := s.Streamer.Finalize(ctx, content); err != nil {
		return err
	}
	s.onFinalize()
	return nil
}

// initChannel is a helper that looks up a factory by name and creates the channel.
func (m *Manager) initChannel(name, displayName string) {
	f, ok := getFactory(name)
	if !ok {
		logger.WarnCF("channels", "Factory not registered", map[string]any{
			"channel": displayName,
		})
		return
	}
	logger.DebugCF("channels", "Attempting to initialize channel", map[string]any{
		"channel": displayName,
	})
	ch, err := f(m.config, m.bus)
	if err != nil {
		logger.ErrorCF("channels", "Failed to initialize channel", map[string]any{
			"channel": displayName,
			"error":   err.Error(),
		})
	} else {
		// Inject MediaStore if channel supports it
		if m.mediaStore != nil {
			if setter, ok := ch.(interface{ SetMediaStore(s media.MediaStore) }); ok {
				setter.SetMediaStore(m.mediaStore)
			}
		}
		// Inject PlaceholderRecorder if channel supports it
		if setter, ok := ch.(interface{ SetPlaceholderRecorder(r PlaceholderRecorder) }); ok {
			setter.SetPlaceholderRecorder(m)
		}
		// Inject owner reference so BaseChannel.HandleMessage can auto-trigger typing/reaction
		if setter, ok := ch.(interface{ SetOwner(ch Channel) }); ok {
			setter.SetOwner(ch)
		}
		m.channels[name] = ch
		logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
			"channel": displayName,
		})
	}
}

// initChannels creates every channel enabled in the given config. It reads
// only from channels (not m.config), so a caller can pass a subset such as
// the channels added during Reload.
func (m *Manager) initChannels(channels *config.ChannelsConfig) error {
	logger.InfoC("channels", "Initializing channel manager")

	if channels.Telegram.Enabled && channels.Telegram.Token != "" {
		m.initChannel("telegram", "Telegram")
	}

	if channels.WhatsApp.Enabled {
		waCfg := channels.WhatsApp
		if waCfg.UseNative {
			m.initChannel("whatsapp_native", "WhatsApp Native")
		} else if waCfg.BridgeURL != "" {
			m.initChannel("whatsapp", "WhatsApp")
		}
	}

	if channels.Feishu.Enabled {
		m.initChannel("feishu", "Feishu")
	}

	if channels.Discord.Enabled && channels.Discord.Token != "" {
		m.initChannel("discord", "Discord")
	}

	if channels.MaixCam.Enabled {
		m.initChannel("maixcam", "MaixCam")
	}

	if channels.QQ.Enabled {
		m.initChannel("qq", "QQ")
	}

	if channels.DingTalk.Enabled && channels.DingTalk.ClientID != "" {
		m.initChannel("dingtalk", "DingTalk")
	}

	if channels.Slack.Enabled && channels.Slack.BotToken != "" {
		m.initChannel("slack", "Slack")
	}

	if channels.Matrix.Enabled &&
		channels.Matrix.Homeserver != "" &&
		channels.Matrix.UserID != "" &&
		channels.Matrix.AccessToken != "" {
		m.initChannel("matrix", "Matrix")
	}

	if channels.LINE.Enabled && channels.LINE.ChannelAccessToken != "" {
		m.initChannel("line", "LINE")
	}

	if channels.OneBot.Enabled && channels.OneBot.WSUrl != "" {
		m.initChannel("onebot", "OneBot")
	}

	if channels.WeCom.Enabled && channels.WeCom.Token != "" {
		m.initChannel("wecom", "WeCom")
	}

	if channels.WeComAIBot.Enabled &&
		((channels.WeComAIBot.BotID != "" && channels.WeComAIBot.Secret != "") ||
			channels.WeComAIBot.Token != "") {
		m.initChannel("wecom_aibot", "WeCom AI Bot")
	}

	if channels.WeComApp.Enabled && channels.WeComApp.CorpID != "" {
		m.initChannel("wecom_app", "WeCom App")
	}

	if channels.Pico.Enabled && channels.Pico.Token != "" {
		m.initChannel("pico", "Pico")
	}

	if channels.PicoClient.Enabled && channels.PicoClient.URL != "" {
		m.initChannel("pico_client", "Pico Client")
	}

	if channels.IRC.Enabled && channels.IRC.Server != "" {
		m.initChannel("irc", "IRC")
	}

	logger.InfoCF("channels", "Channel initialization completed", map[string]any{
		"enabled_channels": len(m.channels),
	})

	return nil
}

// SetupHTTPServer creates a shared HTTP server with the given listen address.
// It registers health endpoints from the health server and discovers channels
// that implement WebhookHandler and/or HealthChecker to register their handlers.
func (m *Manager) SetupHTTPServer(addr string, healthServer *health.Server) {
	m.mux = http.NewServeMux()

	// Register health endpoints
	if healthServer != nil {
		healthServer.RegisterOnMux(m.mux)
	}

	// Discover and register webhook handlers and health checkers
	for name, ch := range m.channels {
		if wh, ok := ch.(WebhookHandler); ok {
			m.mux.Handle(wh.WebhookPath(), wh)
			logger.InfoCF("channels", "Webhook handler registered", map[string]any{
				"channel": name,
				"path":    wh.WebhookPath(),
			})
		}

		if hc, ok := ch.(HealthChecker); ok {
			m.mux.HandleFunc(hc.HealthPath(), hc.HealthHandler)
			logger.InfoCF("channels", "Health endpoint registered", map[string]any{
				"channel": name,
				"path":    hc.HealthPath(),
			})
		}
	}

	m.httpServer = &http.Server{
		Addr:         addr,
		Handler:      m.mux,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
	}
}

// StartAll starts every initialized channel, creates a worker for each channel
// that started successfully, and launches the outbound dispatchers, the TTL
// janitor and, if configured, the shared HTTP server. Channels that fail to
// start are logged and skipped.
func (m *Manager) StartAll(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if len(m.channels) == 0 {
		logger.WarnC("channels", "No channels enabled")
	}

	logger.InfoC("channels", "Starting all channels")

	dispatchCtx, cancel := context.WithCancel(ctx)
	m.dispatchTask = &asyncTask{cancel: cancel}

	for name, channel := range m.channels {
		logger.InfoCF("channels", "Starting channel", map[string]any{
			"channel": name,
		})
		if err := channel.Start(ctx); err != nil {
			logger.ErrorCF("channels", "Failed to start channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
			continue
		}
		// Lazily create worker only after channel starts successfully
		w := newChannelWorker(name, channel)
		m.workers[name] = w
		go m.runWorker(dispatchCtx, name, w)
		go m.runMediaWorker(dispatchCtx, name, w)
	}

	// Start the dispatcher that reads from the bus and routes to workers
	go m.dispatchOutbound(dispatchCtx)
	go m.dispatchOutboundMedia(dispatchCtx)

	// Start the TTL janitor that cleans up stale typing/placeholder entries
	go m.runTTLJanitor(dispatchCtx)

	// Start shared HTTP server if configured
	if m.httpServer != nil {
		go func() {
			logger.InfoCF("channels", "Shared HTTP server listening", map[string]any{
				"addr": m.httpServer.Addr,
			})
			if err := m.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
				logger.FatalCF("channels", "Shared HTTP server error", map[string]any{
					"error": err.Error(),
				})
			}
		}()
	}

	logger.InfoC("channels", "All channels started")
	return nil
}

// StopAll shuts down the shared HTTP server, cancels the dispatchers, drains
// the text and media worker queues, and then stops every channel.
func (m *Manager) StopAll(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.InfoC("channels", "Stopping all channels")

	// Shutdown shared HTTP server first
	if m.httpServer != nil {
		shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		if err := m.httpServer.Shutdown(shutdownCtx); err != nil {
			logger.ErrorCF("channels", "Shared HTTP server shutdown error", map[string]any{
				"error": err.Error(),
			})
		}
		m.httpServer = nil
	}

	// Cancel dispatcher
	if m.dispatchTask != nil {
		m.dispatchTask.cancel()
		m.dispatchTask = nil
	}

	// Close all worker queues and wait for them to drain
	for _, w := range m.workers {
		if w != nil {
			close(w.queue)
		}
	}
	for _, w := range m.workers {
		if w != nil {
			<-w.done
		}
	}

	// Close all media worker queues and wait for them to drain
	for _, w := range m.workers {
		if w != nil {
			close(w.mediaQueue)
		}
	}
	for _, w := range m.workers {
		if w != nil {
			<-w.mediaDone
		}
	}

	// Stop all channels
	for name, channel := range m.channels {
		logger.InfoCF("channels", "Stopping channel", map[string]any{
			"channel": name,
		})
		if err := channel.Stop(ctx); err != nil {
			logger.ErrorCF("channels", "Error stopping channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
		}
	}

	logger.InfoC("channels", "All channels stopped")
	return nil
}

// newChannelWorker creates a channelWorker with a rate limiter configured
// for the given channel name.
func newChannelWorker(name string, ch Channel) *channelWorker {
	rateVal := float64(defaultRateLimit)
	if r, ok := channelRateConfig[name]; ok {
		rateVal = r
	}
	// Allow bursts of half the per-second rate (rounded up), never less than 1
	burst := int(math.Max(1, math.Ceil(rateVal/2)))

	return &channelWorker{
		ch:         ch,
		queue:      make(chan bus.OutboundMessage, defaultChannelQueueSize),
		mediaQueue: make(chan bus.OutboundMediaMessage, defaultChannelQueueSize),
		done:       make(chan struct{}),
		mediaDone:  make(chan struct{}),
		limiter:    rate.NewLimiter(rate.Limit(rateVal), burst),
	}
}

// runWorker processes outbound messages for a single channel, splitting
// messages that exceed the channel's maximum message length.
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
	defer close(w.done)
	for {
		select {
		case msg, ok := <-w.queue:
			if !ok {
				return
			}
			maxLen := 0
			if mlp, ok := w.ch.(MessageLengthProvider); ok {
				maxLen = mlp.MaxMessageLength()
			}
			if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
				chunks := SplitMessage(msg.Content, maxLen)
				for _, chunk := range chunks {
					chunkMsg := msg
					chunkMsg.Content = chunk
					m.sendWithRetry(ctx, name, w, chunkMsg)
				}
			} else {
				m.sendWithRetry(ctx, name, w, msg)
			}
		case <-ctx.Done():
			return
		}
	}
}

// sendWithRetry sends a message through the channel with rate limiting and
// retry logic. It classifies errors to determine the retry strategy:
//   - ErrNotRunning / ErrSendFailed: permanent, no retry
//   - ErrRateLimit: fixed delay retry
//   - ErrTemporary / unknown: exponential backoff retry
func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) {
	// Rate limit: wait for token
	if err := w.limiter.Wait(ctx); err != nil {
		// ctx canceled, shutting down
		return
	}

	// Pre-send: stop typing, undo the reaction, and edit (or, after streaming, clean up) the placeholder
	if m.preSend(ctx, name, msg, w.ch) {
		return // already delivered (placeholder edited or stream finalized), skip Send
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		lastErr = w.ch.Send(ctx, msg)
		if lastErr == nil {
			return
		}

		// Permanent failures — don't retry
		if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
			break
		}

		// Last attempt exhausted — don't sleep
		if attempt == maxRetries {
			break
		}

		// Rate limit error — fixed delay
		if errors.Is(lastErr, ErrRateLimit) {
			select {
			case <-time.After(rateLimitDelay):
				continue
			case <-ctx.Done():
				return
			}
		}

		// ErrTemporary or unknown error — exponential backoff
		backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return
		}
	}

	// All retries exhausted or permanent failure
	logger.ErrorCF("channels", "Send failed", map[string]any{
		"channel": name,
		"chat_id": msg.ChatID,
		"error":   lastErr.Error(),
		"retries": maxRetries,
	})
}

// dispatchLoop reads messages from ch and enqueues each one onto the worker of
// its target channel. Internal channels are skipped silently. It returns when
// ctx is cancelled, ch is closed, or enqueue reports cancellation.
func dispatchLoop[M any](
	ctx context.Context,
	m *Manager,
	ch <-chan M,
	getChannel func(M) string,
	enqueue func(context.Context, *channelWorker, M) bool,
	startMsg, stopMsg, unknownMsg, noWorkerMsg string,
) {
	logger.InfoC("channels", startMsg)

	for {
		select {
		case <-ctx.Done():
			logger.InfoC("channels", stopMsg)
			return

		case msg, ok := <-ch:
			if !ok {
				logger.InfoC("channels", stopMsg)
				return
			}

			channel := getChannel(msg)

			// Silently skip internal channels
			if constants.IsInternalChannel(channel) {
				continue
			}

			m.mu.RLock()
			_, exists := m.channels[channel]
			w, wExists := m.workers[channel]
			m.mu.RUnlock()

			if !exists {
				logger.WarnCF("channels", unknownMsg, map[string]any{"channel": channel})
				continue
			}

			if wExists && w != nil {
				if !enqueue(ctx, w, msg) {
					return
				}
			} else if exists {
				logger.WarnCF("channels", noWorkerMsg, map[string]any{"channel": channel})
			}
		}
	}
}

func (m *Manager) dispatchOutbound(ctx context.Context) {
	dispatchLoop(
		ctx, m,
		m.bus.OutboundChan(),
		func(msg bus.OutboundMessage) string { return msg.Channel },
		func(ctx context.Context, w *channelWorker, msg bus.OutboundMessage) bool {
			select {
			case w.queue <- msg:
				return true
			case <-ctx.Done():
				return false
			}
		},
		"Outbound dispatcher started",
		"Outbound dispatcher stopped",
		"Unknown channel for outbound message",
		"Channel has no active worker, skipping message",
	)
}

func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
	dispatchLoop(
		ctx, m,
		m.bus.OutboundMediaChan(),
		func(msg bus.OutboundMediaMessage) string { return msg.Channel },
		func(ctx context.Context, w *channelWorker, msg bus.OutboundMediaMessage) bool {
			select {
			case w.mediaQueue <- msg:
				return true
			case <-ctx.Done():
				return false
			}
		},
		"Outbound media dispatcher started",
		"Outbound media dispatcher stopped",
		"Unknown channel for outbound media message",
		"Channel has no active worker, skipping media message",
	)
}

// runMediaWorker processes outbound media messages for a single channel.
func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
	defer close(w.mediaDone)
	for {
		select {
		case msg, ok := <-w.mediaQueue:
			if !ok {
				return
			}
			m.sendMediaWithRetry(ctx, name, w, msg)
		case <-ctx.Done():
			return
		}
	}
}

// sendMediaWithRetry sends a media message through the channel with rate limiting and
// retry logic. If the channel does not implement MediaSender, it silently skips.
func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) {
	ms, ok := w.ch.(MediaSender)
	if !ok {
		logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
			"channel": name,
		})
		return
	}

	// Rate limit: wait for token
	if err := w.limiter.Wait(ctx); err != nil {
		return
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		lastErr = ms.SendMedia(ctx, msg)
		if lastErr == nil {
			return
		}

		// Permanent failures — don't retry
		if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
			break
		}

		// Last attempt exhausted — don't sleep
		if attempt == maxRetries {
			break
		}

		// Rate limit error — fixed delay
		if errors.Is(lastErr, ErrRateLimit) {
			select {
			case <-time.After(rateLimitDelay):
				continue
			case <-ctx.Done():
				return
			}
		}

		// ErrTemporary or unknown error — exponential backoff
		backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return
		}
	}

	// All retries exhausted or permanent failure
	logger.ErrorCF("channels", "SendMedia failed", map[string]any{
		"channel": name,
		"chat_id": msg.ChatID,
		"error":   lastErr.Error(),
		"retries": maxRetries,
	})
}

// runTTLJanitor periodically scans the typingStops, reactionUndos and
// placeholders maps and evicts entries that have exceeded their TTL
// (reaction undos share typingStopTTL). This prevents memory accumulation
// when outbound paths fail to trigger preSend (e.g. LLM errors).
func (m *Manager) runTTLJanitor(ctx context.Context) {
	ticker := time.NewTicker(janitorInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			m.typingStops.Range(func(key, value any) bool {
				if entry, ok := value.(typingEntry); ok {
					if now.Sub(entry.createdAt) > typingStopTTL {
						if _, loaded := m.typingStops.LoadAndDelete(key); loaded {
							entry.stop() // idempotent, safe
						}
					}
				}
				return true
			})
			m.reactionUndos.Range(func(key, value any) bool {
				if entry, ok := value.(reactionEntry); ok {
					if now.Sub(entry.createdAt) > typingStopTTL {
						if _, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
							entry.undo() // idempotent, safe
						}
					}
				}
				return true
			})
			m.placeholders.Range(func(key, value any) bool {
				if entry, ok := value.(placeholderEntry); ok {
					if now.Sub(entry.createdAt) > placeholderTTL {
						m.placeholders.Delete(key)
					}
				}
				return true
			})
		}
	}
}

// GetChannel returns the named channel, if it is registered.
func (m *Manager) GetChannel(name string) (Channel, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	channel, ok := m.channels[name]
	return channel, ok
}

// GetStatus reports, for each registered channel, that it is enabled and
// whether it is currently running.
func (m *Manager) GetStatus() map[string]any {
	m.mu.RLock()
	defer m.mu.RUnlock()

	status := make(map[string]any)
	for name, channel := range m.channels {
		status[name] = map[string]any{
			"enabled": true,
			"running": channel.IsRunning(),
		}
	}
	return status
}

// GetEnabledChannels returns the names of all registered channels in
// unspecified order.
func (m *Manager) GetEnabledChannels() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	names := make([]string, 0, len(m.channels))
	for name := range m.channels {
		names = append(names, name)
	}
	return names
}

// Reload applies a new config without restarting unaffected channels.
// Channels that compareChannels reports as removed are stopped and unregistered;
// channels reported as added are initialized, started and given fresh workers.
// When no channel config changed, no channel is stopped or started.
func (m *Manager) Reload(ctx context.Context, cfg *config.Config) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	list := toChannelHashes(cfg)
	added, removed := compareChannels(m.channelHashes, list)

	for _, name := range removed {
		// Stop the removed channel
		channel := m.channels[name]
		logger.InfoCF("channels", "Stopping channel", map[string]any{
			"channel": name,
		})
		if err := channel.Stop(ctx); err != nil {
			logger.ErrorCF("channels", "Error stopping channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
		}
		// UnregisterChannel needs m.mu, which Reload holds, so it runs in a
		// goroutine and proceeds once Reload releases the lock.
		go func() {
			m.UnregisterChannel(name)
		}()
	}

	dispatchCtx, cancel := context.WithCancel(ctx)
	m.dispatchTask = &asyncTask{cancel: cancel}

	cc, err := toChannelConfig(cfg, added)
	if err != nil {
		logger.ErrorC("channels", fmt.Sprintf("toChannelConfig error: %v", err))
		return err
	}

	err = m.initChannels(cc)
	if err != nil {
		logger.ErrorC("channels", fmt.Sprintf("initChannels error: %v", err))
		return err
	}

	for _, name := range added {
		channel := m.channels[name]
		logger.InfoCF("channels", "Starting channel", map[string]any{
			"channel": name,
		})
		if err := channel.Start(ctx); err != nil {
			logger.ErrorCF("channels", "Failed to start channel", map[string]any{
				"channel": name,
				"error":   err.Error(),
			})
			continue
		}
		// Lazily create worker only after channel starts successfully
		w := newChannelWorker(name, channel)
		m.workers[name] = w
		go m.runWorker(dispatchCtx, name, w)
		go m.runMediaWorker(dispatchCtx, name, w)
		go func() {
			m.RegisterChannel(name, channel)
		}()
	}

	m.config = cfg
	m.channelHashes = toChannelHashes(cfg)
	return nil
}
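
// RegisterChannel adds or replaces a channel under the given name.
func (m *Manager) RegisterChannel(name string, channel Channel) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.channels[name] = channel
}

// UnregisterChannel drains the channel's text and media workers (if any) and
// removes the channel from the manager. It does not call the channel's Stop.
func (m *Manager) UnregisterChannel(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if w, ok := m.workers[name]; ok && w != nil {
		close(w.queue)
		<-w.done
		close(w.mediaQueue)
		<-w.mediaDone
	}
	delete(m.workers, name)
	delete(m.channels, name)
}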

// SendMessage sends an outbound message synchronously through the channel
// worker's rate limiter and retry logic. It blocks until the message is
// delivered (or all retries are exhausted), which preserves ordering when
// a subsequent operation depends on the message having been sent. Delivery
// failures after retries are logged by sendWithRetry, not returned.
func (m *Manager) SendMessage(ctx context.Context, msg bus.OutboundMessage) error {
	m.mu.RLock()
	_, exists := m.channels[msg.Channel]
	w, wExists := m.workers[msg.Channel]
	m.mu.RUnlock()

	if !exists {
		return fmt.Errorf("channel %s not found", msg.Channel)
	}
	if !wExists || w == nil {
		return fmt.Errorf("channel %s has no active worker", msg.Channel)
	}

	maxLen := 0
	if mlp, ok := w.ch.(MessageLengthProvider); ok {
		maxLen = mlp.MaxMessageLength()
	}
	if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
		for _, chunk := range SplitMessage(msg.Content, maxLen) {
			chunkMsg := msg
			chunkMsg.Content = chunk
			m.sendWithRetry(ctx, msg.Channel, w, chunkMsg)
		}
	} else {
		m.sendWithRetry(ctx, msg.Channel, w, msg)
	}
	return nil
}

// SendToChannel enqueues a plain-text message on the channel's worker for
// asynchronous delivery. If the channel has no worker, it falls back to a
// direct Send without rate limiting or retries.
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
	m.mu.RLock()
	channel, exists := m.channels[channelName]
	w, wExists := m.workers[channelName]
	m.mu.RUnlock()

	if !exists {
		return fmt.Errorf("channel %s not found", channelName)
	}

	msg := bus.OutboundMessage{
		Channel: channelName,
		ChatID:  chatID,
		Content: content,
	}

	if wExists && w != nil {
		select {
		case w.queue <- msg:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Fallback: direct send (should not happen)
	return channel.Send(ctx, msg)
}