From 18d89937ad487bd467b64aa798e5ec4560adecb7 Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 14:29:17 +0300 Subject: [PATCH] fix(wecom): remove message-dedupe data races in bot/app channels Centralize dedupe map access behind a mutex-safe helper and use it in both WeCom bot and WeCom app channels to eliminate concurrent map access races while preserving current dedupe behavior. --- pkg/channels/wecom/app.go | 13 +------------ pkg/channels/wecom/bot.go | 13 +------------ pkg/channels/wecom/dedupe.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 24 deletions(-) create mode 100644 pkg/channels/wecom/dedupe.go diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index b79340315..c550d8b47 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -607,23 +607,12 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag // Message deduplication: Use msg_id to prevent duplicate processing // As per WeCom documentation, use msg_id for deduplication msgID := fmt.Sprintf("%d", msg.MsgId) - c.msgMu.Lock() - if c.processedMsgs[msgID] { - c.msgMu.Unlock() + if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) return } - c.processedMsgs[msgID] = true - // Clean up old messages while still holding the lock to avoid a data race - // on len(). Reset the map but re-insert the current msgID so it remains - // deduplicated. - if len(c.processedMsgs) > 1000 { - c.processedMsgs = make(map[string]bool) - c.processedMsgs[msgID] = true - } - c.msgMu.Unlock() senderID := msg.FromUserName chatID := senderID // WeCom App uses user ID as chat ID for direct messages diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index 0d0426c0d..c1bdf6c25 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -330,23 +330,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag // Message deduplication: Use msg_id to prevent duplicate processing msgID := msg.MsgID - c.msgMu.Lock() - if c.processedMsgs[msgID] { - c.msgMu.Unlock() + if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) return } - c.processedMsgs[msgID] = true - // Clean up old messages while still holding the lock to avoid a data race - // on len(). Reset the map but re-insert the current msgID so it remains - // deduplicated. - if len(c.processedMsgs) > 1000 { - c.processedMsgs = make(map[string]bool) - c.processedMsgs[msgID] = true - } - c.msgMu.Unlock() senderID := msg.From.UserID diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go new file mode 100644 index 000000000..5f2b4cf81 --- /dev/null +++ b/pkg/channels/wecom/dedupe.go @@ -0,0 +1,28 @@ +package wecom + +import "sync" + +const wecomMaxProcessedMessages = 1000 + +// markMessageProcessed marks msgID as processed and returns false for duplicates. +// All map reads/writes (including len) are protected by msgMu to avoid races. +func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool { + if maxEntries <= 0 { + maxEntries = wecomMaxProcessedMessages + } + + msgMu.Lock() + defer msgMu.Unlock() + + if (*processedMsgs)[msgID] { + return false + } + (*processedMsgs)[msgID] = true + + // Keep the newest message marker when rotating to bound memory growth. + if len(*processedMsgs) > maxEntries { + *processedMsgs = map[string]bool{msgID: true} + } + + return true +}