refactor(bus,channels): promote peer and messageID from metadata to structured fields

Add bus.Peer struct and explicit Peer/MessageID fields to InboundMessage,
replacing the implicit peer_kind/peer_id/message_id metadata convention.

- Add Peer{Kind, ID} type to pkg/bus/types.go
- Extend InboundMessage with Peer and MessageID fields
- Change BaseChannel.HandleMessage signature to accept peer and messageID
- Adapt all 12 channel implementations to pass structured peer/messageID
- Simplify agent extractPeer() to read msg.Peer directly
- extractParentPeer unchanged (parent_peer still via metadata)
This commit is contained in:
Hoshina
2026-02-22 21:57:12 +08:00
parent b25b3c1324
commit 931093c19d
16 changed files with 108 additions and 84 deletions
+5 -6
View File
@@ -1119,21 +1119,20 @@ func (al *AgentLoop) handleCommand(ctx context.Context, msg bus.InboundMessage)
return "", false
}
// extractPeer extracts the routing peer from inbound message metadata.
// extractPeer extracts the routing peer from the inbound message's structured Peer field.
func extractPeer(msg bus.InboundMessage) *routing.RoutePeer {
peerKind := msg.Metadata["peer_kind"]
if peerKind == "" {
if msg.Peer.Kind == "" {
return nil
}
peerID := msg.Metadata["peer_id"]
peerID := msg.Peer.ID
if peerID == "" {
if peerKind == "direct" {
if msg.Peer.Kind == "direct" {
peerID = msg.SenderID
} else {
peerID = msg.ChatID
}
}
return &routing.RoutePeer{Kind: peerKind, ID: peerID}
return &routing.RoutePeer{Kind: msg.Peer.Kind, ID: peerID}
}
// extractParentPeer extracts the parent peer (reply-to) from inbound message metadata.
+8
View File
@@ -1,11 +1,19 @@
package bus
// Peer identifies the routing peer for a message (direct, group, channel, etc.)
type Peer struct {
Kind string `json:"kind"` // "direct" | "group" | "channel" | ""
ID string `json:"id"`
}
type InboundMessage struct {
Channel string `json:"channel"`
SenderID string `json:"sender_id"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
Media []string `json:"media,omitempty"`
Peer Peer `json:"peer"` // routing peer
MessageID string `json:"message_id,omitempty"` // platform message ID
SessionKey string `json:"session_key"`
Metadata map[string]string `json:"metadata,omitempty"`
}
+14 -7
View File
@@ -81,18 +81,25 @@ func (c *BaseChannel) IsAllowed(senderID string) bool {
return false
}
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
func (c *BaseChannel) HandleMessage(
peer bus.Peer,
messageID, senderID, chatID, content string,
media []string,
metadata map[string]string,
) {
if !c.IsAllowed(senderID) {
return
}
msg := bus.InboundMessage{
Channel: c.name,
SenderID: senderID,
ChatID: chatID,
Content: content,
Media: media,
Metadata: metadata,
Channel: c.name,
SenderID: senderID,
ChatID: chatID,
Content: content,
Media: media,
Peer: peer,
MessageID: messageID,
Metadata: metadata,
}
c.bus.PublishInbound(msg)
+4 -5
View File
@@ -160,12 +160,11 @@ func (c *DingTalkChannel) onChatBotMessageReceived(
"session_webhook": data.SessionWebhook,
}
var peer bus.Peer
if data.ConversationType == "1" {
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
peer = bus.Peer{Kind: "direct", ID: senderID}
} else {
metadata["peer_kind"] = "group"
metadata["peer_id"] = data.ConversationId
peer = bus.Peer{Kind: "group", ID: data.ConversationId}
}
logger.DebugCF("dingtalk", "Received message", map[string]any{
@@ -175,7 +174,7 @@ func (c *DingTalkChannel) onChatBotMessageReceived(
})
// Handle the message through the base channel
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(peer, "", senderID, chatID, content, nil, metadata)
// Return nil to indicate we've handled the message asynchronously
// The response will be sent through the message bus
+3 -4
View File
@@ -294,19 +294,18 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
peerID = senderID
}
peer := bus.Peer{Kind: peerKind, ID: peerID}
metadata := map[string]string{
"message_id": m.ID,
"user_id": senderID,
"username": m.Author.Username,
"display_name": senderName,
"guild_id": m.GuildID,
"channel_id": m.ChannelID,
"is_dm": fmt.Sprintf("%t", m.GuildID == ""),
"peer_kind": peerKind,
"peer_id": peerID,
}
c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata)
c.HandleMessage(peer, m.ID, senderID, m.ChannelID, content, mediaPaths, metadata)
}
// startTyping starts a continuous typing indicator loop for the given chatID.
+3 -1
View File
@@ -18,7 +18,9 @@ type FeishuChannel struct {
// NewFeishuChannel returns an error on 32-bit architectures where the Feishu SDK is not supported
func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChannel, error) {
return nil, errors.New("feishu channel is not supported on 32-bit architectures (armv7l, 386, etc.). Please use a 64-bit system or disable feishu in your config")
return nil, errors.New(
"feishu channel is not supported on 32-bit architectures (armv7l, 386, etc.). Please use a 64-bit system or disable feishu in your config",
)
}
// Start is a stub method to satisfy the Channel interface
+7 -7
View File
@@ -153,8 +153,9 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2
}
metadata := map[string]string{}
if messageID := stringValue(message.MessageId); messageID != "" {
metadata["message_id"] = messageID
messageID := ""
if mid := stringValue(message.MessageId); mid != "" {
messageID = mid
}
if messageType := stringValue(message.MessageType); messageType != "" {
metadata["message_type"] = messageType
@@ -167,12 +168,11 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2
}
chatType := stringValue(message.ChatType)
var peer bus.Peer
if chatType == "p2p" {
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
peer = bus.Peer{Kind: "direct", ID: senderID}
} else {
metadata["peer_kind"] = "group"
metadata["peer_id"] = chatID
peer = bus.Peer{Kind: "group", ID: chatID}
}
logger.InfoCF("feishu", "Feishu message received", map[string]any{
@@ -181,7 +181,7 @@ func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2
"preview": utils.Truncate(content, 80),
})
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(peer, messageID, senderID, chatID, content, nil, metadata)
return nil
}
+4 -6
View File
@@ -364,15 +364,13 @@ func (c *LINEChannel) processEvent(event lineEvent) {
metadata := map[string]string{
"platform": "line",
"source_type": event.Source.Type,
"message_id": msg.ID,
}
var peer bus.Peer
if isGroup {
metadata["peer_kind"] = "group"
metadata["peer_id"] = chatID
peer = bus.Peer{Kind: "group", ID: chatID}
} else {
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
peer = bus.Peer{Kind: "direct", ID: senderID}
}
logger.DebugCF("line", "Received message", map[string]any{
@@ -386,7 +384,7 @@ func (c *LINEChannel) processEvent(event lineEvent) {
// Show typing/loading indicator (requires user ID, not group ID)
c.sendLoading(senderID)
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
c.HandleMessage(peer, msg.ID, senderID, chatID, content, mediaPaths, metadata)
}
// isBotMentioned checks if the bot is mentioned in the message.
+1 -3
View File
@@ -171,11 +171,9 @@ func (c *MaixCamChannel) handlePersonDetection(msg MaixCamMessage) {
"y": fmt.Sprintf("%.0f", y),
"w": fmt.Sprintf("%.0f", w),
"h": fmt.Sprintf("%.0f", h),
"peer_kind": "channel",
"peer_id": "default",
}
c.HandleMessage(senderID, chatID, content, []string{}, metadata)
c.HandleMessage(bus.Peer{Kind: "channel", ID: "default"}, "", senderID, chatID, content, []string{}, metadata)
}
func (c *MaixCamChannel) handleStatusUpdate(msg MaixCamMessage) {
+6 -8
View File
@@ -856,9 +856,9 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) {
senderID := strconv.FormatInt(userID, 10)
var chatID string
metadata := map[string]string{
"message_id": messageID,
}
var peer bus.Peer
metadata := map[string]string{}
if parsed.ReplyTo != "" {
metadata["reply_to_message_id"] = parsed.ReplyTo
@@ -867,14 +867,12 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) {
switch raw.MessageType {
case "private":
chatID = "private:" + senderID
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
peer = bus.Peer{Kind: "direct", ID: senderID}
case "group":
groupIDStr := strconv.FormatInt(groupID, 10)
chatID = "group:" + groupIDStr
metadata["peer_kind"] = "group"
metadata["peer_id"] = groupIDStr
peer = bus.Peer{Kind: "group", ID: groupIDStr}
metadata["group_id"] = groupIDStr
senderUserID, _ := parseJSONInt64(sender.UserID)
@@ -929,7 +927,7 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) {
c.pendingEmojiMsg.Store(chatID, messageID)
}
c.HandleMessage(senderID, chatID, content, parsed.Media, metadata)
c.HandleMessage(peer, messageID, senderID, chatID, content, parsed.Media, metadata)
}
func (c *OneBotChannel) isDuplicate(messageID string) bool {
+20 -11
View File
@@ -164,13 +164,17 @@ func (c *QQChannel) handleC2CMessage() event.C2CMessageEventHandler {
})
// 转发到消息总线
metadata := map[string]string{
"message_id": data.ID,
"peer_kind": "direct",
"peer_id": senderID,
}
metadata := map[string]string{}
c.HandleMessage(senderID, senderID, content, []string{}, metadata)
c.HandleMessage(
bus.Peer{Kind: "direct", ID: senderID},
data.ID,
senderID,
senderID,
content,
[]string{},
metadata,
)
return nil
}
@@ -208,13 +212,18 @@ func (c *QQChannel) handleGroupATMessage() event.GroupATMessageEventHandler {
// 转发到消息总线(使用 GroupID 作为 ChatID
metadata := map[string]string{
"message_id": data.ID,
"group_id": data.GroupID,
"peer_kind": "group",
"peer_id": data.GroupID,
"group_id": data.GroupID,
}
c.HandleMessage(senderID, data.GroupID, content, []string{}, metadata)
c.HandleMessage(
bus.Peer{Kind: "group", ID: data.GroupID},
data.ID,
senderID,
data.GroupID,
content,
[]string{},
metadata,
)
return nil
}
+7 -9
View File
@@ -284,13 +284,13 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
peerID = senderID
}
peer := bus.Peer{Kind: peerKind, ID: peerID}
metadata := map[string]string{
"message_ts": messageTS,
"channel_id": channelID,
"thread_ts": threadTS,
"platform": "slack",
"peer_kind": peerKind,
"peer_id": peerID,
"team_id": c.teamID,
}
@@ -301,7 +301,7 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
"has_thread": threadTS != "",
})
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
c.HandleMessage(peer, messageTS, senderID, chatID, content, mediaPaths, metadata)
}
func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
@@ -351,18 +351,18 @@ func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
mentionPeerID = senderID
}
mentionPeer := bus.Peer{Kind: mentionPeerKind, ID: mentionPeerID}
metadata := map[string]string{
"message_ts": messageTS,
"channel_id": channelID,
"thread_ts": threadTS,
"platform": "slack",
"is_mention": "true",
"peer_kind": mentionPeerKind,
"peer_id": mentionPeerID,
"team_id": c.teamID,
}
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(mentionPeer, messageTS, senderID, chatID, content, nil, metadata)
}
func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
@@ -396,8 +396,6 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
"platform": "slack",
"is_command": "true",
"trigger_id": cmd.TriggerID,
"peer_kind": "channel",
"peer_id": channelID,
"team_id": c.teamID,
}
@@ -407,7 +405,7 @@ func (c *SlackChannel) handleSlashCommand(event socketmode.Event) {
"text": utils.Truncate(content, 50),
})
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(bus.Peer{Kind: "channel", ID: channelID}, "", senderID, chatID, content, nil, metadata)
}
func (c *SlackChannel) downloadSlackFile(file slack.File) string {
+12 -4
View File
@@ -362,17 +362,25 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
peerID = fmt.Sprintf("%d", chatID)
}
peer := bus.Peer{Kind: peerKind, ID: peerID}
messageID := fmt.Sprintf("%d", message.MessageID)
metadata := map[string]string{
"message_id": fmt.Sprintf("%d", message.MessageID),
"user_id": fmt.Sprintf("%d", user.ID),
"username": user.Username,
"first_name": user.FirstName,
"is_group": fmt.Sprintf("%t", message.Chat.Type != "private"),
"peer_kind": peerKind,
"peer_id": peerID,
}
c.HandleMessage(fmt.Sprintf("%d", user.ID), fmt.Sprintf("%d", chatID), content, mediaPaths, metadata)
c.HandleMessage(
peer,
messageID,
fmt.Sprintf("%d", user.ID),
fmt.Sprintf("%d", chatID),
content,
mediaPaths,
metadata,
)
return nil
}
+4 -3
View File
@@ -425,6 +425,9 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
// Build metadata
// WeCom App only supports direct messages (private chat)
peer := bus.Peer{Kind: "direct", ID: senderID}
messageID := fmt.Sprintf("%d", msg.MsgId)
metadata := map[string]string{
"msg_type": msg.MsgType,
"msg_id": fmt.Sprintf("%d", msg.MsgId),
@@ -432,8 +435,6 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
"platform": "wecom_app",
"media_id": msg.MediaId,
"create_time": fmt.Sprintf("%d", msg.CreateTime),
"peer_kind": "direct",
"peer_id": senderID,
}
content := msg.Content
@@ -445,7 +446,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
})
// Handle the message through the base channel
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(peer, messageID, senderID, chatID, content, nil, metadata)
}
// tokenRefreshLoop periodically refreshes the access token
+3 -3
View File
@@ -378,12 +378,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
}
// Build metadata
peer := bus.Peer{Kind: peerKind, ID: peerID}
metadata := map[string]string{
"msg_type": msg.MsgType,
"msg_id": msg.MsgID,
"platform": "wecom",
"peer_kind": peerKind,
"peer_id": peerID,
"response_url": msg.ResponseURL,
}
if isGroupChat {
@@ -400,7 +400,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
})
// Handle the message through the base channel
c.HandleMessage(senderID, chatID, content, nil, metadata)
c.HandleMessage(peer, msg.MsgID, senderID, chatID, content, nil, metadata)
}
// sendWebhookReply sends a reply using the webhook URL
+7 -7
View File
@@ -172,22 +172,22 @@ func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]any) {
}
metadata := make(map[string]string)
if messageID, ok := msg["id"].(string); ok {
metadata["message_id"] = messageID
var messageID string
if mid, ok := msg["id"].(string); ok {
messageID = mid
}
if userName, ok := msg["from_name"].(string); ok {
metadata["user_name"] = userName
}
var peer bus.Peer
if chatID == senderID {
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
peer = bus.Peer{Kind: "direct", ID: senderID}
} else {
metadata["peer_kind"] = "group"
metadata["peer_id"] = chatID
peer = bus.Peer{Kind: "group", ID: chatID}
}
log.Printf("WhatsApp message from %s: %s...", senderID, utils.Truncate(content, 50))
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
c.HandleMessage(peer, messageID, senderID, chatID, content, mediaPaths, metadata)
}