Merge pull request #727 from Esubaalew/fix/wecom-dedupe-race

fix(wecom): remove message-dedupe data races and fix amnesia cliff
This commit is contained in:
daming大铭
2026-03-02 23:57:42 +08:00
committed by GitHub
4 changed files with 143 additions and 31 deletions
+3 -15
View File
@@ -38,8 +38,7 @@ type WeComAppChannel struct {
tokenMu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
msgMu sync.RWMutex
processedMsgs *MessageDeduplicator
}
// WeComXMLMessage represents the XML message structure from WeCom
@@ -144,7 +143,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) (
client: &http.Client{Timeout: clientTimeout},
ctx: ctx,
cancel: cancel,
processedMsgs: make(map[string]bool),
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
}, nil
}
@@ -607,23 +606,12 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
// Message deduplication: Use msg_id to prevent duplicate processing
// As per WeCom documentation, use msg_id for deduplication
msgID := fmt.Sprintf("%d", msg.MsgId)
c.msgMu.Lock()
if c.processedMsgs[msgID] {
c.msgMu.Unlock()
if !c.processedMsgs.MarkMessageProcessed(msgID) {
logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{
"msg_id": msgID,
})
return
}
c.processedMsgs[msgID] = true
// Clean up old messages while still holding the lock to avoid a data race
// on len(). Reset the map but re-insert the current msgID so it remains
// deduplicated.
if len(c.processedMsgs) > 1000 {
c.processedMsgs = make(map[string]bool)
c.processedMsgs[msgID] = true
}
c.msgMu.Unlock()
senderID := msg.FromUserName
chatID := senderID // WeCom App uses user ID as chat ID for direct messages
+3 -16
View File
@@ -9,7 +9,6 @@ import (
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
@@ -28,8 +27,7 @@ type WeComBotChannel struct {
client *http.Client
ctx context.Context
cancel context.CancelFunc
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
msgMu sync.RWMutex
processedMsgs *MessageDeduplicator
}
// WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT)
@@ -108,7 +106,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We
client: &http.Client{Timeout: clientTimeout},
ctx: ctx,
cancel: cancel,
processedMsgs: make(map[string]bool),
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
}, nil
}
@@ -330,23 +328,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
// Message deduplication: Use msg_id to prevent duplicate processing
msgID := msg.MsgID
c.msgMu.Lock()
if c.processedMsgs[msgID] {
c.msgMu.Unlock()
if !c.processedMsgs.MarkMessageProcessed(msgID) {
logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{
"msg_id": msgID,
})
return
}
c.processedMsgs[msgID] = true
// Clean up old messages while still holding the lock to avoid a data race
// on len(). Reset the map but re-insert the current msgID so it remains
// deduplicated.
if len(c.processedMsgs) > 1000 {
c.processedMsgs = make(map[string]bool)
c.processedMsgs[msgID] = true
}
c.msgMu.Unlock()
senderID := msg.From.UserID
+54
View File
@@ -0,0 +1,54 @@
package wecom
import "sync"
// wecomMaxProcessedMessages is the default number of message IDs a
// MessageDeduplicator remembers before it starts evicting the oldest ones.
const wecomMaxProcessedMessages = 1000

// MessageDeduplicator provides mutex-guarded message deduplication using a
// circular queue (ring buffer) combined with a hash map. The map gives O(1)
// duplicate lookups; the ring records insertion order so that, once the
// capacity is reached, each new ID evicts exactly the oldest remembered ID
// instead of wiping the whole history at once.
//
// The zero value is not usable; create instances with NewMessageDeduplicator.
type MessageDeduplicator struct {
	mu   sync.Mutex
	msgs map[string]bool // set of IDs currently remembered
	ring []string        // remembered IDs in insertion order, used circularly
	idx  int             // next ring slot to write; the oldest entry once the ring is full
	max  int             // capacity of ring (and upper bound on len(msgs))
}

// NewMessageDeduplicator creates a new deduplicator that remembers up to
// maxEntries message IDs. A non-positive maxEntries falls back to
// wecomMaxProcessedMessages.
func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator {
	if maxEntries <= 0 {
		maxEntries = wecomMaxProcessedMessages
	}
	return &MessageDeduplicator{
		msgs: make(map[string]bool, maxEntries),
		ring: make([]string, maxEntries),
		max:  maxEntries,
	}
}

// MarkMessageProcessed records msgID as processed. It returns true when the ID
// was not currently remembered (the caller should process the message) and
// false when it is a duplicate of a remembered ID.
//
// When the deduplicator is full, recording a new ID forgets the oldest one.
func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	// 1. Check for duplicate.
	if d.msgs[msgID] {
		return false
	}

	// 2. Evict the oldest message once the ring is full. Every remembered ID
	// occupies exactly one ring slot, so len(d.msgs) is the number of filled
	// slots; when it reaches d.max, d.idx points at the oldest entry. Deciding
	// by occupancy rather than by comparing the slot against "" ensures that
	// an empty message ID is evicted like any other instead of staying in the
	// map forever.
	if len(d.msgs) >= d.max {
		delete(d.msgs, d.ring[d.idx])
	}

	// 3. Store the new message in the slot that was just freed (or never used).
	d.msgs[msgID] = true
	d.ring[d.idx] = msgID

	// 4. Advance the circular queue index.
	d.idx = (d.idx + 1) % d.max
	return true
}
+83
View File
@@ -0,0 +1,83 @@
package wecom
import (
"sync"
"testing"
)
// TestMessageDeduplicator_DuplicateDetection verifies that the first sighting
// of an ID is accepted and an immediate repeat of the same ID is rejected.
func TestMessageDeduplicator_DuplicateDetection(t *testing.T) {
	const id = "msg-1"
	dedup := NewMessageDeduplicator(wecomMaxProcessedMessages)

	firstAccepted := dedup.MarkMessageProcessed(id)
	if !firstAccepted {
		t.Fatalf("first message should be accepted")
	}

	secondAccepted := dedup.MarkMessageProcessed(id)
	if secondAccepted {
		t.Fatalf("duplicate message should be rejected")
	}
}
// TestMessageDeduplicator_ConcurrentSameMessage launches many goroutines that
// all try to mark the same ID and checks that exactly one of them is told the
// message is new.
func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) {
	const goroutines = 64
	dedup := NewMessageDeduplicator(wecomMaxProcessedMessages)

	// Buffered to the number of workers so no sender ever blocks.
	outcomes := make(chan bool, goroutines)

	var pending sync.WaitGroup
	for n := 0; n < goroutines; n++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			outcomes <- dedup.MarkMessageProcessed("msg-concurrent")
		}()
	}
	pending.Wait()
	close(outcomes)

	var accepted int
	for won := range outcomes {
		if !won {
			continue
		}
		accepted++
	}

	if accepted != 1 {
		t.Fatalf("expected exactly 1 successful mark, got %d", accepted)
	}
}
// TestMessageDeduplicator_CircularQueueEviction uses a tiny capacity to check
// that, once full, each newly accepted ID pushes out the oldest remembered ID
// while the map size stays at the capacity.
func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) {
	const capacity = 3
	dedup := NewMessageDeduplicator(capacity)

	// Fill the queue; msg-1 becomes the oldest entry.
	for _, id := range []string{"msg-1", "msg-2", "msg-3"} {
		dedup.MarkMessageProcessed(id)
	}
	if size := len(dedup.msgs); size != 3 {
		t.Fatalf("expected map size to be 3, got %d", size)
	}

	// Adding a fourth ID must evict msg-1 and keep the size unchanged.
	if accepted := dedup.MarkMessageProcessed("msg-4"); !accepted {
		t.Fatalf("msg-4 should be accepted")
	}
	if size := len(dedup.msgs); size != 3 {
		t.Fatalf("expected map size to remain at max capacity (3), got %d", size)
	}

	// msg-1 was forgotten, so it is accepted again; doing so evicts msg-2.
	if accepted := dedup.MarkMessageProcessed("msg-1"); !accepted {
		t.Fatalf("msg-1 should be accepted again because it was evicted")
	}

	// msg-2 was pushed out by the re-added msg-1, so it is accepted as well.
	if accepted := dedup.MarkMessageProcessed("msg-2"); !accepted {
		t.Fatalf("msg-2 should be accepted again because it was evicted")
	}
}