refactor(channels): emit inbound context in primary adapters

This commit is contained in:
Hoshina
2026-04-01 13:50:24 +08:00
parent 9cfa3c3ba6
commit cf11ff70c3
4 changed files with 137 additions and 33 deletions
+52 -13
View File
@@ -251,12 +251,39 @@ func (c *BaseChannel) HandleMessage(
media []string,
metadata map[string]string,
senderOpts ...bus.SenderInfo,
) {
var sender bus.SenderInfo
if len(senderOpts) > 0 {
sender = senderOpts[0]
}
inboundCtx := bus.ContextFromLegacyInbound(bus.InboundMessage{
Channel: c.name,
SenderID: senderID,
Sender: sender,
ChatID: chatID,
Peer: peer,
MessageID: messageID,
Metadata: metadata,
})
c.HandleMessageWithContext(ctx, peer, chatID, content, media, inboundCtx, senderOpts...)
}
func (c *BaseChannel) HandleMessageWithContext(
ctx context.Context,
peer bus.Peer,
deliveryChatID, content string,
media []string,
inboundCtx bus.InboundContext,
senderOpts ...bus.SenderInfo,
) {
// Use SenderInfo-based allow check when available, else fall back to string
var sender bus.SenderInfo
if len(senderOpts) > 0 {
sender = senderOpts[0]
}
senderID := strings.TrimSpace(inboundCtx.SenderID)
if sender.CanonicalID != "" || sender.PlatformID != "" {
if !c.IsAllowedSender(sender) {
return
@@ -273,21 +300,33 @@ func (c *BaseChannel) HandleMessage(
resolvedSenderID = sender.CanonicalID
}
scope := BuildMediaScope(c.name, chatID, messageID)
if resolvedSenderID == "" {
resolvedSenderID = senderID
}
inboundCtx.Channel = c.name
if inboundCtx.ChatID == "" {
inboundCtx.ChatID = deliveryChatID
}
if inboundCtx.SenderID == "" {
inboundCtx.SenderID = resolvedSenderID
}
scope := BuildMediaScope(c.name, deliveryChatID, inboundCtx.MessageID)
msg := bus.InboundMessage{
Channel: c.name,
SenderID: resolvedSenderID,
Sender: sender,
ChatID: chatID,
ChatID: deliveryChatID,
Context: inboundCtx,
Content: content,
Media: media,
Peer: peer,
MessageID: messageID,
MessageID: inboundCtx.MessageID,
MediaScope: scope,
Metadata: metadata,
}
msg.Context = bus.ContextFromLegacyInbound(msg)
msg = bus.NormalizeInboundMessage(msg)
// Auto-trigger typing indicator, message reaction, and placeholder before publishing.
// Each capability is independent — all three may fire for the same message.
@@ -298,14 +337,14 @@ func (c *BaseChannel) HandleMessage(
if c.owner != nil && c.placeholderRecorder != nil {
// Typing
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
if stop, err := tc.StartTyping(ctx, deliveryChatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, deliveryChatID, stop)
}
}
// Reaction
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
if rc, ok := c.owner.(ReactionCapable); ok && msg.MessageID != "" {
if undo, err := rc.ReactToMessage(ctx, deliveryChatID, msg.MessageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, deliveryChatID, undo)
}
}
// Placeholder — independent pipeline.
@@ -314,8 +353,8 @@ func (c *BaseChannel) HandleMessage(
// "Thinking…" only once the voice has been processed.
if !audioAnnotationRe.MatchString(content) {
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
if phID, err := pc.SendPlaceholder(ctx, deliveryChatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, deliveryChatID, phID)
}
}
}
@@ -324,7 +363,7 @@ func (c *BaseChannel) HandleMessage(
if err := c.bus.PublishInbound(ctx, msg); err != nil {
logger.ErrorCF("channels", "Failed to publish inbound message", map[string]any{
"channel": c.name,
"chat_id": chatID,
"chat_id": deliveryChatID,
"error": err.Error(),
})
}
+18 -2
View File
@@ -363,8 +363,8 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
// In guild (group) channels, apply unified group trigger filtering
// DMs (GuildID is empty) always get a response
isMentioned := false
if m.GuildID != "" {
isMentioned := false
for _, mention := range m.Mentions {
if mention.ID == c.botUserID {
isMentioned = true
@@ -477,8 +477,24 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
"channel_id": m.ChannelID,
"is_dm": fmt.Sprintf("%t", m.GuildID == ""),
}
inboundCtx := bus.InboundContext{
Channel: c.Name(),
ChatID: m.ChannelID,
ChatType: peerKind,
SenderID: senderID,
MessageID: m.ID,
Mentioned: isMentioned,
Raw: metadata,
}
if m.GuildID != "" {
inboundCtx.SpaceID = m.GuildID
inboundCtx.SpaceType = "guild"
}
if m.MessageReference != nil {
inboundCtx.ReplyToMessageID = m.MessageReference.MessageID
}
c.HandleMessage(c.ctx, peer, m.ID, senderID, m.ChannelID, content, mediaPaths, metadata, sender)
c.HandleMessageWithContext(c.ctx, peer, m.ChannelID, content, mediaPaths, inboundCtx, sender)
}
// startTyping starts a continuous typing indicator loop for the given chatID.
+49 -7
View File
@@ -379,7 +379,22 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
"has_thread": threadTS != "",
})
c.HandleMessage(c.ctx, peer, messageTS, senderID, chatID, content, mediaPaths, metadata, sender)
inboundCtx := bus.InboundContext{
Channel: c.Name(),
Account: c.teamID,
ChatID: channelID,
ChatType: peerKind,
SenderID: senderID,
MessageID: messageTS,
SpaceID: c.teamID,
SpaceType: "workspace",
Raw: metadata,
}
if threadTS != "" {
inboundCtx.TopicID = threadTS
}
c.HandleMessageWithContext(c.ctx, peer, chatID, content, mediaPaths, inboundCtx, sender)
}
func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
@@ -443,8 +458,21 @@ func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
"is_mention": "true",
"team_id": c.teamID,
}
inboundCtx := bus.InboundContext{
Channel: c.Name(),
Account: c.teamID,
ChatID: channelID,
ChatType: mentionPeerKind,
TopicID: threadTS,
SenderID: senderID,
MessageID: messageTS,
SpaceID: c.teamID,
SpaceType: "workspace",
Mentioned: true,
Raw: metadata,
}
c.HandleMessage(c.ctx, mentionPeer, messageTS, senderID, chatID, content, nil, metadata, mentionSender)
c.HandleMessageWithContext(c.ctx, mentionPeer, chatID, content, nil, inboundCtx, mentionSender)
}
func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
@@ -491,16 +519,30 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
"command": cmd.Command,
"text": utils.Truncate(content, 50),
})
peerKind := "channel"
peerID := channelID
if strings.HasPrefix(channelID, "D") {
peerKind = "direct"
peerID = senderID
}
inboundCtx := bus.InboundContext{
Channel: c.Name(),
Account: c.teamID,
ChatID: channelID,
ChatType: peerKind,
SenderID: senderID,
SpaceID: c.teamID,
SpaceType: "workspace",
Raw: metadata,
}
c.HandleMessage(
c.HandleMessageWithContext(
c.ctx,
bus.Peer{Kind: "channel", ID: channelID},
"",
senderID,
bus.Peer{Kind: peerKind, ID: peerID},
chatID,
content,
nil,
metadata,
inboundCtx,
cmdSender,
)
}
+18 -11
View File
@@ -660,8 +660,9 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
}
// In group chats, apply unified group trigger filtering
isMentioned := false
if message.Chat.Type != "private" {
isMentioned := c.isBotMentioned(message)
isMentioned = c.isBotMentioned(message)
if isMentioned {
content = c.stripBotMention(content)
}
@@ -722,24 +723,30 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
"first_name": user.FirstName,
"is_group": fmt.Sprintf("%t", message.Chat.Type != "private"),
}
if message.ReplyToMessage != nil {
metadata["reply_to_message_id"] = fmt.Sprintf("%d", message.ReplyToMessage.MessageID)
}
// Set parent_peer metadata for per-topic agent binding.
inboundCtx := bus.InboundContext{
Channel: c.Name(),
ChatID: fmt.Sprintf("%d", chatID),
ChatType: peerKind,
SenderID: platformID,
MessageID: messageID,
Mentioned: isMentioned,
Raw: metadata,
}
if message.Chat.IsForum && threadID != 0 {
metadata["parent_peer_kind"] = "topic"
metadata["parent_peer_id"] = fmt.Sprintf("%d", threadID)
inboundCtx.TopicID = fmt.Sprintf("%d", threadID)
}
if message.ReplyToMessage != nil {
inboundCtx.ReplyToMessageID = fmt.Sprintf("%d", message.ReplyToMessage.MessageID)
}
c.HandleMessage(c.ctx,
c.HandleMessageWithContext(
c.ctx,
peer,
messageID,
platformID,
compositeChatID,
content,
mediaPaths,
metadata,
inboundCtx,
sender,
)
return nil