From cf11ff70c3cc0ad4b8e3f08a8f362d51b4a446cd Mon Sep 17 00:00:00 2001 From: Hoshina Date: Wed, 1 Apr 2026 13:50:24 +0800 Subject: [PATCH] refactor(channels): emit inbound context in primary adapters --- pkg/channels/base.go | 65 ++++++++++++++++++++++++------- pkg/channels/discord/discord.go | 20 +++++++++- pkg/channels/slack/slack.go | 56 ++++++++++++++++++++++---- pkg/channels/telegram/telegram.go | 29 ++++++++------ 4 files changed, 137 insertions(+), 33 deletions(-) diff --git a/pkg/channels/base.go b/pkg/channels/base.go index fd68ebcc2..8161fa12e 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -251,12 +251,39 @@ func (c *BaseChannel) HandleMessage( media []string, metadata map[string]string, senderOpts ...bus.SenderInfo, +) { + var sender bus.SenderInfo + if len(senderOpts) > 0 { + sender = senderOpts[0] + } + + inboundCtx := bus.ContextFromLegacyInbound(bus.InboundMessage{ + Channel: c.name, + SenderID: senderID, + Sender: sender, + ChatID: chatID, + Peer: peer, + MessageID: messageID, + Metadata: metadata, + }) + + c.HandleMessageWithContext(ctx, peer, chatID, content, media, inboundCtx, senderOpts...) +} + +func (c *BaseChannel) HandleMessageWithContext( + ctx context.Context, + peer bus.Peer, + deliveryChatID, content string, + media []string, + inboundCtx bus.InboundContext, + senderOpts ...bus.SenderInfo, ) { // Use SenderInfo-based allow check when available, else fall back to string var sender bus.SenderInfo if len(senderOpts) > 0 { sender = senderOpts[0] } + senderID := strings.TrimSpace(inboundCtx.SenderID) if sender.CanonicalID != "" || sender.PlatformID != "" { if !c.IsAllowedSender(sender) { return @@ -273,21 +300,33 @@ func (c *BaseChannel) HandleMessage( resolvedSenderID = sender.CanonicalID } - scope := BuildMediaScope(c.name, chatID, messageID) + if resolvedSenderID == "" { + resolvedSenderID = senderID + } + + inboundCtx.Channel = c.name + if inboundCtx.ChatID == "" { + inboundCtx.ChatID = deliveryChatID + } + if inboundCtx.SenderID == "" { + inboundCtx.SenderID = resolvedSenderID + } + + scope := BuildMediaScope(c.name, deliveryChatID, inboundCtx.MessageID) msg := bus.InboundMessage{ Channel: c.name, SenderID: resolvedSenderID, Sender: sender, - ChatID: chatID, + ChatID: deliveryChatID, + Context: inboundCtx, Content: content, Media: media, Peer: peer, - MessageID: messageID, + MessageID: inboundCtx.MessageID, MediaScope: scope, - Metadata: metadata, } - msg.Context = bus.ContextFromLegacyInbound(msg) + msg = bus.NormalizeInboundMessage(msg) // Auto-trigger typing indicator, message reaction, and placeholder before publishing. // Each capability is independent — all three may fire for the same message. @@ -298,14 +337,14 @@ func (c *BaseChannel) HandleMessage( if c.owner != nil && c.placeholderRecorder != nil { // Typing if tc, ok := c.owner.(TypingCapable); ok { - if stop, err := tc.StartTyping(ctx, chatID); err == nil { - c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop) + if stop, err := tc.StartTyping(ctx, deliveryChatID); err == nil { + c.placeholderRecorder.RecordTypingStop(c.name, deliveryChatID, stop) } } // Reaction - if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" { - if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil { - c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo) + if rc, ok := c.owner.(ReactionCapable); ok && msg.MessageID != "" { + if undo, err := rc.ReactToMessage(ctx, deliveryChatID, msg.MessageID); err == nil { + c.placeholderRecorder.RecordReactionUndo(c.name, deliveryChatID, undo) } } // Placeholder — independent pipeline. @@ -314,8 +353,8 @@ func (c *BaseChannel) HandleMessage( // "Thinking…" only once the voice has been processed. if !audioAnnotationRe.MatchString(content) { if pc, ok := c.owner.(PlaceholderCapable); ok { - if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" { - c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID) + if phID, err := pc.SendPlaceholder(ctx, deliveryChatID); err == nil && phID != "" { + c.placeholderRecorder.RecordPlaceholder(c.name, deliveryChatID, phID) } } } @@ -324,7 +363,7 @@ func (c *BaseChannel) HandleMessage( if err := c.bus.PublishInbound(ctx, msg); err != nil { logger.ErrorCF("channels", "Failed to publish inbound message", map[string]any{ "channel": c.name, - "chat_id": chatID, + "chat_id": deliveryChatID, "error": err.Error(), }) } diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index b3070a822..0376dcdae 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -363,8 +363,8 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag // In guild (group) channels, apply unified group trigger filtering // DMs (GuildID is empty) always get a response + isMentioned := false if m.GuildID != "" { - isMentioned := false for _, mention := range m.Mentions { if mention.ID == c.botUserID { isMentioned = true @@ -477,8 +477,24 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag "channel_id": m.ChannelID, "is_dm": fmt.Sprintf("%t", m.GuildID == ""), } + inboundCtx := bus.InboundContext{ + Channel: c.Name(), + ChatID: m.ChannelID, + ChatType: peerKind, + SenderID: senderID, + MessageID: m.ID, + Mentioned: isMentioned, + Raw: metadata, + } + if m.GuildID != "" { + inboundCtx.SpaceID = m.GuildID + inboundCtx.SpaceType = "guild" + } + if m.MessageReference != nil { + inboundCtx.ReplyToMessageID = m.MessageReference.MessageID + } - c.HandleMessage(c.ctx, peer, m.ID, senderID, m.ChannelID, content, mediaPaths, metadata, sender) + c.HandleMessageWithContext(c.ctx, peer, m.ChannelID, content, mediaPaths, inboundCtx, sender) } // startTyping starts a continuous typing indicator loop for the given chatID. diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go index 1e4a4fef5..882cc5cb5 100644 --- a/pkg/channels/slack/slack.go +++ b/pkg/channels/slack/slack.go @@ -379,7 +379,22 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { "has_thread": threadTS != "", }) - c.HandleMessage(c.ctx, peer, messageTS, senderID, chatID, content, mediaPaths, metadata, sender) + inboundCtx := bus.InboundContext{ + Channel: c.Name(), + Account: c.teamID, + ChatID: channelID, + ChatType: peerKind, + SenderID: senderID, + MessageID: messageTS, + SpaceID: c.teamID, + SpaceType: "workspace", + Raw: metadata, + } + if threadTS != "" { + inboundCtx.TopicID = threadTS + } + + c.HandleMessageWithContext(c.ctx, peer, chatID, content, mediaPaths, inboundCtx, sender) } func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { @@ -443,8 +458,21 @@ func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { "is_mention": "true", "team_id": c.teamID, } + inboundCtx := bus.InboundContext{ + Channel: c.Name(), + Account: c.teamID, + ChatID: channelID, + ChatType: mentionPeerKind, + TopicID: threadTS, + SenderID: senderID, + MessageID: messageTS, + SpaceID: c.teamID, + SpaceType: "workspace", + Mentioned: true, + Raw: metadata, + } - c.HandleMessage(c.ctx, mentionPeer, messageTS, senderID, chatID, content, nil, metadata, mentionSender) + c.HandleMessageWithContext(c.ctx, mentionPeer, chatID, content, nil, inboundCtx, mentionSender) } func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { @@ -491,16 +519,30 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { "command": cmd.Command, "text": utils.Truncate(content, 50), }) + peerKind := "channel" + peerID := channelID + if strings.HasPrefix(channelID, "D") { + peerKind = "direct" + peerID = senderID + } + inboundCtx := bus.InboundContext{ + Channel: c.Name(), + Account: c.teamID, + ChatID: channelID, + ChatType: peerKind, + SenderID: senderID, + SpaceID: c.teamID, + SpaceType: "workspace", + Raw: metadata, + } - c.HandleMessage( + c.HandleMessageWithContext( c.ctx, - bus.Peer{Kind: "channel", ID: channelID}, - "", - senderID, + bus.Peer{Kind: peerKind, ID: peerID}, chatID, content, nil, - metadata, + inboundCtx, cmdSender, ) } diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 831eb43cc..e1532bcf9 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -660,8 +660,9 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes } // In group chats, apply unified group trigger filtering + isMentioned := false if message.Chat.Type != "private" { - isMentioned := c.isBotMentioned(message) + isMentioned = c.isBotMentioned(message) if isMentioned { content = c.stripBotMention(content) } @@ -722,24 +723,30 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes "first_name": user.FirstName, "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), } - if message.ReplyToMessage != nil { - metadata["reply_to_message_id"] = fmt.Sprintf("%d", message.ReplyToMessage.MessageID) - } - // Set parent_peer metadata for per-topic agent binding. + inboundCtx := bus.InboundContext{ + Channel: c.Name(), + ChatID: fmt.Sprintf("%d", chatID), + ChatType: peerKind, + SenderID: platformID, + MessageID: messageID, + Mentioned: isMentioned, + Raw: metadata, + } if message.Chat.IsForum && threadID != 0 { - metadata["parent_peer_kind"] = "topic" - metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID) + inboundCtx.TopicID = fmt.Sprintf("%d", threadID) + } + if message.ReplyToMessage != nil { + inboundCtx.ReplyToMessageID = fmt.Sprintf("%d", message.ReplyToMessage.MessageID) } - c.HandleMessage(c.ctx, + c.HandleMessageWithContext( + c.ctx, peer, - messageID, - platformID, compositeChatID, content, mediaPaths, - metadata, + inboundCtx, sender, ) return nil