diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index c550d8b47..717815b9f 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -38,8 +38,7 @@ type WeComAppChannel struct { tokenMu sync.RWMutex ctx context.Context cancel context.CancelFunc - processedMsgs map[string]bool // Message deduplication: msg_id -> processed - msgMu sync.RWMutex + processedMsgs *MessageDeduplicator } // WeComXMLMessage represents the XML message structure from WeCom @@ -144,7 +143,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) ( client: &http.Client{Timeout: clientTimeout}, ctx: ctx, cancel: cancel, - processedMsgs: make(map[string]bool), + processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages), }, nil } @@ -607,7 +606,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag // Message deduplication: Use msg_id to prevent duplicate processing // As per WeCom documentation, use msg_id for deduplication msgID := fmt.Sprintf("%d", msg.MsgId) - if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { + if !c.processedMsgs.MarkMessageProcessed(msgID) { logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index c1bdf6c25..9126a847d 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "strings" - "sync" "time" "github.com/sipeed/picoclaw/pkg/bus" @@ -28,8 +27,7 @@ type WeComBotChannel struct { client *http.Client ctx context.Context cancel context.CancelFunc - processedMsgs map[string]bool // Message deduplication: msg_id -> processed - msgMu sync.RWMutex + processedMsgs *MessageDeduplicator } // WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT) @@ -108,7 +106,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We client: &http.Client{Timeout: clientTimeout}, ctx: ctx, cancel: cancel, - processedMsgs: make(map[string]bool), + processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages), }, nil } @@ -330,7 +328,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag // Message deduplication: Use msg_id to prevent duplicate processing msgID := msg.MsgID - if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { + if !c.processedMsgs.MarkMessageProcessed(msgID) { logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go index 8ca98a30d..865be668e 100644 --- a/pkg/channels/wecom/dedupe.go +++ b/pkg/channels/wecom/dedupe.go @@ -4,29 +4,51 @@ import "sync" const wecomMaxProcessedMessages = 1000 -// markMessageProcessed marks msgID as processed and returns false for duplicates. -// All map reads/writes (including len) are protected by msgMu to avoid races. -func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool { +// MessageDeduplicator provides thread-safe message deduplication using a circular queue (ring buffer) +// combined with a hash map. This ensures fast O(1) lookups while naturally evicting the oldest +// messages without causing "amnesia cliffs" when the limit is reached. +type MessageDeduplicator struct { + mu sync.Mutex + msgs map[string]bool + ring []string + idx int + max int +} + +// NewMessageDeduplicator creates a new deduplicator with the specified capacity. +func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator { if maxEntries <= 0 { maxEntries = wecomMaxProcessedMessages } - - msgMu.Lock() - defer msgMu.Unlock() - - if *processedMsgs == nil { - *processedMsgs = make(map[string]bool) + return &MessageDeduplicator{ + msgs: make(map[string]bool, maxEntries), + ring: make([]string, maxEntries), + max: maxEntries, } +} - if (*processedMsgs)[msgID] { +// MarkMessageProcessed marks msgID as processed and returns false for duplicates. +func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool { + d.mu.Lock() + defer d.mu.Unlock() + + // 1. Check for duplicate + if d.msgs[msgID] { return false } - (*processedMsgs)[msgID] = true - // When over limit, reset dedupe map but keep the current message. - if len(*processedMsgs) > maxEntries { - *processedMsgs = map[string]bool{msgID: true} + // 2. Evict the oldest message at our current ring position (if any) + oldestID := d.ring[d.idx] + if oldestID != "" { + delete(d.msgs, oldestID) } + // 3. Store the new message + d.msgs[msgID] = true + d.ring[d.idx] = msgID + + // 4. Advance the circle queue index + d.idx = (d.idx + 1) % d.max + return true } diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go index 71f987892..41a50f7e2 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom_dedupe_test.go @@ -5,22 +5,20 @@ import ( "testing" ) -func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_DuplicateDetection(t *testing.T) { + d := NewMessageDeduplicator(wecomMaxProcessedMessages) - if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok { + if ok := d.MarkMessageProcessed("msg-1"); !ok { t.Fatalf("first message should be accepted") } - if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok { + if ok := d.MarkMessageProcessed("msg-1"); ok { t.Fatalf("duplicate message should be rejected") } } -func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) { + d := NewMessageDeduplicator(wecomMaxProcessedMessages) const goroutines = 64 var wg sync.WaitGroup @@ -30,7 +28,7 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { for i := 0; i < goroutines; i++ { go func() { defer wg.Done() - results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages) + results <- d.MarkMessageProcessed("msg-concurrent") }() } @@ -49,30 +47,37 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { } } -func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { + // Create a deduplicator with a very small capacity to test eviction easily + capacity := 3 + d := NewMessageDeduplicator(capacity) - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok { - t.Fatalf("first message should be accepted") - } - if len(processed) != 1 { - t.Fatalf("expected map size 1 after first insert, got %d", len(processed)) + // Fill the queue + d.MarkMessageProcessed("msg-1") + d.MarkMessageProcessed("msg-2") + d.MarkMessageProcessed("msg-3") + + // At this point, the queue is full. msg-1 is the oldest. + if len(d.msgs) != 3 { + t.Fatalf("expected map size to be 3, got %d", len(d.msgs)) } - // Inserting second unique message exceeds maxEntries and should reset map, but keep the new message. - if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok { - t.Fatalf("second unique message should be accepted") - } - if len(processed) != 1 { - t.Fatalf("expected map to retain current message after rotation, got size %d", len(processed)) - } - if !processed["msg-2"] { - t.Fatalf("expected current message marker to be retained after rotation") + // This should evict msg-1 and add msg-4 + if ok := d.MarkMessageProcessed("msg-4"); !ok { + t.Fatalf("msg-4 should be accepted") } - // Because msg-2 was retained, an immediate duplicate should be rejected. - if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); ok { - t.Fatalf("duplicate message immediately after rotation should be rejected") + if len(d.msgs) != 3 { + t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs)) + } + + // msg-1 should now be forgotten (evicted) + if ok := d.MarkMessageProcessed("msg-1"); !ok { + t.Fatalf("msg-1 should be accepted again because it was evicted") + } + + // msg-2 should have been evicted when we added msg-1 back + if ok := d.MarkMessageProcessed("msg-2"); !ok { + t.Fatalf("msg-2 should be accepted again because it was evicted") } }