mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(wecom): remove message-dedupe data races in bot/app channels
Centralize dedupe map access behind a mutex-safe helper and use it in both WeCom bot and WeCom app channels to eliminate concurrent map access races while preserving current dedupe behavior.
This commit is contained in:
@@ -607,23 +607,12 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
|
||||
// Message deduplication: Use msg_id to prevent duplicate processing
|
||||
// As per WeCom documentation, use msg_id for deduplication
|
||||
msgID := fmt.Sprintf("%d", msg.MsgId)
|
||||
c.msgMu.Lock()
|
||||
if c.processedMsgs[msgID] {
|
||||
c.msgMu.Unlock()
|
||||
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
|
||||
logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{
|
||||
"msg_id": msgID,
|
||||
})
|
||||
return
|
||||
}
|
||||
c.processedMsgs[msgID] = true
|
||||
// Clean up old messages while still holding the lock to avoid a data race
|
||||
// on len(). Reset the map but re-insert the current msgID so it remains
|
||||
// deduplicated.
|
||||
if len(c.processedMsgs) > 1000 {
|
||||
c.processedMsgs = make(map[string]bool)
|
||||
c.processedMsgs[msgID] = true
|
||||
}
|
||||
c.msgMu.Unlock()
|
||||
|
||||
senderID := msg.FromUserName
|
||||
chatID := senderID // WeCom App uses user ID as chat ID for direct messages
|
||||
|
||||
@@ -330,23 +330,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
|
||||
|
||||
// Message deduplication: Use msg_id to prevent duplicate processing
|
||||
msgID := msg.MsgID
|
||||
c.msgMu.Lock()
|
||||
if c.processedMsgs[msgID] {
|
||||
c.msgMu.Unlock()
|
||||
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
|
||||
logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{
|
||||
"msg_id": msgID,
|
||||
})
|
||||
return
|
||||
}
|
||||
c.processedMsgs[msgID] = true
|
||||
// Clean up old messages while still holding the lock to avoid a data race
|
||||
// on len(). Reset the map but re-insert the current msgID so it remains
|
||||
// deduplicated.
|
||||
if len(c.processedMsgs) > 1000 {
|
||||
c.processedMsgs = make(map[string]bool)
|
||||
c.processedMsgs[msgID] = true
|
||||
}
|
||||
c.msgMu.Unlock()
|
||||
|
||||
senderID := msg.From.UserID
|
||||
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package wecom
|
||||
|
||||
import "sync"
|
||||
|
||||
const wecomMaxProcessedMessages = 1000
|
||||
|
||||
// markMessageProcessed marks msgID as processed and returns false for duplicates.
|
||||
// All map reads/writes (including len) are protected by msgMu to avoid races.
|
||||
func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool {
|
||||
if maxEntries <= 0 {
|
||||
maxEntries = wecomMaxProcessedMessages
|
||||
}
|
||||
|
||||
msgMu.Lock()
|
||||
defer msgMu.Unlock()
|
||||
|
||||
if (*processedMsgs)[msgID] {
|
||||
return false
|
||||
}
|
||||
(*processedMsgs)[msgID] = true
|
||||
|
||||
// Keep the newest message marker when rotating to bound memory growth.
|
||||
if len(*processedMsgs) > maxEntries {
|
||||
*processedMsgs = map[string]bool{msgID: true}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
Reference in New Issue
Block a user