From b0c8fc4a7ed21657a56d487ac8636e6cdd2678eb Mon Sep 17 00:00:00 2001 From: Artem Yadelskyi Date: Sat, 28 Feb 2026 23:32:15 +0200 Subject: [PATCH] feat(telegram): Fix conflicts --- pkg/channels/telegram.go | 585 ------------------------------ pkg/channels/telegram/telegram.go | 74 +++- 2 files changed, 65 insertions(+), 594 deletions(-) delete mode 100644 pkg/channels/telegram.go diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go deleted file mode 100644 index c1ade454c..000000000 --- a/pkg/channels/telegram.go +++ /dev/null @@ -1,585 +0,0 @@ -package channels - -import ( - "context" - "fmt" - "net/http" - "net/url" - "os" - "regexp" - "slices" - "strings" - "sync" - "time" - - "github.com/mymmrac/telego" - th "github.com/mymmrac/telego/telegohandler" - tu "github.com/mymmrac/telego/telegoutil" - - "github.com/sipeed/picoclaw/pkg/bus" - "github.com/sipeed/picoclaw/pkg/config" - "github.com/sipeed/picoclaw/pkg/logger" - "github.com/sipeed/picoclaw/pkg/utils" - "github.com/sipeed/picoclaw/pkg/voice" -) - -type TelegramChannel struct { - *BaseChannel - bot *telego.Bot - botHandler *th.BotHandler - commands TelegramCommander - config *config.Config - chatIDs map[string]int64 - transcriber *voice.GroqTranscriber - placeholders sync.Map // chatID -> messageID - stopThinking sync.Map // chatID -> thinkingCancel -} - -type thinkingCancel struct { - fn context.CancelFunc -} - -func (c *thinkingCancel) Cancel() { - if c != nil && c.fn != nil { - c.fn() - } -} - -func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) { - var opts []telego.BotOption - telegramCfg := cfg.Channels.Telegram - - if telegramCfg.Proxy != "" { - proxyURL, parseErr := url.Parse(telegramCfg.Proxy) - if parseErr != nil { - return nil, fmt.Errorf("invalid proxy URL %q: %w", telegramCfg.Proxy, parseErr) - } - opts = append(opts, telego.WithHTTPClient(&http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyURL(proxyURL), - }, - })) - } else if os.Getenv("HTTP_PROXY") != "" || os.Getenv("HTTPS_PROXY") != "" { - // Use environment proxy if configured - opts = append(opts, telego.WithHTTPClient(&http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - }, - })) - } - - bot, err := telego.NewBot(telegramCfg.Token, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create telegram bot: %w", err) - } - - base := NewBaseChannel("telegram", telegramCfg, bus, telegramCfg.AllowFrom) - - return &TelegramChannel{ - BaseChannel: base, - commands: NewTelegramCommands(bot, cfg), - bot: bot, - config: cfg, - chatIDs: make(map[string]int64), - transcriber: nil, - placeholders: sync.Map{}, - stopThinking: sync.Map{}, - }, nil -} - -func (c *TelegramChannel) SetTranscriber(transcriber *voice.GroqTranscriber) { - c.transcriber = transcriber -} - -func (c *TelegramChannel) Start(ctx context.Context) error { - logger.InfoC("telegram", "Starting Telegram bot (polling mode)...") - - if err := c.initBotCommands(ctx); err != nil { - logger.WarnCF("telegram", "Failed to initialize bot commands", map[string]any{ - "error": err.Error(), - }) - } - - updates, err := c.bot.UpdatesViaLongPolling(ctx, &telego.GetUpdatesParams{ - Timeout: 30, - }) - if err != nil { - return fmt.Errorf("failed to start long polling: %w", err) - } - - bh, err := th.NewBotHandler(c.bot, updates) - if err != nil { - return fmt.Errorf("failed to create bot handler: %w", err) - } - c.botHandler = bh - - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.commands.Start(ctx, message) - }, th.CommandEqual("start")) - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.commands.Help(ctx, message) - }, th.CommandEqual("help")) - - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.commands.Show(ctx, message) - }, th.CommandEqual("show")) - - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.commands.List(ctx, message) - }, th.CommandEqual("list")) - - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - return c.handleMessage(ctx, &message) - }, th.AnyMessage()) - - c.setRunning(true) - logger.InfoCF("telegram", "Telegram bot connected", map[string]any{ - "username": c.bot.Username(), - }) - - go func() { - if err = bh.Start(); err != nil { - logger.ErrorCF("telegram", "Bot handler failed", map[string]any{ - "error": err.Error(), - }) - } - }() - - return nil -} - -func (c *TelegramChannel) Stop(ctx context.Context) error { - logger.InfoC("telegram", "Stopping Telegram bot...") - c.setRunning(false) - if c.botHandler != nil { - _ = c.botHandler.StopWithContext(ctx) - } - return nil -} - -func (c *TelegramChannel) initBotCommands(ctx context.Context) error { - currentCommands, err := c.bot.GetMyCommands(ctx, &telego.GetMyCommandsParams{ - Scope: tu.ScopeDefault(), - }) - if err != nil { - return fmt.Errorf("get commands: %w", err) - } - - commands := []telego.BotCommand{ - { - Command: "start", - Description: "Start the bot", - }, - { - Command: "help", - Description: "Show a help message", - }, - { - Command: "show", - Description: "Show current configuration", - }, - { - Command: "list", - Description: "List available options", - }, - } - - // Setting commands on each start will hit the rate limit very quickly, that's why we check if an update is needed - if !slices.Equal(currentCommands, commands) { - logger.InfoC("telegram", "Updating bot commands") - - err = c.bot.SetMyCommands(ctx, &telego.SetMyCommandsParams{ - Commands: commands, - Scope: tu.ScopeDefault(), - }) - if err != nil { - return fmt.Errorf("set commands: %w", err) - } - } else { - logger.DebugC("telegram", "Bot commands are up to date") - } - - return nil -} - -func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { - if !c.IsRunning() { - return fmt.Errorf("telegram bot not running") - } - - chatID, err := parseChatID(msg.ChatID) - if err != nil { - return fmt.Errorf("invalid chat ID: %w", err) - } - - // Stop thinking animation - if stop, ok := c.stopThinking.Load(msg.ChatID); ok { - if cf, ok := stop.(*thinkingCancel); ok && cf != nil { - cf.Cancel() - } - c.stopThinking.Delete(msg.ChatID) - } - - htmlContent := markdownToTelegramHTML(msg.Content) - - // Try to edit placeholder - if pID, ok := c.placeholders.Load(msg.ChatID); ok { - c.placeholders.Delete(msg.ChatID) - editMsg := tu.EditMessageText(tu.ID(chatID), pID.(int), htmlContent) - editMsg.ParseMode = telego.ModeHTML - - if _, err = c.bot.EditMessageText(ctx, editMsg); err == nil { - return nil - } - // Fallback to new message if edit fails - } - - tgMsg := tu.Message(tu.ID(chatID), htmlContent) - tgMsg.ParseMode = telego.ModeHTML - - if _, err = c.bot.SendMessage(ctx, tgMsg); err != nil { - logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{ - "error": err.Error(), - }) - tgMsg.ParseMode = "" - _, err = c.bot.SendMessage(ctx, tgMsg) - return err - } - - return nil -} - -func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error { - if message == nil { - return fmt.Errorf("message is nil") - } - - user := message.From - if user == nil { - return fmt.Errorf("message sender (user) is nil") - } - - senderID := fmt.Sprintf("%d", user.ID) - if user.Username != "" { - senderID = fmt.Sprintf("%d|%s", user.ID, user.Username) - } - - // 检查白名单,避免为被拒绝的用户下载附件 - if !c.IsAllowed(senderID) { - logger.DebugCF("telegram", "Message rejected by allowlist", map[string]any{ - "user_id": senderID, - }) - return nil - } - - chatID := message.Chat.ID - c.chatIDs[senderID] = chatID - - content := "" - mediaPaths := []string{} - localFiles := []string{} // 跟踪需要清理的本地文件 - - // 确保临时文件在函数返回时被清理 - defer func() { - for _, file := range localFiles { - if err := os.Remove(file); err != nil { - logger.DebugCF("telegram", "Failed to cleanup temp file", map[string]any{ - "file": file, - "error": err.Error(), - }) - } - } - }() - - if message.Text != "" { - content += message.Text - } - - if message.Caption != "" { - if content != "" { - content += "\n" - } - content += message.Caption - } - - if len(message.Photo) > 0 { - photo := message.Photo[len(message.Photo)-1] - photoPath := c.downloadPhoto(ctx, photo.FileID) - if photoPath != "" { - localFiles = append(localFiles, photoPath) - mediaPaths = append(mediaPaths, photoPath) - if content != "" { - content += "\n" - } - content += "[image: photo]" - } - } - - if message.Voice != nil { - voicePath := c.downloadFile(ctx, message.Voice.FileID, ".ogg") - if voicePath != "" { - localFiles = append(localFiles, voicePath) - mediaPaths = append(mediaPaths, voicePath) - - transcribedText := "" - if c.transcriber != nil && c.transcriber.IsAvailable() { - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - result, err := c.transcriber.Transcribe(ctx, voicePath) - if err != nil { - logger.ErrorCF("telegram", "Voice transcription failed", map[string]any{ - "error": err.Error(), - "path": voicePath, - }) - transcribedText = "[voice (transcription failed)]" - } else { - transcribedText = fmt.Sprintf("[voice transcription: %s]", result.Text) - logger.InfoCF("telegram", "Voice transcribed successfully", map[string]any{ - "text": result.Text, - }) - } - } else { - transcribedText = "[voice]" - } - - if content != "" { - content += "\n" - } - content += transcribedText - } - } - - if message.Audio != nil { - audioPath := c.downloadFile(ctx, message.Audio.FileID, ".mp3") - if audioPath != "" { - localFiles = append(localFiles, audioPath) - mediaPaths = append(mediaPaths, audioPath) - if content != "" { - content += "\n" - } - content += "[audio]" - } - } - - if message.Document != nil { - docPath := c.downloadFile(ctx, message.Document.FileID, "") - if docPath != "" { - localFiles = append(localFiles, docPath) - mediaPaths = append(mediaPaths, docPath) - if content != "" { - content += "\n" - } - content += "[file]" - } - } - - if content == "" { - content = "[empty message]" - } - - logger.DebugCF("telegram", "Received message", map[string]any{ - "sender_id": senderID, - "chat_id": fmt.Sprintf("%d", chatID), - "preview": utils.Truncate(content, 50), - }) - - // Thinking indicator - err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping)) - if err != nil { - logger.ErrorCF("telegram", "Failed to send chat action", map[string]any{ - "error": err.Error(), - }) - } - - // Stop any previous thinking animation - chatIDStr := fmt.Sprintf("%d", chatID) - if prevStop, ok := c.stopThinking.Load(chatIDStr); ok { - if cf, ok := prevStop.(*thinkingCancel); ok && cf != nil { - cf.Cancel() - } - } - - // Create cancel function for thinking state - _, thinkCancel := context.WithTimeout(ctx, 5*time.Minute) - c.stopThinking.Store(chatIDStr, &thinkingCancel{fn: thinkCancel}) - - pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(chatID), "Thinking... 💭")) - if err == nil { - pID := pMsg.MessageID - c.placeholders.Store(chatIDStr, pID) - } - - peerKind := "direct" - peerID := fmt.Sprintf("%d", user.ID) - if message.Chat.Type != "private" { - peerKind = "group" - peerID = fmt.Sprintf("%d", chatID) - } - - metadata := map[string]string{ - "message_id": fmt.Sprintf("%d", message.MessageID), - "user_id": fmt.Sprintf("%d", user.ID), - "username": user.Username, - "first_name": user.FirstName, - "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), - "peer_kind": peerKind, - "peer_id": peerID, - } - - c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) - return nil -} - -func (c *TelegramChannel) downloadPhoto(ctx context.Context, fileID string) string { - file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID}) - if err != nil { - logger.ErrorCF("telegram", "Failed to get photo file", map[string]any{ - "error": err.Error(), - }) - return "" - } - - return c.downloadFileWithInfo(file, ".jpg") -} - -func (c *TelegramChannel) downloadFileWithInfo(file *telego.File, ext string) string { - if file.FilePath == "" { - return "" - } - - url := c.bot.FileDownloadURL(file.FilePath) - logger.DebugCF("telegram", "File URL", map[string]any{"url": url}) - - // Use FilePath as filename for better identification - filename := file.FilePath + ext - return utils.DownloadFile(url, filename, utils.DownloadOptions{ - LoggerPrefix: "telegram", - }) -} - -func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) string { - file, err := c.bot.GetFile(ctx, &telego.GetFileParams{FileID: fileID}) - if err != nil { - logger.ErrorCF("telegram", "Failed to get file", map[string]any{ - "error": err.Error(), - }) - return "" - } - - return c.downloadFileWithInfo(file, ext) -} - -func parseChatID(chatIDStr string) (int64, error) { - var id int64 - _, err := fmt.Sscanf(chatIDStr, "%d", &id) - return id, err -} - -func markdownToTelegramHTML(text string) string { - if text == "" { - return "" - } - - codeBlocks := extractCodeBlocks(text) - text = codeBlocks.text - - inlineCodes := extractInlineCodes(text) - text = inlineCodes.text - - text = regexp.MustCompile(`^#{1,6}\s+(.+)$`).ReplaceAllString(text, "$1") - - text = regexp.MustCompile(`^>\s*(.*)$`).ReplaceAllString(text, "$1") - - text = escapeHTML(text) - - text = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`).ReplaceAllString(text, `$1`) - - text = regexp.MustCompile(`\*\*(.+?)\*\*`).ReplaceAllString(text, "$1") - - text = regexp.MustCompile(`__(.+?)__`).ReplaceAllString(text, "$1") - - reItalic := regexp.MustCompile(`_([^_]+)_`) - text = reItalic.ReplaceAllStringFunc(text, func(s string) string { - match := reItalic.FindStringSubmatch(s) - if len(match) < 2 { - return s - } - return "" + match[1] + "" - }) - - text = regexp.MustCompile(`~~(.+?)~~`).ReplaceAllString(text, "$1") - - text = regexp.MustCompile(`^[-*]\s+`).ReplaceAllString(text, "• ") - - for i, code := range inlineCodes.codes { - escaped := escapeHTML(code) - text = strings.ReplaceAll(text, fmt.Sprintf("\x00IC%d\x00", i), fmt.Sprintf("%s", escaped)) - } - - for i, code := range codeBlocks.codes { - escaped := escapeHTML(code) - text = strings.ReplaceAll( - text, - fmt.Sprintf("\x00CB%d\x00", i), - fmt.Sprintf("
%s
", escaped), - ) - } - - return text -} - -type codeBlockMatch struct { - text string - codes []string -} - -func extractCodeBlocks(text string) codeBlockMatch { - re := regexp.MustCompile("```[\\w]*\\n?([\\s\\S]*?)```") - matches := re.FindAllStringSubmatch(text, -1) - - codes := make([]string, 0, len(matches)) - for _, match := range matches { - codes = append(codes, match[1]) - } - - i := 0 - text = re.ReplaceAllStringFunc(text, func(m string) string { - placeholder := fmt.Sprintf("\x00CB%d\x00", i) - i++ - return placeholder - }) - - return codeBlockMatch{text: text, codes: codes} -} - -type inlineCodeMatch struct { - text string - codes []string -} - -func extractInlineCodes(text string) inlineCodeMatch { - re := regexp.MustCompile("`([^`]+)`") - matches := re.FindAllStringSubmatch(text, -1) - - codes := make([]string, 0, len(matches)) - for _, match := range matches { - codes = append(codes, match[1]) - } - - i := 0 - text = re.ReplaceAllStringFunc(text, func(m string) string { - placeholder := fmt.Sprintf("\x00IC%d\x00", i) - i++ - return placeholder - }) - - return inlineCodeMatch{text: text, codes: codes} -} - -func escapeHTML(text string) string { - text = strings.ReplaceAll(text, "&", "&") - text = strings.ReplaceAll(text, "<", "<") - text = strings.ReplaceAll(text, ">", ">") - return text -} diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index a11cf53b8..7feb706aa 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -7,12 +7,12 @@ import ( "net/url" "os" "regexp" + "slices" "strconv" "strings" "time" "github.com/mymmrac/telego" - "github.com/mymmrac/telego/telegohandler" th "github.com/mymmrac/telego/telegohandler" tu "github.com/mymmrac/telego/telegoutil" @@ -41,7 +41,7 @@ var ( type TelegramChannel struct { *channels.BaseChannel bot *telego.Bot - bh *telegohandler.BotHandler + bh *th.BotHandler commands TelegramCommander config *config.Config chatIDs map[string]int64 @@ -101,6 +101,12 @@ func (c *TelegramChannel) Start(ctx context.Context) error { c.ctx, c.cancel = context.WithCancel(ctx) + if err := c.initBotCommands(c.ctx); err != nil { + logger.WarnCF("telegram", "Failed to initialize bot commands", map[string]any{ + "error": err.Error(), + }) + } + updates, err := c.bot.UpdatesViaLongPolling(c.ctx, &telego.GetUpdatesParams{ Timeout: 30, }) @@ -109,20 +115,19 @@ func (c *TelegramChannel) Start(ctx context.Context) error { return fmt.Errorf("failed to start long polling: %w", err) } - bh, err := telegohandler.NewBotHandler(c.bot, updates) + bh, err := th.NewBotHandler(c.bot, updates) if err != nil { c.cancel() return fmt.Errorf("failed to create bot handler: %w", err) } c.bh = bh - bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { - c.commands.Help(ctx, message) - return nil - }, th.CommandEqual("help")) bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { return c.commands.Start(ctx, message) }, th.CommandEqual("start")) + bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { + return c.commands.Help(ctx, message) + }, th.CommandEqual("help")) bh.HandleMessage(func(ctx *th.Context, message telego.Message) error { return c.commands.Show(ctx, message) @@ -141,7 +146,13 @@ func (c *TelegramChannel) Start(ctx context.Context) error { "username": c.bot.Username(), }) - go bh.Start() + go func() { + if err = bh.Start(); err != nil { + logger.ErrorCF("telegram", "Bot handler failed", map[string]any{ + "error": err.Error(), + }) + } + }() return nil } @@ -152,7 +163,7 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { // Stop the bot handler if c.bh != nil { - c.bh.Stop() + _ = c.bh.StopWithContext(ctx) } // Cancel our context (stops long polling) @@ -163,6 +174,51 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { return nil } +func (c *TelegramChannel) initBotCommands(ctx context.Context) error { + currentCommands, err := c.bot.GetMyCommands(ctx, &telego.GetMyCommandsParams{ + Scope: tu.ScopeDefault(), + }) + if err != nil { + return fmt.Errorf("get commands: %w", err) + } + + commands := []telego.BotCommand{ + { + Command: "start", + Description: "Start the bot", + }, + { + Command: "help", + Description: "Show a help message", + }, + { + Command: "show", + Description: "Show current configuration", + }, + { + Command: "list", + Description: "List available options", + }, + } + + // Setting commands on each start will hit the rate limit very quickly, that's why we check if an update is needed + if !slices.Equal(currentCommands, commands) { + logger.InfoC("telegram", "Updating bot commands") + + err = c.bot.SetMyCommands(ctx, &telego.SetMyCommandsParams{ + Commands: commands, + Scope: tu.ScopeDefault(), + }) + if err != nil { + return fmt.Errorf("set commands: %w", err) + } + } else { + logger.DebugC("telegram", "Bot commands are up to date") + } + + return nil +} + func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.IsRunning() { return channels.ErrNotRunning