From 123275fcbe3678de652669b7c0aecdb9e15c4c77 Mon Sep 17 00:00:00 2001 From: statxc <181730535+statxc@users.noreply.github.com> Date: Tue, 10 Mar 2026 02:54:10 +0000 Subject: [PATCH] feat(telegram): support forum topics with per-topic session isolation --- pkg/channels/telegram/telegram.go | 107 ++++++++++++------ pkg/channels/telegram/telegram_test.go | 143 +++++++++++++++++++++++++ 2 files changed, 219 insertions(+), 31 deletions(-) diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 0a36247a6..0df82cd47 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -168,7 +168,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return channels.ErrNotRunning } - chatID, err := parseChatID(msg.ChatID) + chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } @@ -200,7 +200,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err continue } - if err := c.sendHTMLChunk(ctx, chatID, htmlContent, chunk); err != nil { + if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil { return err } } @@ -210,9 +210,10 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err // sendHTMLChunk sends a single HTML message, falling back to the original // markdown as plain text on parse failure so users never see raw HTML tags. -func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlContent, mdFallback string) error { +func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string) error { tgMsg := tu.Message(tu.ID(chatID), htmlContent) tgMsg.ParseMode = telego.ModeHTML + tgMsg.MessageThreadID = threadID if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil { logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{ @@ -232,13 +233,16 @@ func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlC // (Telegram's typing indicator expires after ~5s) in a background goroutine. // The returned stop function is idempotent and cancels the goroutine. func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { - cid, err := parseChatID(chatID) + cid, threadID, err := parseTelegramChatID(chatID) if err != nil { return func() {}, err } + action := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping) + action.MessageThreadID = threadID + // Send the first typing action immediately - _ = c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)) + _ = c.bot.SendChatAction(ctx, action) typingCtx, cancel := context.WithCancel(ctx) go func() { @@ -249,7 +253,9 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( case <-typingCtx.Done(): return case <-ticker.C: - _ = c.bot.SendChatAction(typingCtx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)) + a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping) + a.MessageThreadID = threadID + _ = c.bot.SendChatAction(typingCtx, a) } } }() @@ -259,7 +265,7 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( // EditMessage implements channels.MessageEditor. func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messageID string, content string) error { - cid, err := parseChatID(chatID) + cid, _, err := parseTelegramChatID(chatID) if err != nil { return err } @@ -288,12 +294,14 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s text = "Thinking... 💭" } - cid, err := parseChatID(chatID) + cid, threadID, err := parseTelegramChatID(chatID) if err != nil { return "", err } - pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(cid), text)) + phMsg := tu.Message(tu.ID(cid), text) + phMsg.MessageThreadID = threadID + pMsg, err := c.bot.SendMessage(ctx, phMsg) if err != nil { return "", err } @@ -307,7 +315,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe return channels.ErrNotRunning } - chatID, err := parseChatID(msg.ChatID) + chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } @@ -339,30 +347,34 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe switch part.Type { case "image": params := &telego.SendPhotoParams{ - ChatID: tu.ID(chatID), - Photo: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Photo: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendPhoto(ctx, params) case "audio": params := &telego.SendAudioParams{ - ChatID: tu.ID(chatID), - Audio: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Audio: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendAudio(ctx, params) case "video": params := &telego.SendVideoParams{ - ChatID: tu.ID(chatID), - Video: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Video: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendVideo(ctx, params) default: // "file" or unknown types params := &telego.SendDocumentParams{ - ChatID: tu.ID(chatID), - Document: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Document: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendDocument(ctx, params) } @@ -506,19 +518,29 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes content = cleaned } + // Build composite chatID. For forum topics, embed the thread ID in the + // chatID as "chatID/threadID" (same pattern as Slack's "channelID/threadTS") + // so all outbound methods route replies to the correct topic. + // Note: Telegram's "General" topic uses thread ID 1; it is treated like any + // other topic for session isolation and agent binding. + compositeChatID := fmt.Sprintf("%d", chatID) + threadID := message.MessageThreadID + if threadID != 0 { + compositeChatID = fmt.Sprintf("%d/%d", chatID, threadID) + } + logger.DebugCF("telegram", "Received message", map[string]any{ "sender_id": sender.CanonicalID, - "chat_id": fmt.Sprintf("%d", chatID), + "chat_id": compositeChatID, + "thread_id": threadID, "preview": utils.Truncate(content, 50), }) - // Placeholder is now auto-triggered by BaseChannel.HandleMessage via PlaceholderCapable - peerKind := "direct" peerID := fmt.Sprintf("%d", user.ID) if message.Chat.Type != "private" { peerKind = "group" - peerID = fmt.Sprintf("%d", chatID) + peerID = compositeChatID } peer := bus.Peer{Kind: peerKind, ID: peerID} @@ -531,11 +553,18 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), } + // For forum topic messages, set parent_peer metadata so the routing system + // can isolate sessions per topic via the existing 7-level priority cascade. + if threadID != 0 { + metadata["parent_peer_kind"] = "topic" + metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID) + } + c.HandleMessage(c.ctx, peer, messageID, platformID, - fmt.Sprintf("%d", chatID), + compositeChatID, content, mediaPaths, metadata, @@ -583,10 +612,26 @@ func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) return c.downloadFileWithInfo(file, ext) } -func parseChatID(chatIDStr string) (int64, error) { - var id int64 - _, err := fmt.Sscanf(chatIDStr, "%d", &id) - return id, err +// parseTelegramChatID splits a composite chat ID "chatID/threadID" into its +// components. For non-forum messages the threadID is 0. If the chatID string +// contains a "/" segment, the second part must be a valid integer thread ID; +// otherwise an error is returned. This mirrors the Slack adapter (channelID/threadTS). +// Implemented with strings.Index to avoid allocating a slice in the hot path. +func parseTelegramChatID(chatID string) (int64, int, error) { + idx := strings.Index(chatID, "/") + if idx == -1 { + cid, err := strconv.ParseInt(chatID, 10, 64) + return cid, 0, err + } + cid, err := strconv.ParseInt(chatID[:idx], 10, 64) + if err != nil { + return 0, 0, err + } + tid, err := strconv.Atoi(chatID[idx+1:]) + if err != nil { + return 0, 0, fmt.Errorf("invalid thread ID in chat ID %q: %w", chatID, err) + } + return cid, tid, nil } func markdownToTelegramHTML(text string) string { diff --git a/pkg/channels/telegram/telegram_test.go b/pkg/channels/telegram/telegram_test.go index 3a2f1aa66..d8780dc88 100644 --- a/pkg/channels/telegram/telegram_test.go +++ b/pkg/channels/telegram/telegram_test.go @@ -6,6 +6,7 @@ import ( "errors" "strings" "testing" + "time" "github.com/mymmrac/telego" ta "github.com/mymmrac/telego/telegoapi" @@ -271,3 +272,145 @@ func TestSend_InvalidChatID(t *testing.T) { assert.True(t, errors.Is(err, channels.ErrSendFailed), "error should wrap ErrSendFailed") assert.Empty(t, caller.calls) } + +func TestParseTelegramChatID_Plain(t *testing.T) { + cid, tid, err := parseTelegramChatID("12345") + assert.NoError(t, err) + assert.Equal(t, int64(12345), cid) + assert.Equal(t, 0, tid) +} + +func TestParseTelegramChatID_NegativeGroup(t *testing.T) { + cid, tid, err := parseTelegramChatID("-1001234567890") + assert.NoError(t, err) + assert.Equal(t, int64(-1001234567890), cid) + assert.Equal(t, 0, tid) +} + +func TestParseTelegramChatID_WithThreadID(t *testing.T) { + cid, tid, err := parseTelegramChatID("-1001234567890/42") + assert.NoError(t, err) + assert.Equal(t, int64(-1001234567890), cid) + assert.Equal(t, 42, tid) +} + +func TestParseTelegramChatID_GeneralTopic(t *testing.T) { + cid, tid, err := parseTelegramChatID("-100123/1") + assert.NoError(t, err) + assert.Equal(t, int64(-100123), cid) + assert.Equal(t, 1, tid) +} + +func TestParseTelegramChatID_Invalid(t *testing.T) { + _, _, err := parseTelegramChatID("not-a-number") + assert.Error(t, err) +} + +func TestParseTelegramChatID_InvalidThreadID(t *testing.T) { + _, _, err := parseTelegramChatID("-100123/not-a-thread") + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid thread ID") +} + +func TestSend_WithForumThreadID(t *testing.T) { + caller := &stubCaller{ + callFn: func(ctx context.Context, url string, data *ta.RequestData) (*ta.Response, error) { + return successResponse(t), nil + }, + } + ch := newTestChannel(t, caller) + + err := ch.Send(context.Background(), bus.OutboundMessage{ + ChatID: "-1001234567890/42", + Content: "Hello from topic", + }) + + assert.NoError(t, err) + assert.Len(t, caller.calls, 1) +} + +func TestHandleMessage_ForumTopic_SetsMetadata(t *testing.T) { + messageBus := bus.NewMessageBus() + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil), + chatIDs: make(map[string]int64), + ctx: context.Background(), + } + + msg := &telego.Message{ + Text: "hello from topic", + MessageID: 10, + MessageThreadID: 42, + Chat: telego.Chat{ + ID: -1001234567890, + Type: "supergroup", + IsForum: true, + }, + From: &telego.User{ + ID: 7, + FirstName: "Alice", + }, + } + + err := ch.handleMessage(context.Background(), msg) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + inbound, ok := messageBus.ConsumeInbound(ctx) + require.True(t, ok, "expected inbound message") + + // Composite chatID should include thread ID + assert.Equal(t, "-1001234567890/42", inbound.ChatID) + + // Peer ID should include thread ID for session key isolation + assert.Equal(t, "group", inbound.Peer.Kind) + assert.Equal(t, "-1001234567890/42", inbound.Peer.ID) + + // Parent peer metadata should be set for agent binding + assert.Equal(t, "topic", inbound.Metadata["parent_peer_kind"]) + assert.Equal(t, "42", inbound.Metadata["parent_peer_id"]) +} + +func TestHandleMessage_NoForum_NoThreadMetadata(t *testing.T) { + messageBus := bus.NewMessageBus() + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil), + chatIDs: make(map[string]int64), + ctx: context.Background(), + } + + msg := &telego.Message{ + Text: "regular group message", + MessageID: 11, + Chat: telego.Chat{ + ID: -100999, + Type: "group", + }, + From: &telego.User{ + ID: 8, + FirstName: "Bob", + }, + } + + err := ch.handleMessage(context.Background(), msg) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + inbound, ok := messageBus.ConsumeInbound(ctx) + require.True(t, ok) + + // Plain chatID without thread suffix + assert.Equal(t, "-100999", inbound.ChatID) + + // Peer ID should be raw chat ID (no thread suffix) + assert.Equal(t, "group", inbound.Peer.Kind) + assert.Equal(t, "-100999", inbound.Peer.ID) + + // No parent peer metadata + assert.Empty(t, inbound.Metadata["parent_peer_kind"]) + assert.Empty(t, inbound.Metadata["parent_peer_id"]) +}