diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go
deleted file mode 100644
index c1ade454c..000000000
--- a/pkg/channels/telegram.go
+++ /dev/null
@@ -1,585 +0,0 @@
-package channels
-
-import (
- "context"
- "fmt"
- "net/http"
- "net/url"
- "os"
- "regexp"
- "slices"
- "strings"
- "sync"
- "time"
-
- "github.com/mymmrac/telego"
- th "github.com/mymmrac/telego/telegohandler"
- tu "github.com/mymmrac/telego/telegoutil"
-
- "github.com/sipeed/picoclaw/pkg/bus"
- "github.com/sipeed/picoclaw/pkg/config"
- "github.com/sipeed/picoclaw/pkg/logger"
- "github.com/sipeed/picoclaw/pkg/utils"
- "github.com/sipeed/picoclaw/pkg/voice"
-)
-
-type TelegramChannel struct {
- *BaseChannel
- bot *telego.Bot
- botHandler *th.BotHandler
- commands TelegramCommander
- config *config.Config
- chatIDs map[string]int64
- transcriber *voice.GroqTranscriber
- placeholders sync.Map // chatID -> messageID
- stopThinking sync.Map // chatID -> thinkingCancel
-}
-
-type thinkingCancel struct {
- fn context.CancelFunc
-}
-
-func (c *thinkingCancel) Cancel() {
- if c != nil && c.fn != nil {
- c.fn()
- }
-}
-
-func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
- var opts []telego.BotOption
- telegramCfg := cfg.Channels.Telegram
-
- if telegramCfg.Proxy != "" {
- proxyURL, parseErr := url.Parse(telegramCfg.Proxy)
- if parseErr != nil {
- return nil, fmt.Errorf("invalid proxy URL %q: %w", telegramCfg.Proxy, parseErr)
- }
- opts = append(opts, telego.WithHTTPClient(&http.Client{
- Transport: &http.Transport{
- Proxy: http.ProxyURL(proxyURL),
- },
- }))
- } else if os.Getenv("HTTP_PROXY") != "" || os.Getenv("HTTPS_PROXY") != "" {
- // Use environment proxy if configured
- opts = append(opts, telego.WithHTTPClient(&http.Client{
- Transport: &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- },
- }))
- }
-
- bot, err := telego.NewBot(telegramCfg.Token, opts...)
- if err != nil {
- return nil, fmt.Errorf("failed to create telegram bot: %w", err)
- }
-
- base := NewBaseChannel("telegram", telegramCfg, bus, telegramCfg.AllowFrom)
-
- return &TelegramChannel{
- BaseChannel: base,
- commands: NewTelegramCommands(bot, cfg),
- bot: bot,
- config: cfg,
- chatIDs: make(map[string]int64),
- transcriber: nil,
- placeholders: sync.Map{},
- stopThinking: sync.Map{},
- }, nil
-}
-
-func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
- c.transcriber = transcriber
-}
-
-func (c *TelegramChannel) Start(ctx context.Context) error {
- logger.InfoC("telegram", "Starting Telegram bot (polling mode)...")
-
- if err := c.initBotCommands(ctx); err != nil {
- logger.WarnCF("telegram", "Failed to initialize bot commands", map[string]any{
- "error": err.Error(),
- })
- }
-
- updates, err := c.bot.UpdatesViaLongPolling(ctx, &telego.GetUpdatesParams{
- Timeout: 30,
- })
- if err != nil {
- return fmt.Errorf("failed to start long polling: %w", err)
- }
-
- bh, err := th.NewBotHandler(c.bot, updates)
- if err != nil {
- return fmt.Errorf("failed to create bot handler: %w", err)
- }
- c.botHandler = bh
-
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- return c.commands.Start(ctx, message)
- }, th.CommandEqual("start"))
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- return c.commands.Help(ctx, message)
- }, th.CommandEqual("help"))
-
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- return c.commands.Show(ctx, message)
- }, th.CommandEqual("show"))
-
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- return c.commands.List(ctx, message)
- }, th.CommandEqual("list"))
-
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- return c.handleMessage(ctx, &message)
- }, th.AnyMessage())
-
- c.setRunning(true)
- logger.InfoCF("telegram", "Telegram bot connected", map[string]any{
- "username": c.bot.Username(),
- })
-
- go func() {
- if err = bh.Start(); err != nil {
- logger.ErrorCF("telegram", "Bot handler failed", map[string]any{
- "error": err.Error(),
- })
- }
- }()
-
- return nil
-}
-
-func (c *TelegramChannel) Stop(ctx context.Context) error {
- logger.InfoC("telegram", "Stopping Telegram bot...")
- c.setRunning(false)
- if c.botHandler != nil {
- _ = c.botHandler.StopWithContext(ctx)
- }
- return nil
-}
-
-func (c *TelegramChannel) initBotCommands(ctx context.Context) error {
- currentCommands, err := c.bot.GetMyCommands(ctx, &telego.GetMyCommandsParams{
- Scope: tu.ScopeDefault(),
- })
- if err != nil {
- return fmt.Errorf("get commands: %w", err)
- }
-
- commands := []telego.BotCommand{
- {
- Command: "start",
- Description: "Start the bot",
- },
- {
- Command: "help",
- Description: "Show a help message",
- },
- {
- Command: "show",
- Description: "Show current configuration",
- },
- {
- Command: "list",
- Description: "List available options",
- },
- }
-
- // Setting commands on each start will hit the rate limit very quickly, that's why we check if an update is needed
- if !slices.Equal(currentCommands, commands) {
- logger.InfoC("telegram", "Updating bot commands")
-
- err = c.bot.SetMyCommands(ctx, &telego.SetMyCommandsParams{
- Commands: commands,
- Scope: tu.ScopeDefault(),
- })
- if err != nil {
- return fmt.Errorf("set commands: %w", err)
- }
- } else {
- logger.DebugC("telegram", "Bot commands are up to date")
- }
-
- return nil
-}
-
-func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
- if !c.IsRunning() {
- return fmt.Errorf("telegram bot not running")
- }
-
- chatID, err := parseChatID(msg.ChatID)
- if err != nil {
- return fmt.Errorf("invalid chat ID: %w", err)
- }
-
- // Stop thinking animation
- if stop, ok := c.stopThinking.Load(msg.ChatID); ok {
- if cf, ok := stop.(*thinkingCancel); ok && cf != nil {
- cf.Cancel()
- }
- c.stopThinking.Delete(msg.ChatID)
- }
-
- htmlContent := markdownToTelegramHTML(msg.Content)
-
- // Try to edit placeholder
- if pID, ok := c.placeholders.Load(msg.ChatID); ok {
- c.placeholders.Delete(msg.ChatID)
- editMsg := tu.EditMessageText(tu.ID(chatID), pID.(int), htmlContent)
- editMsg.ParseMode = telego.ModeHTML
-
- if _, err = c.bot.EditMessageText(ctx, editMsg); err == nil {
- return nil
- }
- // Fallback to new message if edit fails
- }
-
- tgMsg := tu.Message(tu.ID(chatID), htmlContent)
- tgMsg.ParseMode = telego.ModeHTML
-
- if _, err = c.bot.SendMessage(ctx, tgMsg); err != nil {
- logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{
- "error": err.Error(),
- })
- tgMsg.ParseMode = ""
- _, err = c.bot.SendMessage(ctx, tgMsg)
- return err
- }
-
- return nil
-}
-
-func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error {
- if message == nil {
- return fmt.Errorf("message is nil")
- }
-
- user := message.From
- if user == nil {
- return fmt.Errorf("message sender (user) is nil")
- }
-
- senderID := fmt.Sprintf("%d", user.ID)
- if user.Username != "" {
- senderID = fmt.Sprintf("%d|%s", user.ID, user.Username)
- }
-
- // 检查白名单,避免为被拒绝的用户下载附件
- if !c.IsAllowed(senderID) {
- logger.DebugCF("telegram", "Message rejected by allowlist", map[string]any{
- "user_id": senderID,
- })
- return nil
- }
-
- chatID := message.Chat.ID
- c.chatIDs[senderID] = chatID
-
- content := ""
- mediaPaths := []string{}
- localFiles := []string{} // 跟踪需要清理的本地文件
-
- // 确保临时文件在函数返回时被清理
- defer func() {
- for _, file := range localFiles {
- if err := os.Remove(file); err != nil {
- logger.DebugCF("telegram", "Failed to cleanup temp file", map[string]any{
- "file": file,
- "error": err.Error(),
- })
- }
- }
- }()
-
- if message.Text != "" {
- content += message.Text
- }
-
- if message.Caption != "" {
- if content != "" {
- content += "\n"
- }
- content += message.Caption
- }
-
- if len(message.Photo) > 0 {
- photo := message.Photo[len(message.Photo)-1]
- photoPath := c.downloadPhoto(ctx, photo.FileID)
- if photoPath != "" {
- localFiles = append(localFiles, photoPath)
- mediaPaths = append(mediaPaths, photoPath)
- if content != "" {
- content += "\n"
- }
- content += "[image: photo]"
- }
- }
-
- if message.Voice != nil {
- voicePath := c.downloadFile(ctx, message.Voice.FileID, ".ogg")
- if voicePath != "" {
- localFiles = append(localFiles, voicePath)
- mediaPaths = append(mediaPaths, voicePath)
-
- transcribedText := ""
- if c.transcriber != nil && c.transcriber.IsAvailable() {
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
-
- result, err := c.transcriber.Transcribe(ctx, voicePath)
- if err != nil {
- logger.ErrorCF("telegram", "Voice transcription failed", map[string]any{
- "error": err.Error(),
- "path": voicePath,
- })
- transcribedText = "[voice (transcription failed)]"
- } else {
- transcribedText = fmt.Sprintf("[voice transcription: %s]", result.Text)
- logger.InfoCF("telegram", "Voice transcribed successfully", map[string]any{
- "text": result.Text,
- })
- }
- } else {
- transcribedText = "[voice]"
- }
-
- if content != "" {
- content += "\n"
- }
- content += transcribedText
- }
- }
-
- if message.Audio != nil {
- audioPath := c.downloadFile(ctx, message.Audio.FileID, ".mp3")
- if audioPath != "" {
- localFiles = append(localFiles, audioPath)
- mediaPaths = append(mediaPaths, audioPath)
- if content != "" {
- content += "\n"
- }
- content += "[audio]"
- }
- }
-
- if message.Document != nil {
- docPath := c.downloadFile(ctx, message.Document.FileID, "")
- if docPath != "" {
- localFiles = append(localFiles, docPath)
- mediaPaths = append(mediaPaths, docPath)
- if content != "" {
- content += "\n"
- }
- content += "[file]"
- }
- }
-
- if content == "" {
- content = "[empty message]"
- }
-
- logger.DebugCF("telegram", "Received message", map[string]any{
- "sender_id": senderID,
- "chat_id": fmt.Sprintf("%d", chatID),
- "preview": utils.Truncate(content, 50),
- })
-
- // Thinking indicator
- err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping))
- if err != nil {
- logger.ErrorCF("telegram", "Failed to send chat action", map[string]any{
- "error": err.Error(),
- })
- }
-
- // Stop any previous thinking animation
- chatIDStr := fmt.Sprintf("%d", chatID)
- if prevStop, ok := c.stopThinking.Load(chatIDStr); ok {
- if cf, ok := prevStop.(*thinkingCancel); ok && cf != nil {
- cf.Cancel()
- }
- }
-
- // Create cancel function for thinking state
- _, thinkCancel := context.WithTimeout(ctx, 5*time.Minute)
- c.stopThinking.Store(chatIDStr, &thinkingCancel{fn: thinkCancel})
-
- pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(chatID), "Thinking... 💭"))
- if err == nil {
- pID := pMsg.MessageID
- c.placeholders.Store(chatIDStr, pID)
- }
-
- peerKind := "direct"
- peerID := fmt.Sprintf("%d", user.ID)
- if message.Chat.Type != "private" {
- peerKind = "group"
- peerID = fmt.Sprintf("%d", chatID)
- }
-
- metadata := map[string]string{
- "message_id": fmt.Sprintf("%d", message.MessageID),
- "user_id": fmt.Sprintf("%d", user.ID),
- "username": user.Username,
- "first_name": user.FirstName,
- "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"),
- "peer_kind": peerKind,
- "peer_id": peerID,
- }
-
- c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata)
- return nil
-}
-
-func (c *TelegramChannel) downloadPhoto(ctx context.Context, fileID string) string {
- file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID})
- if err != nil {
- logger.ErrorCF("telegram", "Failed to get photo file", map[string]any{
- "error": err.Error(),
- })
- return ""
- }
-
- return c.downloadFileWithInfo(file, ".jpg")
-}
-
-func (c *TelegramChannel) downloadFileWithInfo(file *telego.File, ext string) string {
- if file.FilePath == "" {
- return ""
- }
-
- url := c.bot.FileDownloadURL(file.FilePath)
- logger.DebugCF("telegram", "File URL", map[string]any{"url": url})
-
- // Use FilePath as filename for better identification
- filename := file.FilePath + ext
- return utils.DownloadFile(url, filename, utils.DownloadOptions{
- LoggerPrefix: "telegram",
- })
-}
-
-func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) string {
- file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID})
- if err != nil {
- logger.ErrorCF("telegram", "Failed to get file", map[string]any{
- "error": err.Error(),
- })
- return ""
- }
-
- return c.downloadFileWithInfo(file, ext)
-}
-
-func parseChatID(chatIDStr string) (int64, error) {
- var id int64
- _, err := fmt.Sscanf(chatIDStr, "%d", &id)
- return id, err
-}
-
-func markdownToTelegramHTML(text string) string {
- if text == "" {
- return ""
- }
-
- codeBlocks := extractCodeBlocks(text)
- text = codeBlocks.text
-
- inlineCodes := extractInlineCodes(text)
- text = inlineCodes.text
-
- text = regexp.MustCompile(`^#{1,6}\s+(.+)$`).ReplaceAllString(text, "$1")
-
- text = regexp.MustCompile(`^>\s*(.*)$`).ReplaceAllString(text, "$1")
-
- text = escapeHTML(text)
-
- text = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`).ReplaceAllString(text, `$1`)
-
- text = regexp.MustCompile(`\*\*(.+?)\*\*`).ReplaceAllString(text, "$1")
-
- text = regexp.MustCompile(`__(.+?)__`).ReplaceAllString(text, "$1")
-
- reItalic := regexp.MustCompile(`_([^_]+)_`)
- text = reItalic.ReplaceAllStringFunc(text, func(s string) string {
- match := reItalic.FindStringSubmatch(s)
- if len(match) < 2 {
- return s
- }
- return "" + match[1] + ""
- })
-
- text = regexp.MustCompile(`~~(.+?)~~`).ReplaceAllString(text, "$1")
-
- text = regexp.MustCompile(`^[-*]\s+`).ReplaceAllString(text, "• ")
-
- for i, code := range inlineCodes.codes {
- escaped := escapeHTML(code)
- text = strings.ReplaceAll(text, fmt.Sprintf("\x00IC%d\x00", i), fmt.Sprintf("%s", escaped))
- }
-
- for i, code := range codeBlocks.codes {
- escaped := escapeHTML(code)
- text = strings.ReplaceAll(
- text,
- fmt.Sprintf("\x00CB%d\x00", i),
- fmt.Sprintf("
%s", escaped),
- )
- }
-
- return text
-}
-
-type codeBlockMatch struct {
- text string
- codes []string
-}
-
-func extractCodeBlocks(text string) codeBlockMatch {
- re := regexp.MustCompile("```[\\w]*\\n?([\\s\\S]*?)```")
- matches := re.FindAllStringSubmatch(text, -1)
-
- codes := make([]string, 0, len(matches))
- for _, match := range matches {
- codes = append(codes, match[1])
- }
-
- i := 0
- text = re.ReplaceAllStringFunc(text, func(m string) string {
- placeholder := fmt.Sprintf("\x00CB%d\x00", i)
- i++
- return placeholder
- })
-
- return codeBlockMatch{text: text, codes: codes}
-}
-
-type inlineCodeMatch struct {
- text string
- codes []string
-}
-
-func extractInlineCodes(text string) inlineCodeMatch {
- re := regexp.MustCompile("`([^`]+)`")
- matches := re.FindAllStringSubmatch(text, -1)
-
- codes := make([]string, 0, len(matches))
- for _, match := range matches {
- codes = append(codes, match[1])
- }
-
- i := 0
- text = re.ReplaceAllStringFunc(text, func(m string) string {
- placeholder := fmt.Sprintf("\x00IC%d\x00", i)
- i++
- return placeholder
- })
-
- return inlineCodeMatch{text: text, codes: codes}
-}
-
-func escapeHTML(text string) string {
- text = strings.ReplaceAll(text, "&", "&")
- text = strings.ReplaceAll(text, "<", "<")
- text = strings.ReplaceAll(text, ">", ">")
- return text
-}
diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go
index a11cf53b8..7feb706aa 100644
--- a/pkg/channels/telegram/telegram.go
+++ b/pkg/channels/telegram/telegram.go
@@ -7,12 +7,12 @@ import (
"net/url"
"os"
"regexp"
+ "slices"
"strconv"
"strings"
"time"
"github.com/mymmrac/telego"
- "github.com/mymmrac/telego/telegohandler"
th "github.com/mymmrac/telego/telegohandler"
tu "github.com/mymmrac/telego/telegoutil"
@@ -41,7 +41,7 @@ var (
type TelegramChannel struct {
*channels.BaseChannel
bot *telego.Bot
- bh *telegohandler.BotHandler
+ bh *th.BotHandler
commands TelegramCommander
config *config.Config
chatIDs map[string]int64
@@ -101,6 +101,12 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
+ if err := c.initBotCommands(c.ctx); err != nil {
+ logger.WarnCF("telegram", "Failed to initialize bot commands", map[string]any{
+ "error": err.Error(),
+ })
+ }
+
updates, err := c.bot.UpdatesViaLongPolling(c.ctx, &telego.GetUpdatesParams{
Timeout: 30,
})
@@ -109,20 +115,19 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
return fmt.Errorf("failed to start long polling: %w", err)
}
- bh, err := telegohandler.NewBotHandler(c.bot, updates)
+ bh, err := th.NewBotHandler(c.bot, updates)
if err != nil {
c.cancel()
return fmt.Errorf("failed to create bot handler: %w", err)
}
c.bh = bh
- bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- c.commands.Help(ctx, message)
- return nil
- }, th.CommandEqual("help"))
bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
return c.commands.Start(ctx, message)
}, th.CommandEqual("start"))
+ bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ return c.commands.Help(ctx, message)
+ }, th.CommandEqual("help"))
bh.HandleMessage(func(ctx *th.Context, message telego.Message) error {
return c.commands.Show(ctx, message)
@@ -141,7 +146,13 @@ func (c *TelegramChannel) Start(ctx context.Context) error {
"username": c.bot.Username(),
})
- go bh.Start()
+ go func() {
+ if err = bh.Start(); err != nil {
+ logger.ErrorCF("telegram", "Bot handler failed", map[string]any{
+ "error": err.Error(),
+ })
+ }
+ }()
return nil
}
@@ -152,7 +163,7 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
// Stop the bot handler
if c.bh != nil {
- c.bh.Stop()
+ _ = c.bh.StopWithContext(ctx)
}
// Cancel our context (stops long polling)
@@ -163,6 +174,51 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
return nil
}
+func (c *TelegramChannel) initBotCommands(ctx context.Context) error {
+ currentCommands, err := c.bot.GetMyCommands(ctx, &telego.GetMyCommandsParams{
+ Scope: tu.ScopeDefault(),
+ })
+ if err != nil {
+ return fmt.Errorf("get commands: %w", err)
+ }
+
+ commands := []telego.BotCommand{
+ {
+ Command: "start",
+ Description: "Start the bot",
+ },
+ {
+ Command: "help",
+ Description: "Show a help message",
+ },
+ {
+ Command: "show",
+ Description: "Show current configuration",
+ },
+ {
+ Command: "list",
+ Description: "List available options",
+ },
+ }
+
+ // Setting commands on each start will hit the rate limit very quickly, that's why we check if an update is needed
+ if !slices.Equal(currentCommands, commands) {
+ logger.InfoC("telegram", "Updating bot commands")
+
+ err = c.bot.SetMyCommands(ctx, &telego.SetMyCommandsParams{
+ Commands: commands,
+ Scope: tu.ScopeDefault(),
+ })
+ if err != nil {
+ return fmt.Errorf("set commands: %w", err)
+ }
+ } else {
+ logger.DebugC("telegram", "Bot commands are up to date")
+ }
+
+ return nil
+}
+
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.IsRunning() {
return channels.ErrNotRunning