diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 693f2227b..131f7eb4f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -1119,21 +1119,20 @@ func (al *AgentLoop) handleCommand(ctx context.Context, msg bus.InboundMessage) return "", false } -// extractPeer extracts the routing peer from inbound message metadata. +// extractPeer extracts the routing peer from the inbound message's structured Peer field. func extractPeer(msg bus.InboundMessage) *routing.RoutePeer { - peerKind := msg.Metadata["peer_kind"] - if peerKind == "" { + if msg.Peer.Kind == "" { return nil } - peerID := msg.Metadata["peer_id"] + peerID := msg.Peer.ID if peerID == "" { - if peerKind == "direct" { + if msg.Peer.Kind == "direct" { peerID = msg.SenderID } else { peerID = msg.ChatID } } - return &routing.RoutePeer{Kind: peerKind, ID: peerID} + return &routing.RoutePeer{Kind: msg.Peer.Kind, ID: peerID} } // extractParentPeer extracts the parent peer (reply-to) from inbound message metadata. diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 44f9181a5..081f13a0b 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -1,11 +1,19 @@ package bus +// Peer identifies the routing peer for a message (direct, group, channel, etc.) +type Peer struct { + Kind string `json:"kind"` // "direct" | "group" | "channel" | "" + ID string `json:"id"` +} + type InboundMessage struct { Channel string `json:"channel"` SenderID string `json:"sender_id"` ChatID string `json:"chat_id"` Content string `json:"content"` Media []string `json:"media,omitempty"` + Peer Peer `json:"peer"` // routing peer + MessageID string `json:"message_id,omitempty"` // platform message ID SessionKey string `json:"session_key"` Metadata map[string]string `json:"metadata,omitempty"` } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 5d77c6c0d..5e603f0d4 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -81,18 +81,25 @@ func (c *BaseChannel) IsAllowed(senderID string) bool { return false } -func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) { +func (c *BaseChannel) HandleMessage( + peer bus.Peer, + messageID, senderID, chatID, content string, + media []string, + metadata map[string]string, +) { if !c.IsAllowed(senderID) { return } msg := bus.InboundMessage{ - Channel: c.name, - SenderID: senderID, - ChatID: chatID, - Content: content, - Media: media, - Metadata: metadata, + Channel: c.name, + SenderID: senderID, + ChatID: chatID, + Content: content, + Media: media, + Peer: peer, + MessageID: messageID, + Metadata: metadata, } c.bus.PublishInbound(msg) diff --git a/pkg/channels/dingtalk/dingtalk.go b/pkg/channels/dingtalk/dingtalk.go index afc0de47f..a8aee65d6 100644 --- a/pkg/channels/dingtalk/dingtalk.go +++ b/pkg/channels/dingtalk/dingtalk.go @@ -160,12 +160,11 @@ func (c *DingTalkChannel) onChatBotMessageReceived( "session_webhook": data.SessionWebhook, } + var peer bus.Peer if data.ConversationType == "1" { - metadata["peer_kind"] = "direct" - metadata["peer_id"] = senderID + peer = bus.Peer{Kind: "direct", ID: senderID} } else { - metadata["peer_kind"] = "group" - metadata["peer_id"] = data.ConversationId + peer = bus.Peer{Kind: "group", ID: data.ConversationId} } logger.DebugCF("dingtalk", "Received message", map[string]any{ @@ -175,7 +174,7 @@ func (c *DingTalkChannel) onChatBotMessageReceived( }) // Handle the message through the base channel - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(peer, "", senderID, chatID, content, nil, metadata) // Return nil to indicate we've handled the message asynchronously // The response will be sent through the message bus diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index b83ac28fd..416a94710 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -294,19 +294,18 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag peerID = senderID } + peer := bus.Peer{Kind: peerKind, ID: peerID} + metadata := map[string]string{ - "message_id": m.ID, "user_id": senderID, "username": m.Author.Username, "display_name": senderName, "guild_id": m.GuildID, "channel_id": m.ChannelID, "is_dm": fmt.Sprintf("%t", m.GuildID == ""), - "peer_kind": peerKind, - "peer_id": peerID, } - c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata) + c.HandleMessage(peer, m.ID, senderID, m.ChannelID, content, mediaPaths, metadata) } // startTyping starts a continuous typing indicator loop for the given chatID. diff --git a/pkg/channels/feishu/feishu_32.go b/pkg/channels/feishu/feishu_32.go index 14711e49e..d0ec758c6 100644 --- a/pkg/channels/feishu/feishu_32.go +++ b/pkg/channels/feishu/feishu_32.go @@ -18,7 +18,9 @@ type FeishuChannel struct { // NewFeishuChannel returns an error on 32-bit architectures where the Feishu SDK is not supported func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChannel, error) { - return nil, errors.New("feishu channel is not supported on 32-bit architectures (armv7l, 386, etc.). Please use a 64-bit system or disable feishu in your config") + return nil, errors.New( + "feishu channel is not supported on 32-bit architectures (armv7l, 386, etc.). Please use a 64-bit system or disable feishu in your config", + ) } // Start is a stub method to satisfy the Channel interface diff --git a/pkg/channels/feishu/feishu_64.go b/pkg/channels/feishu/feishu_64.go index aa4e141c4..d67823974 100644 --- a/pkg/channels/feishu/feishu_64.go +++ b/pkg/channels/feishu/feishu_64.go @@ -153,8 +153,9 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2 } metadata := map[string]string{} - if messageID := stringValue(message.MessageId); messageID != "" { - metadata["message_id"] = messageID + messageID := "" + if mid := stringValue(message.MessageId); mid != "" { + messageID = mid } if messageType := stringValue(message.MessageType); messageType != "" { metadata["message_type"] = messageType @@ -167,12 +168,11 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2 } chatType := stringValue(message.ChatType) + var peer bus.Peer if chatType == "p2p" { - metadata["peer_kind"] = "direct" - metadata["peer_id"] = senderID + peer = bus.Peer{Kind: "direct", ID: senderID} } else { - metadata["peer_kind"] = "group" - metadata["peer_id"] = chatID + peer = bus.Peer{Kind: "group", ID: chatID} } logger.InfoCF("feishu", "Feishu message received", map[string]any{ @@ -181,7 +181,7 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2 "preview": utils.Truncate(content, 80), }) - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(peer, messageID, senderID, chatID, content, nil, metadata) return nil } diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index 4e1d0dfd3..96297e2cd 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -364,15 +364,13 @@ func (c *LINEChannel) processEvent(event lineEvent) { metadata := map[string]string{ "platform": "line", "source_type": event.Source.Type, - "message_id": msg.ID, } + var peer bus.Peer if isGroup { - metadata["peer_kind"] = "group" - metadata["peer_id"] = chatID + peer = bus.Peer{Kind: "group", ID: chatID} } else { - metadata["peer_kind"] = "direct" - metadata["peer_id"] = senderID + peer = bus.Peer{Kind: "direct", ID: senderID} } logger.DebugCF("line", "Received message", map[string]any{ @@ -386,7 +384,7 @@ func (c *LINEChannel) processEvent(event lineEvent) { // Show typing/loading indicator (requires user ID, not group ID) c.sendLoading(senderID) - c.HandleMessage(senderID, chatID, content, mediaPaths, metadata) + c.HandleMessage(peer, msg.ID, senderID, chatID, content, mediaPaths, metadata) } // isBotMentioned checks if the bot is mentioned in the message. diff --git a/pkg/channels/maixcam/maixcam.go b/pkg/channels/maixcam/maixcam.go index a7bff55e0..280098dda 100644 --- a/pkg/channels/maixcam/maixcam.go +++ b/pkg/channels/maixcam/maixcam.go @@ -171,11 +171,9 @@ func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) { "y": fmt.Sprintf("%.0f", y), "w": fmt.Sprintf("%.0f", w), "h": fmt.Sprintf("%.0f", h), - "peer_kind": "channel", - "peer_id": "default", } - c.HandleMessage(senderID, chatID, content, []string{}, metadata) + c.HandleMessage(bus.Peer{Kind: "channel", ID: "default"}, "", senderID, chatID, content, []string{}, metadata) } func (c *MaixCamChannel) handleStatusUpdate(msg MaixCamMessage) { diff --git a/pkg/channels/onebot/onebot.go b/pkg/channels/onebot/onebot.go index 3d2e64e2a..642eebd1d 100644 --- a/pkg/channels/onebot/onebot.go +++ b/pkg/channels/onebot/onebot.go @@ -856,9 +856,9 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) { senderID := strconv.FormatInt(userID, 10) var chatID string - metadata := map[string]string{ - "message_id": messageID, - } + var peer bus.Peer + + metadata := map[string]string{} if parsed.ReplyTo != "" { metadata["reply_to_message_id"] = parsed.ReplyTo @@ -867,14 +867,12 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) { switch raw.MessageType { case "private": chatID = "private:" + senderID - metadata["peer_kind"] = "direct" - metadata["peer_id"] = senderID + peer = bus.Peer{Kind: "direct", ID: senderID} case "group": groupIDStr := strconv.FormatInt(groupID, 10) chatID = "group:" + groupIDStr - metadata["peer_kind"] = "group" - metadata["peer_id"] = groupIDStr + peer = bus.Peer{Kind: "group", ID: groupIDStr} metadata["group_id"] = groupIDStr senderUserID, _ := parseJSONInt64(sender.UserID) @@ -929,7 +927,7 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) { c.pendingEmojiMsg.Store(chatID, messageID) } - c.HandleMessage(senderID, chatID, content, parsed.Media, metadata) + c.HandleMessage(peer, messageID, senderID, chatID, content, parsed.Media, metadata) } func (c *OneBotChannel) isDuplicate(messageID string) bool { diff --git a/pkg/channels/qq/qq.go b/pkg/channels/qq/qq.go index 2a95bbd06..429e23cbf 100644 --- a/pkg/channels/qq/qq.go +++ b/pkg/channels/qq/qq.go @@ -164,13 +164,17 @@ func (c *QQChannel) handleC2CMessage() event.C2CMessageEventHandler { }) // 转发到消息总线 - metadata := map[string]string{ - "message_id": data.ID, - "peer_kind": "direct", - "peer_id": senderID, - } + metadata := map[string]string{} - c.HandleMessage(senderID, senderID, content, []string{}, metadata) + c.HandleMessage( + bus.Peer{Kind: "direct", ID: senderID}, + data.ID, + senderID, + senderID, + content, + []string{}, + metadata, + ) return nil } @@ -208,13 +212,18 @@ func (c *QQChannel) handleGroupATMessage() event.GroupATMessageEventHandler { // 转发到消息总线(使用 GroupID 作为 ChatID) metadata := map[string]string{ - "message_id": data.ID, - "group_id": data.GroupID, - "peer_kind": "group", - "peer_id": data.GroupID, + "group_id": data.GroupID, } - c.HandleMessage(senderID, data.GroupID, content, []string{}, metadata) + c.HandleMessage( + bus.Peer{Kind: "group", ID: data.GroupID}, + data.ID, + senderID, + data.GroupID, + content, + []string{}, + metadata, + ) return nil } diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go index cafe53103..b459a7140 100644 --- a/pkg/channels/slack/slack.go +++ b/pkg/channels/slack/slack.go @@ -284,13 +284,13 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { peerID = senderID } + peer := bus.Peer{Kind: peerKind, ID: peerID} + metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", - "peer_kind": peerKind, - "peer_id": peerID, "team_id": c.teamID, } @@ -301,7 +301,7 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) { "has_thread": threadTS != "", }) - c.HandleMessage(senderID, chatID, content, mediaPaths, metadata) + c.HandleMessage(peer, messageTS, senderID, chatID, content, mediaPaths, metadata) } func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { @@ -351,18 +351,18 @@ func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) { mentionPeerID = senderID } + mentionPeer := bus.Peer{Kind: mentionPeerKind, ID: mentionPeerID} + metadata := map[string]string{ "message_ts": messageTS, "channel_id": channelID, "thread_ts": threadTS, "platform": "slack", "is_mention": "true", - "peer_kind": mentionPeerKind, - "peer_id": mentionPeerID, "team_id": c.teamID, } - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(mentionPeer, messageTS, senderID, chatID, content, nil, metadata) } func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { @@ -396,8 +396,6 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { "platform": "slack", "is_command": "true", "trigger_id": cmd.TriggerID, - "peer_kind": "channel", - "peer_id": channelID, "team_id": c.teamID, } @@ -407,7 +405,7 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) { "text": utils.Truncate(content, 50), }) - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(bus.Peer{Kind: "channel", ID: channelID}, "", senderID, chatID, content, nil, metadata) } func (c *SlackChannel) downloadSlackFile(file slack.File) string { diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 7619440e2..5703000b4 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -362,17 +362,25 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes peerID = fmt.Sprintf("%d", chatID) } + peer := bus.Peer{Kind: peerKind, ID: peerID} + messageID := fmt.Sprintf("%d", message.MessageID) + metadata := map[string]string{ - "message_id": fmt.Sprintf("%d", message.MessageID), "user_id": fmt.Sprintf("%d", user.ID), "username": user.Username, "first_name": user.FirstName, "is_group": fmt.Sprintf("%t", message.Chat.Type != "private"), - "peer_kind": peerKind, - "peer_id": peerID, } - c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata) + c.HandleMessage( + peer, + messageID, + fmt.Sprintf("%d", user.ID), + fmt.Sprintf("%d", chatID), + content, + mediaPaths, + metadata, + ) return nil } diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index f3557d60f..873431d3c 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -425,6 +425,9 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag // Build metadata // WeCom App only supports direct messages (private chat) + peer := bus.Peer{Kind: "direct", ID: senderID} + messageID := fmt.Sprintf("%d", msg.MsgId) + metadata := map[string]string{ "msg_type": msg.MsgType, "msg_id": fmt.Sprintf("%d", msg.MsgId), @@ -432,8 +435,6 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag "platform": "wecom_app", "media_id": msg.MediaId, "create_time": fmt.Sprintf("%d", msg.CreateTime), - "peer_kind": "direct", - "peer_id": senderID, } content := msg.Content @@ -445,7 +446,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag }) // Handle the message through the base channel - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(peer, messageID, senderID, chatID, content, nil, metadata) } // tokenRefreshLoop periodically refreshes the access token diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index 17ee2107f..3a8a16c43 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -378,12 +378,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag } // Build metadata + peer := bus.Peer{Kind: peerKind, ID: peerID} + metadata := map[string]string{ "msg_type": msg.MsgType, "msg_id": msg.MsgID, "platform": "wecom", - "peer_kind": peerKind, - "peer_id": peerID, "response_url": msg.ResponseURL, } if isGroupChat { @@ -400,7 +400,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag }) // Handle the message through the base channel - c.HandleMessage(senderID, chatID, content, nil, metadata) + c.HandleMessage(peer, msg.MsgID, senderID, chatID, content, nil, metadata) } // sendWebhookReply sends a reply using the webhook URL diff --git a/pkg/channels/whatsapp/whatsapp.go b/pkg/channels/whatsapp/whatsapp.go index 7e8f13ab6..1a5401172 100644 --- a/pkg/channels/whatsapp/whatsapp.go +++ b/pkg/channels/whatsapp/whatsapp.go @@ -172,22 +172,22 @@ func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]any) { } metadata := make(map[string]string) - if messageID, ok := msg["id"].(string); ok { - metadata["message_id"] = messageID + var messageID string + if mid, ok := msg["id"].(string); ok { + messageID = mid } if userName, ok := msg["from_name"].(string); ok { metadata["user_name"] = userName } + var peer bus.Peer if chatID == senderID { - metadata["peer_kind"] = "direct" - metadata["peer_id"] = senderID + peer = bus.Peer{Kind: "direct", ID: senderID} } else { - metadata["peer_kind"] = "group" - metadata["peer_id"] = chatID + peer = bus.Peer{Kind: "group", ID: chatID} } log.Printf("WhatsApp message from %s: %s...", senderID, utils.Truncate(content, 50)) - c.HandleMessage(senderID, chatID, content, mediaPaths, metadata) + c.HandleMessage(peer, messageID, senderID, chatID, content, mediaPaths, metadata) }