diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 0a36247a6..b04beeb6e 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -168,7 +168,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return channels.ErrNotRunning } - chatID, err := parseChatID(msg.ChatID) + chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } @@ -200,7 +200,7 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err continue } - if err := c.sendHTMLChunk(ctx, chatID, htmlContent, chunk); err != nil { + if err := c.sendHTMLChunk(ctx, chatID, threadID, htmlContent, chunk); err != nil { return err } } @@ -210,9 +210,12 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err // sendHTMLChunk sends a single HTML message, falling back to the original // markdown as plain text on parse failure so users never see raw HTML tags. -func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlContent, mdFallback string) error { +func (c *TelegramChannel) sendHTMLChunk( + ctx context.Context, chatID int64, threadID int, htmlContent, mdFallback string, +) error { tgMsg := tu.Message(tu.ID(chatID), htmlContent) tgMsg.ParseMode = telego.ModeHTML + tgMsg.MessageThreadID = threadID if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil { logger.ErrorCF("telegram", "HTML parse failed, falling back to plain text", map[string]any{ @@ -232,13 +235,16 @@ func (c *TelegramChannel) sendHTMLChunk(ctx context.Context, chatID int64, htmlC // (Telegram's typing indicator expires after ~5s) in a background goroutine. // The returned stop function is idempotent and cancels the goroutine. func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { - cid, err := parseChatID(chatID) + cid, threadID, err := parseTelegramChatID(chatID) if err != nil { return func() {}, err } + action := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping) + action.MessageThreadID = threadID + // Send the first typing action immediately - _ = c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)) + _ = c.bot.SendChatAction(ctx, action) typingCtx, cancel := context.WithCancel(ctx) go func() { @@ -249,7 +255,9 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( case <-typingCtx.Done(): return case <-ticker.C: - _ = c.bot.SendChatAction(typingCtx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)) + a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping) + a.MessageThreadID = threadID + _ = c.bot.SendChatAction(typingCtx, a) } } }() @@ -259,7 +267,7 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( // EditMessage implements channels.MessageEditor. func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messageID string, content string) error { - cid, err := parseChatID(chatID) + cid, _, err := parseTelegramChatID(chatID) if err != nil { return err } @@ -288,12 +296,14 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s text = "Thinking... 💭" } - cid, err := parseChatID(chatID) + cid, threadID, err := parseTelegramChatID(chatID) if err != nil { return "", err } - pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(cid), text)) + phMsg := tu.Message(tu.ID(cid), text) + phMsg.MessageThreadID = threadID + pMsg, err := c.bot.SendMessage(ctx, phMsg) if err != nil { return "", err } @@ -307,7 +317,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe return channels.ErrNotRunning } - chatID, err := parseChatID(msg.ChatID) + chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } @@ -339,30 +349,34 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe switch part.Type { case "image": params := &telego.SendPhotoParams{ - ChatID: tu.ID(chatID), - Photo: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Photo: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendPhoto(ctx, params) case "audio": params := &telego.SendAudioParams{ - ChatID: tu.ID(chatID), - Audio: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Audio: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendAudio(ctx, params) case "video": params := &telego.SendVideoParams{ - ChatID: tu.ID(chatID), - Video: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Video: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendVideo(ctx, params) default: // "file" or unknown types params := &telego.SendDocumentParams{ - ChatID: tu.ID(chatID), - Document: telego.InputFile{File: file}, - Caption: part.Caption, + ChatID: tu.ID(chatID), + MessageThreadID: threadID, + Document: telego.InputFile{File: file}, + Caption: part.Caption, } _, err = c.bot.SendDocument(ctx, params) } @@ -506,19 +520,28 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes content = cleaned } + // For forum topics, embed the thread ID as "chatID/threadID" so replies + // route to the correct topic and each topic gets its own session. + // Only forum groups (IsForum) are handled; regular group reply threads + // must share one session per group. + compositeChatID := fmt.Sprintf("%d", chatID) + threadID := message.MessageThreadID + if message.Chat.IsForum && threadID != 0 { + compositeChatID = fmt.Sprintf("%d/%d", chatID, threadID) + } + logger.DebugCF("telegram", "Received message", map[string]any{ "sender_id": sender.CanonicalID, - "chat_id": fmt.Sprintf("%d", chatID), + "chat_id": compositeChatID, + "thread_id": threadID, "preview": utils.Truncate(content, 50), }) - // Placeholder is now auto-triggered by BaseChannel.HandleMessage via PlaceholderCapable - peerKind := "direct" peerID := fmt.Sprintf("%d", user.ID) if message.Chat.Type != "private" { peerKind = "group" - peerID = fmt.Sprintf("%d", chatID) + peerID = compositeChatID } peer := bus.Peer{Kind: peerKind, ID: peerID} @@ -531,11 +554,17 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), } + // Set parent_peer metadata for per-topic agent binding. + if message.Chat.IsForum && threadID != 0 { + metadata["parent_peer_kind"] = "topic" + metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID) + } + c.HandleMessage(c.ctx, peer, messageID, platformID, - fmt.Sprintf("%d", chatID), + compositeChatID, content, mediaPaths, metadata, @@ -583,10 +612,23 @@ func (c *TelegramChannel) downloadFile(ctx context.Context, fileID, ext string) return c.downloadFileWithInfo(file, ext) } -func parseChatID(chatIDStr string) (int64, error) { - var id int64 - _, err := fmt.Sscanf(chatIDStr, "%d", &id) - return id, err +// parseTelegramChatID splits "chatID/threadID" into its components. +// Returns threadID=0 when no "/" is present (non-forum messages). +func parseTelegramChatID(chatID string) (int64, int, error) { + idx := strings.Index(chatID, "/") + if idx == -1 { + cid, err := strconv.ParseInt(chatID, 10, 64) + return cid, 0, err + } + cid, err := strconv.ParseInt(chatID[:idx], 10, 64) + if err != nil { + return 0, 0, err + } + tid, err := strconv.Atoi(chatID[idx+1:]) + if err != nil { + return 0, 0, fmt.Errorf("invalid thread ID in chat ID %q: %w", chatID, err) + } + return cid, tid, nil } func markdownToTelegramHTML(text string) string { diff --git a/pkg/channels/telegram/telegram_test.go b/pkg/channels/telegram/telegram_test.go index 3a2f1aa66..c2186d0a3 100644 --- a/pkg/channels/telegram/telegram_test.go +++ b/pkg/channels/telegram/telegram_test.go @@ -6,6 +6,7 @@ import ( "errors" "strings" "testing" + "time" "github.com/mymmrac/telego" ta "github.com/mymmrac/telego/telegoapi" @@ -271,3 +272,191 @@ func TestSend_InvalidChatID(t *testing.T) { assert.True(t, errors.Is(err, channels.ErrSendFailed), "error should wrap ErrSendFailed") assert.Empty(t, caller.calls) } + +func TestParseTelegramChatID_Plain(t *testing.T) { + cid, tid, err := parseTelegramChatID("12345") + assert.NoError(t, err) + assert.Equal(t, int64(12345), cid) + assert.Equal(t, 0, tid) +} + +func TestParseTelegramChatID_NegativeGroup(t *testing.T) { + cid, tid, err := parseTelegramChatID("-1001234567890") + assert.NoError(t, err) + assert.Equal(t, int64(-1001234567890), cid) + assert.Equal(t, 0, tid) +} + +func TestParseTelegramChatID_WithThreadID(t *testing.T) { + cid, tid, err := parseTelegramChatID("-1001234567890/42") + assert.NoError(t, err) + assert.Equal(t, int64(-1001234567890), cid) + assert.Equal(t, 42, tid) +} + +func TestParseTelegramChatID_GeneralTopic(t *testing.T) { + cid, tid, err := parseTelegramChatID("-100123/1") + assert.NoError(t, err) + assert.Equal(t, int64(-100123), cid) + assert.Equal(t, 1, tid) +} + +func TestParseTelegramChatID_Invalid(t *testing.T) { + _, _, err := parseTelegramChatID("not-a-number") + assert.Error(t, err) +} + +func TestParseTelegramChatID_InvalidThreadID(t *testing.T) { + _, _, err := parseTelegramChatID("-100123/not-a-thread") + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid thread ID") +} + +func TestSend_WithForumThreadID(t *testing.T) { + caller := &stubCaller{ + callFn: func(ctx context.Context, url string, data *ta.RequestData) (*ta.Response, error) { + return successResponse(t), nil + }, + } + ch := newTestChannel(t, caller) + + err := ch.Send(context.Background(), bus.OutboundMessage{ + ChatID: "-1001234567890/42", + Content: "Hello from topic", + }) + + assert.NoError(t, err) + assert.Len(t, caller.calls, 1) +} + +func TestHandleMessage_ForumTopic_SetsMetadata(t *testing.T) { + messageBus := bus.NewMessageBus() + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil), + chatIDs: make(map[string]int64), + ctx: context.Background(), + } + + msg := &telego.Message{ + Text: "hello from topic", + MessageID: 10, + MessageThreadID: 42, + Chat: telego.Chat{ + ID: -1001234567890, + Type: "supergroup", + IsForum: true, + }, + From: &telego.User{ + ID: 7, + FirstName: "Alice", + }, + } + + err := ch.handleMessage(context.Background(), msg) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + inbound, ok := messageBus.ConsumeInbound(ctx) + require.True(t, ok, "expected inbound message") + + // Composite chatID should include thread ID + assert.Equal(t, "-1001234567890/42", inbound.ChatID) + + // Peer ID should include thread ID for session key isolation + assert.Equal(t, "group", inbound.Peer.Kind) + assert.Equal(t, "-1001234567890/42", inbound.Peer.ID) + + // Parent peer metadata should be set for agent binding + assert.Equal(t, "topic", inbound.Metadata["parent_peer_kind"]) + assert.Equal(t, "42", inbound.Metadata["parent_peer_id"]) +} + +func TestHandleMessage_NoForum_NoThreadMetadata(t *testing.T) { + messageBus := bus.NewMessageBus() + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil), + chatIDs: make(map[string]int64), + ctx: context.Background(), + } + + msg := &telego.Message{ + Text: "regular group message", + MessageID: 11, + Chat: telego.Chat{ + ID: -100999, + Type: "group", + }, + From: &telego.User{ + ID: 8, + FirstName: "Bob", + }, + } + + err := ch.handleMessage(context.Background(), msg) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + inbound, ok := messageBus.ConsumeInbound(ctx) + require.True(t, ok) + + // Plain chatID without thread suffix + assert.Equal(t, "-100999", inbound.ChatID) + + // Peer ID should be raw chat ID (no thread suffix) + assert.Equal(t, "group", inbound.Peer.Kind) + assert.Equal(t, "-100999", inbound.Peer.ID) + + // No parent peer metadata + assert.Empty(t, inbound.Metadata["parent_peer_kind"]) + assert.Empty(t, inbound.Metadata["parent_peer_id"]) +} + +func TestHandleMessage_ReplyThread_NonForum_NoIsolation(t *testing.T) { + messageBus := bus.NewMessageBus() + ch := &TelegramChannel{ + BaseChannel: channels.NewBaseChannel("telegram", nil, messageBus, nil), + chatIDs: make(map[string]int64), + ctx: context.Background(), + } + + // In regular groups, reply threads set MessageThreadID to the original + // message ID. This should NOT trigger per-thread session isolation. + msg := &telego.Message{ + Text: "reply in thread", + MessageID: 20, + MessageThreadID: 15, + Chat: telego.Chat{ + ID: -100999, + Type: "supergroup", + IsForum: false, + }, + From: &telego.User{ + ID: 9, + FirstName: "Carol", + }, + } + + err := ch.handleMessage(context.Background(), msg) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + inbound, ok := messageBus.ConsumeInbound(ctx) + require.True(t, ok) + + // chatID should NOT include thread suffix for non-forum groups + assert.Equal(t, "-100999", inbound.ChatID) + + // Peer ID should be raw chat ID (shared session for whole group) + assert.Equal(t, "group", inbound.Peer.Kind) + assert.Equal(t, "-100999", inbound.Peer.ID) + + // No parent peer metadata + assert.Empty(t, inbound.Metadata["parent_peer_kind"]) + assert.Empty(t, inbound.Metadata["parent_peer_id"]) +}