feat(telegram): support forum topics with per-topic session isolation

This commit is contained in:
statxc
2026-03-10 02:54:10 +00:00
parent b89f6445d1
commit 123275fcbe
2 changed files with 219 additions and 31 deletions
+76 -31
View File
@@ -168,7 +168,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return channels.ErrNotRunning
}
chatID, err := parseChatID(msg.ChatID)
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
}
@@ -200,7 +200,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
continue
}
if err := c.sendHTMLChunk(ctx, chatID, htmlContent, chunk); err != nil {
if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil {
return err
}
}
@@ -210,9 +210,10 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
// sendHTMLChunk sends a single HTML message, falling back to the original
// markdown as plain text on parse failure so users never see raw HTML tags.
func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlContent, mdFallback string) error {
func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string) error {
tgMsg := tu.Message(tu.ID(chatID), htmlContent)
tgMsg.ParseMode = telego.ModeHTML
tgMsg.MessageThreadID = threadID
if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil {
logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{
@@ -232,13 +233,16 @@ func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlC
// (Telegram's typing indicator expires after ~5s) in a background goroutine.
// The returned stop function is idempotent and cancels the goroutine.
func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
cid, err := parseChatID(chatID)
cid, threadID, err := parseTelegramChatID(chatID)
if err != nil {
return func() {}, err
}
action := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)
action.MessageThreadID = threadID
// Send the first typing action immediately
_ = c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
_ = c.bot.SendChatAction(ctx, action)
typingCtx, cancel := context.WithCancel(ctx)
go func() {
@@ -249,7 +253,9 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
case <-typingCtx.Done():
return
case <-ticker.C:
_ = c.bot.SendChatAction(typingCtx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)
a.MessageThreadID = threadID
_ = c.bot.SendChatAction(typingCtx, a)
}
}
}()
@@ -259,7 +265,7 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
// EditMessage implements channels.MessageEditor.
func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messageID string, content string) error {
cid, err := parseChatID(chatID)
cid, _, err := parseTelegramChatID(chatID)
if err != nil {
return err
}
@@ -288,12 +294,14 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s
text = "Thinking... 💭"
}
cid, err := parseChatID(chatID)
cid, threadID, err := parseTelegramChatID(chatID)
if err != nil {
return "", err
}
pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(cid), text))
phMsg := tu.Message(tu.ID(cid), text)
phMsg.MessageThreadID = threadID
pMsg, err := c.bot.SendMessage(ctx, phMsg)
if err != nil {
return "", err
}
@@ -307,7 +315,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
return channels.ErrNotRunning
}
chatID, err := parseChatID(msg.ChatID)
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
}
@@ -339,30 +347,34 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
switch part.Type {
case "image":
params := &telego.SendPhotoParams{
ChatID: tu.ID(chatID),
Photo: telego.InputFile{File: file},
Caption: part.Caption,
ChatID: tu.ID(chatID),
MessageThreadID: threadID,
Photo: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendPhoto(ctx, params)
case "audio":
params := &telego.SendAudioParams{
ChatID: tu.ID(chatID),
Audio: telego.InputFile{File: file},
Caption: part.Caption,
ChatID: tu.ID(chatID),
MessageThreadID: threadID,
Audio: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendAudio(ctx, params)
case "video":
params := &telego.SendVideoParams{
ChatID: tu.ID(chatID),
Video: telego.InputFile{File: file},
Caption: part.Caption,
ChatID: tu.ID(chatID),
MessageThreadID: threadID,
Video: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendVideo(ctx, params)
default: // "file" or unknown types
params := &telego.SendDocumentParams{
ChatID: tu.ID(chatID),
Document: telego.InputFile{File: file},
Caption: part.Caption,
ChatID: tu.ID(chatID),
MessageThreadID: threadID,
Document: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendDocument(ctx, params)
}
@@ -506,19 +518,29 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
content = cleaned
}
// Build composite chatID. For forum topics, embed the thread ID in the
// chatID as "chatID/threadID" (same pattern as Slack's "channelID/threadTS")
// so all outbound methods route replies to the correct topic.
// Note: Telegram's "General" topic uses thread ID 1; it is treated like any
// other topic for session isolation and agent binding.
compositeChatID := fmt.Sprintf("%d", chatID)
threadID := message.MessageThreadID
if threadID != 0 {
compositeChatID = fmt.Sprintf("%d/%d", chatID, threadID)
}
logger.DebugCF("telegram", "Received message", map[string]any{
"sender_id": sender.CanonicalID,
"chat_id": fmt.Sprintf("%d", chatID),
"chat_id": compositeChatID,
"thread_id": threadID,
"preview": utils.Truncate(content, 50),
})
// Placeholder is now auto-triggered by BaseChannel.HandleMessage via PlaceholderCapable
peerKind := "direct"
peerID := fmt.Sprintf("%d", user.ID)
if message.Chat.Type != "private" {
peerKind = "group"
peerID = fmt.Sprintf("%d", chatID)
peerID = compositeChatID
}
peer := bus.Peer{Kind: peerKind, ID: peerID}
@@ -531,11 +553,18 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
"is_group": fmt.Sprintf("%t", message.Chat.Type != "private"),
}
// For forum topic messages, set parent_peer metadata so the routing system
// can isolate sessions per topic via the existing 7-level priority cascade.
if threadID != 0 {
metadata["parent_peer_kind"] = "topic"
metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID)
}
c.HandleMessage(c.ctx,
peer,
messageID,
platformID,
fmt.Sprintf("%d", chatID),
compositeChatID,
content,
mediaPaths,
metadata,
@@ -583,10 +612,26 @@ func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string)
return c.downloadFileWithInfo(file, ext)
}
func parseChatID(chatIDStr string) (int64, error) {
var id int64
_, err := fmt.Sscanf(chatIDStr, "%d", &id)
return id, err
// parseTelegramChatID splits a composite chat ID "chatID/threadID" into its
// components. For non-forum messages the threadID is 0. If the chatID string
// contains a "/" segment, the second part must be a valid integer thread ID;
// otherwise an error is returned. This mirrors the Slack adapter (channelID/threadTS).
// Implemented with strings.Index to avoid allocating a slice in the hot path.
func parseTelegramChatID(chatID string) (int64, int, error) {
idx := strings.Index(chatID, "/")
if idx == -1 {
cid, err := strconv.ParseInt(chatID, 10, 64)
return cid, 0, err
}
cid, err := strconv.ParseInt(chatID[:idx], 10, 64)
if err != nil {
return 0, 0, err
}
tid, err := strconv.Atoi(chatID[idx+1:])
if err != nil {
return 0, 0, fmt.Errorf("invalid thread ID in chat ID %q: %w", chatID, err)
}
return cid, tid, nil
}
func markdownToTelegramHTML(text string) string {
+143
View File
@@ -6,6 +6,7 @@ import (
"errors"
"strings"
"testing"
"time"
"github.com/mymmrac/telego"
ta "github.com/mymmrac/telego/telegoapi"
@@ -271,3 +272,145 @@ func TestSend_InvalidChatID(t *testing.T) {
assert.True(t, errors.Is(err, channels.ErrSendFailed), "error should wrap ErrSendFailed")
assert.Empty(t, caller.calls)
}
func TestParseTelegramChatID_Plain(t *testing.T) {
cid, tid, err := parseTelegramChatID("12345")
assert.NoError(t, err)
assert.Equal(t, int64(12345), cid)
assert.Equal(t, 0, tid)
}
func TestParseTelegramChatID_NegativeGroup(t *testing.T) {
cid, tid, err := parseTelegramChatID("-1001234567890")
assert.NoError(t, err)
assert.Equal(t, int64(-1001234567890), cid)
assert.Equal(t, 0, tid)
}
func TestParseTelegramChatID_WithThreadID(t *testing.T) {
cid, tid, err := parseTelegramChatID("-1001234567890/42")
assert.NoError(t, err)
assert.Equal(t, int64(-1001234567890), cid)
assert.Equal(t, 42, tid)
}
func TestParseTelegramChatID_GeneralTopic(t *testing.T) {
cid, tid, err := parseTelegramChatID("-100123/1")
assert.NoError(t, err)
assert.Equal(t, int64(-100123), cid)
assert.Equal(t, 1, tid)
}
func TestParseTelegramChatID_Invalid(t *testing.T) {
_, _, err := parseTelegramChatID("not-a-number")
assert.Error(t, err)
}
func TestParseTelegramChatID_InvalidThreadID(t *testing.T) {
_, _, err := parseTelegramChatID("-100123/not-a-thread")
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid thread ID")
}
func TestSend_WithForumThreadID(t *testing.T) {
caller := &stubCaller{
callFn: func(ctx context.Context, url string, data *ta.RequestData) (*ta.Response, error) {
return successResponse(t), nil
},
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "-1001234567890/42",
Content: "Hello from topic",
})
assert.NoError(t, err)
assert.Len(t, caller.calls, 1)
}
func TestHandleMessage_ForumTopic_SetsMetadata(t *testing.T) {
messageBus := bus.NewMessageBus()
ch := &TelegramChannel{
BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil),
chatIDs: make(map[string]int64),
ctx: context.Background(),
}
msg := &telego.Message{
Text: "hello from topic",
MessageID: 10,
MessageThreadID: 42,
Chat: telego.Chat{
ID: -1001234567890,
Type: "supergroup",
IsForum: true,
},
From: &telego.User{
ID: 7,
FirstName: "Alice",
},
}
err := ch.handleMessage(context.Background(), msg)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
inbound, ok := messageBus.ConsumeInbound(ctx)
require.True(t, ok, "expected inbound message")
// Composite chatID should include thread ID
assert.Equal(t, "-1001234567890/42", inbound.ChatID)
// Peer ID should include thread ID for session key isolation
assert.Equal(t, "group", inbound.Peer.Kind)
assert.Equal(t, "-1001234567890/42", inbound.Peer.ID)
// Parent peer metadata should be set for agent binding
assert.Equal(t, "topic", inbound.Metadata["parent_peer_kind"])
assert.Equal(t, "42", inbound.Metadata["parent_peer_id"])
}
func TestHandleMessage_NoForum_NoThreadMetadata(t *testing.T) {
messageBus := bus.NewMessageBus()
ch := &TelegramChannel{
BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil),
chatIDs: make(map[string]int64),
ctx: context.Background(),
}
msg := &telego.Message{
Text: "regular group message",
MessageID: 11,
Chat: telego.Chat{
ID: -100999,
Type: "group",
},
From: &telego.User{
ID: 8,
FirstName: "Bob",
},
}
err := ch.handleMessage(context.Background(), msg)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
inbound, ok := messageBus.ConsumeInbound(ctx)
require.True(t, ok)
// Plain chatID without thread suffix
assert.Equal(t, "-100999", inbound.ChatID)
// Peer ID should be raw chat ID (no thread suffix)
assert.Equal(t, "group", inbound.Peer.Kind)
assert.Equal(t, "-100999", inbound.Peer.ID)
// No parent peer metadata
assert.Empty(t, inbound.Metadata["parent_peer_kind"])
assert.Empty(t, inbound.Metadata["parent_peer_id"])
}