picoclaw/pkg/channels/base.go
Amir Mamaghani 71134babb9 feat(telegram): stream LLM responses via sendMessageDraft (#1101)
* feat(telegram): stream LLM responses in real-time via sendMessageDraft

Implements real-time token streaming to Telegram using the sendMessageDraft
API (telego v1.6.0). Instead of showing only a "Thinking..." placeholder
until the full response arrives, users now see partial LLM output appear
in the chat as it's generated.

The streaming pipeline threads through all layers:

- StreamingProvider interface (providers/types.go): opt-in ChatStream()
  method that receives an onChunk callback with accumulated text
- OpenAI-compatible SSE streaming (openai_compat/provider.go): parses
  SSE events with stream:true, handles text deltas and tool call assembly
- Anthropic native streaming (anthropic/provider.go): uses SDK's
  NewStreaming() for direct Anthropic API connections
- HTTPProvider delegation (http_provider.go): delegates ChatStream to
  the underlying openai_compat provider
- StreamingCapable + Streamer interfaces (channels/interfaces.go):
  opt-in channel capability like TypingCapable/PlaceholderCapable
- Telegram streamer (telegram/telegram.go): BeginStream returns a
  telegramStreamer that throttles sendMessageDraft calls (3s/200 chars)
  with graceful degradation on API errors
- StreamDelegate bridge (bus/bus.go): decouples agent loop from channel
  manager without tight imports
- Manager integration (manager.go): implements StreamDelegate, tracks
  streamActive state, coordinates with placeholder editing
- Agent loop (loop.go): uses ChatStream when both provider and channel
  support streaming, cancels stream on tool calls, skips PublishOutbound
  when Finalize already delivered the message

Graceful degradation:
- Bots without forum/topics mode: first sendMessageDraft error sets
  failed=true, subsequent Updates become no-ops, Finalize still delivers
  via SendMessage. User sees normal non-streaming behavior.
- Non-streaming providers: type assertion fails, falls back to Chat()
- Config opt-out: streaming.enabled (default true) in telegram config
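
The provider fallback described above amounts to an opt-in interface plus a type assertion. The sketch below uses simplified, hypothetical signatures (the real Chat and ChatStream in providers/types.go take more arguments); only the pattern is meant to carry over.

```go
package main

import "fmt"

// Provider and StreamingProvider are simplified, hypothetical stand-ins for
// the interfaces in providers/types.go; the real signatures may differ.
type Provider interface {
	Chat(prompt string) (string, error)
}

type StreamingProvider interface {
	Provider
	// ChatStream calls onChunk with the text accumulated so far.
	ChatStream(prompt string, onChunk func(accumulated string)) (string, error)
}

// plainProvider implements only Chat.
type plainProvider struct{}

func (plainProvider) Chat(prompt string) (string, error) {
	return "echo: " + prompt, nil
}

// streamingProvider adds ChatStream on top of the embedded Chat.
type streamingProvider struct{ plainProvider }

func (streamingProvider) ChatStream(prompt string, onChunk func(string)) (string, error) {
	full := "echo: " + prompt
	onChunk("echo: ") // partial text
	onChunk(full)     // accumulated text, not just the delta
	return full, nil
}

// respond streams when the provider opts in and falls back to Chat otherwise.
func respond(p Provider, prompt string, onChunk func(string)) (string, error) {
	if sp, ok := p.(StreamingProvider); ok {
		return sp.ChatStream(prompt, onChunk)
	}
	return p.Chat(prompt)
}

func main() {
	show := func(s string) { fmt.Printf("draft: %q\n", s) }
	for _, p := range []Provider{plainProvider{}, streamingProvider{}} {
		out, err := respond(p, "hi", show)
		fmt.Println(out, err)
	}
}
```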

Closes #1098

* fix(telegram): delete placeholder message when streaming delivers response

When streaming was active, the "Thinking..." placeholder message stayed
in the chat because preSend only deleted the tracking entry without
removing the actual Telegram message. Now preSend deletes the placeholder
via the new MessageDeleter interface when streamActive is set.

* refactor(streaming): remove dead code and simplify streaming wiring

- Delete unused Anthropic ChatStream/parseStream (-131 lines) — factory
  creates HTTPProvider for all OpenAI-compat providers including OpenRouter
- Simplify runLLMIteration from 4 to 3 return values (remove unused
  streamed bool)
- Replace managerStreamer struct with finalizeHookStreamer using embedding
  (Update/Cancel promoted, only Finalize overridden)

* fix(streaming): skip streamer acquisition when SendResponse is false

Heartbeat messages set SendResponse=false but the streaming path
was unconditionally acquiring a streamer, causing HEARTBEAT_OK to
leak to Telegram via streamer.Finalize().

* fix(streaming): guard streamer for non-sendable messages, add streaming config

Skip streamer acquisition for heartbeat (NoHistory=true), preventing
HEARTBEAT_OK from leaking to Telegram via streamer.Finalize().

Add streaming.enabled to Telegram defaults and example config.

* fix(picoclaw): add missing closing brace for StreamingProvider interface

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: resolve golangci-lint formatting issues

Fix gci import ordering in telegram and anthropic provider, and break
long function signature in openai_compat provider to satisfy golines.

* fix: address code review feedback on streaming PR

- Deduplicate Streamer interface: alias channels.Streamer to bus.Streamer
  to prevent type drift across packages
- Increase SSE scanner buffer to 10MB max to handle large single-line
  responses that exceed bufio.Scanner's 64KB default
- Switch draftID generation from math/rand to crypto/rand for
  collision-resistant random IDs
- Add context cancellation check in SSE parsing loop so cancelled
  streams stop processing immediately
- Log Finalize failures with chat_id and content length for debugging
  silent message delivery failures

* feat: make streaming throttle interval and min growth configurable

Move hardcoded streamThrottleInterval (3s) and streamMinGrowth (200)
into StreamingConfig so they can be tuned per deployment via config
or environment variables.

* fix(telegram): use parseTelegramChatID in DeleteMessage and BeginStream

These two functions called undefined parseChatID. Use
parseTelegramChatID with _ for the unused threadID instead of adding
a wrapper function. Fixes all three CI checks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(streaming): set streamActive only after successful Finalize

Move onFinalize hook to run after Streamer.Finalize succeeds, so that
if Finalize fails the streamActive flag stays false and the regular
placeholder fallback path remains available.
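
The ordering described here, combined with the embedding approach from the earlier refactor, can be sketched as below. The Streamer interface is a simplified, hypothetical version (the real methods take a context and other arguments), and fakeStreamer exists only to drive the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Streamer is a simplified, hypothetical version of the interface shared
// between the bus and channels packages.
type Streamer interface {
	Update(text string) error
	Finalize(text string) error
	Cancel()
}

// finalizeHookStreamer embeds a Streamer so Update and Cancel are promoted
// unchanged; only Finalize is overridden.
type finalizeHookStreamer struct {
	Streamer
	onFinalize func() // e.g. mark the stream as active for this chat
}

// Finalize runs the hook only after the wrapped Finalize has succeeded.
func (s *finalizeHookStreamer) Finalize(text string) error {
	if err := s.Streamer.Finalize(text); err != nil {
		return err // hook not run, so the fallback path stays available
	}
	if s.onFinalize != nil {
		s.onFinalize()
	}
	return nil
}

// fakeStreamer is a test double that can be told to fail on Finalize.
type fakeStreamer struct{ fail bool }

func (f *fakeStreamer) Update(string) error { return nil }
func (f *fakeStreamer) Cancel()             {}
func (f *fakeStreamer) Finalize(string) error {
	if f.fail {
		return errors.New("send failed")
	}
	return nil
}

func main() {
	for _, fail := range []bool{true, false} {
		active := false
		s := &finalizeHookStreamer{
			Streamer:   &fakeStreamer{fail: fail},
			onFinalize: func() { active = true },
		}
		err := s.Finalize("done")
		fmt.Printf("fail=%v err=%v streamActive=%v\n", fail, err, active)
	}
}
```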

Addresses review feedback from @alexhoshina.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 21:04:14 +08:00

351 lines
10 KiB
Go

package channels

import (
	"context"
	"crypto/rand"
	"encoding/binary"
	"encoding/hex"
	"regexp"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/sipeed/picoclaw/pkg/bus"
	"github.com/sipeed/picoclaw/pkg/config"
	"github.com/sipeed/picoclaw/pkg/identity"
	"github.com/sipeed/picoclaw/pkg/logger"
	"github.com/sipeed/picoclaw/pkg/media"
)

var (
	uniqueIDCounter uint64
	uniqueIDPrefix  string
)

func init() {
	// One-time read from crypto/rand for a unique prefix (single syscall).
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		// fallback to time-based prefix
		binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano()))
	}
	uniqueIDPrefix = hex.EncodeToString(b[:])
}

// audioAnnotationRe matches audio/voice annotations injected by channels (e.g. [voice], [audio: file.ogg]).
var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)

// uniqueID generates a process-unique ID using a random prefix and an atomic counter.
// This ID is intended for internal correlation (e.g. media scope keys) and is NOT
// cryptographically secure — it must not be used in contexts where unpredictability matters.
func uniqueID() string {
	n := atomic.AddUint64(&uniqueIDCounter, 1)
	return uniqueIDPrefix + strconv.FormatUint(n, 16)
}

type Channel interface {
	Name() string
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Send(ctx context.Context, msg bus.OutboundMessage) error
	IsRunning() bool
	IsAllowed(senderID string) bool
	IsAllowedSender(sender bus.SenderInfo) bool
	ReasoningChannelID() string
}

// BaseChannelOption is a functional option for configuring a BaseChannel.
type BaseChannelOption func(*BaseChannel)

// WithMaxMessageLength sets the maximum message length (in runes) for a channel.
// Messages exceeding this limit will be automatically split by the Manager.
// A value of 0 means no limit.
func WithMaxMessageLength(n int) BaseChannelOption {
	return func(c *BaseChannel) { c.maxMessageLength = n }
}

// WithGroupTrigger sets the group trigger configuration for a channel.
func WithGroupTrigger(gt config.GroupTriggerConfig) BaseChannelOption {
	return func(c *BaseChannel) { c.groupTrigger = gt }
}

// WithReasoningChannelID sets the reasoning channel ID where thoughts should be sent.
func WithReasoningChannelID(id string) BaseChannelOption {
	return func(c *BaseChannel) { c.reasoningChannelID = id }
}

// MessageLengthProvider is an opt-in interface that channels implement
// to advertise their maximum message length. The Manager uses this via
// type assertion to decide whether to split outbound messages.
type MessageLengthProvider interface {
	MaxMessageLength() int
}

type BaseChannel struct {
	config              any
	bus                 *bus.MessageBus
	running             atomic.Bool
	name                string
	allowList           []string
	maxMessageLength    int
	groupTrigger        config.GroupTriggerConfig
	mediaStore          media.MediaStore
	placeholderRecorder PlaceholderRecorder
	owner               Channel // the concrete channel that embeds this BaseChannel
	reasoningChannelID  string
}

func NewBaseChannel(
	name string,
	config any,
	bus *bus.MessageBus,
	allowList []string,
	opts ...BaseChannelOption,
) *BaseChannel {
	bc := &BaseChannel{
		config:    config,
		bus:       bus,
		name:      name,
		allowList: allowList,
	}
	for _, opt := range opts {
		opt(bc)
	}
	return bc
}

// MaxMessageLength returns the maximum message length (in runes) for this channel.
// A value of 0 means no limit.
func (c *BaseChannel) MaxMessageLength() int {
	return c.maxMessageLength
}

// ShouldRespondInGroup determines whether the bot should respond in a group chat.
// Each channel is responsible for:
//  1. Detecting isMentioned (platform-specific)
//  2. Stripping bot mention from content (platform-specific)
//  3. Calling this method to get the group response decision
//
// Logic:
//   - If isMentioned → always respond
//   - If mention_only configured and not mentioned → ignore
//   - If prefixes configured → respond if content starts with any prefix (strip it)
//   - If prefixes configured but no match and not mentioned → ignore
//   - Otherwise (no group_trigger configured) → respond to all (permissive default)
func (c *BaseChannel) ShouldRespondInGroup(isMentioned bool, content string) (bool, string) {
	gt := c.groupTrigger

	// Mentioned → always respond
	if isMentioned {
		return true, strings.TrimSpace(content)
	}

	// mention_only → require mention
	if gt.MentionOnly {
		return false, content
	}

	// Prefix matching
	if len(gt.Prefixes) > 0 {
		for _, prefix := range gt.Prefixes {
			if prefix != "" && strings.HasPrefix(content, prefix) {
				return true, strings.TrimSpace(strings.TrimPrefix(content, prefix))
			}
		}
		// Prefixes configured but none matched and not mentioned → ignore
		return false, content
	}

	// No group_trigger configured → permissive (respond to all)
	return true, strings.TrimSpace(content)
}

func (c *BaseChannel) Name() string {
	return c.name
}

func (c *BaseChannel) ReasoningChannelID() string {
	return c.reasoningChannelID
}

func (c *BaseChannel) IsRunning() bool {
	return c.running.Load()
}

func (c *BaseChannel) IsAllowed(senderID string) bool {
	if len(c.allowList) == 0 {
		return true
	}

	// Extract parts from compound senderID like "123456|username"
	idPart := senderID
	userPart := ""
	if idx := strings.Index(senderID, "|"); idx > 0 {
		idPart = senderID[:idx]
		userPart = senderID[idx+1:]
	}

	for _, allowed := range c.allowList {
		// Strip leading "@" from allowed value for username matching
		trimmed := strings.TrimPrefix(allowed, "@")
		allowedID := trimmed
		allowedUser := ""
		if idx := strings.Index(trimmed, "|"); idx > 0 {
			allowedID = trimmed[:idx]
			allowedUser = trimmed[idx+1:]
		}

		// Support either side using "id|username" compound form.
		// This keeps backward compatibility with legacy Telegram allowlist entries.
		if senderID == allowed ||
			idPart == allowed ||
			senderID == trimmed ||
			idPart == trimmed ||
			idPart == allowedID ||
			(allowedUser != "" && senderID == allowedUser) ||
			(userPart != "" && (userPart == allowed || userPart == trimmed || userPart == allowedUser)) {
			return true
		}
	}

	return false
}

// IsAllowedSender checks whether a structured SenderInfo is permitted by the allow-list.
// It delegates to identity.MatchAllowed for each entry, providing unified matching
// across all legacy formats and the new canonical "platform:id" format.
func (c *BaseChannel) IsAllowedSender(sender bus.SenderInfo) bool {
	if len(c.allowList) == 0 {
		return true
	}

	for _, allowed := range c.allowList {
		if identity.MatchAllowed(sender, allowed) {
			return true
		}
	}

	return false
}

func (c *BaseChannel) HandleMessage(
	ctx context.Context,
	peer bus.Peer,
	messageID, senderID, chatID, content string,
	media []string,
	metadata map[string]string,
	senderOpts ...bus.SenderInfo,
) {
	// Use SenderInfo-based allow check when available, else fall back to string
	var sender bus.SenderInfo
	if len(senderOpts) > 0 {
		sender = senderOpts[0]
	}
	if sender.CanonicalID != "" || sender.PlatformID != "" {
		if !c.IsAllowedSender(sender) {
			return
		}
	} else {
		if !c.IsAllowed(senderID) {
			return
		}
	}

	// Set SenderID to canonical if available, otherwise keep the raw senderID
	resolvedSenderID := senderID
	if sender.CanonicalID != "" {
		resolvedSenderID = sender.CanonicalID
	}

	scope := BuildMediaScope(c.name, chatID, messageID)

	msg := bus.InboundMessage{
		Channel:    c.name,
		SenderID:   resolvedSenderID,
		Sender:     sender,
		ChatID:     chatID,
		Content:    content,
		Media:      media,
		Peer:       peer,
		MessageID:  messageID,
		MediaScope: scope,
		Metadata:   metadata,
	}

	// Auto-trigger typing indicator, message reaction, and placeholder before publishing.
	// Each capability is independent — all three may fire for the same message.
	// Note: even when streaming is available, we still show typing + placeholder on inbound.
	// If streaming actually activates, preSend will skip the placeholder edit (streamActive map)
	// and the typing stop will still be called. This avoids the problem of compile-time interface
	// checks incorrectly skipping indicators when streaming may not work at runtime.
	if c.owner != nil && c.placeholderRecorder != nil {
		// Typing
		if tc, ok := c.owner.(TypingCapable); ok {
			if stop, err := tc.StartTyping(ctx, chatID); err == nil {
				c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
			}
		}
		// Reaction
		if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
			if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
				c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
			}
		}
		// Placeholder — independent pipeline.
		// Skip when the message contains audio: the agent will send the
		// placeholder after transcription completes, so the user sees
		// "Thinking…" only once the voice has been processed.
		if !audioAnnotationRe.MatchString(content) {
			if pc, ok := c.owner.(PlaceholderCapable); ok {
				if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
					c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
				}
			}
		}
	}

	if err := c.bus.PublishInbound(ctx, msg); err != nil {
		logger.ErrorCF("channels", "Failed to publish inbound message", map[string]any{
			"channel": c.name,
			"chat_id": chatID,
			"error":   err.Error(),
		})
	}
}

func (c *BaseChannel) SetRunning(running bool) {
	c.running.Store(running)
}

// SetMediaStore injects a MediaStore into the channel.
func (c *BaseChannel) SetMediaStore(s media.MediaStore) { c.mediaStore = s }

// GetMediaStore returns the injected MediaStore (may be nil).
func (c *BaseChannel) GetMediaStore() media.MediaStore { return c.mediaStore }

// SetPlaceholderRecorder injects a PlaceholderRecorder into the channel.
func (c *BaseChannel) SetPlaceholderRecorder(r PlaceholderRecorder) {
	c.placeholderRecorder = r
}

// GetPlaceholderRecorder returns the injected PlaceholderRecorder (may be nil).
func (c *BaseChannel) GetPlaceholderRecorder() PlaceholderRecorder {
	return c.placeholderRecorder
}

// SetOwner injects the concrete channel that embeds this BaseChannel.
// This allows HandleMessage to auto-trigger TypingCapable / ReactionCapable / PlaceholderCapable.
func (c *BaseChannel) SetOwner(ch Channel) {
	c.owner = ch
}

// BuildMediaScope constructs a scope key for media lifecycle tracking.
func BuildMediaScope(channel, chatID, messageID string) string {
	id := messageID
	if id == "" {
		id = uniqueID()
	}
	return channel + ":" + chatID + ":" + id
}