diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go index 5f2b4cf81..09f5a8a41 100644 --- a/pkg/channels/wecom/dedupe.go +++ b/pkg/channels/wecom/dedupe.go @@ -19,9 +19,9 @@ func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, m } (*processedMsgs)[msgID] = true - // Keep the newest message marker when rotating to bound memory growth. + // Keep existing behavior: when over limit, reset dedupe map entirely. if len(*processedMsgs) > maxEntries { - *processedMsgs = map[string]bool{msgID: true} + *processedMsgs = make(map[string]bool) } return true diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go new file mode 100644 index 000000000..d26c3388b --- /dev/null +++ b/pkg/channels/wecom_dedupe_test.go @@ -0,0 +1,42 @@ +package channels + +import ( + "sync" + "testing" +) + +func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); !ok { + t.Fatalf("first message should be accepted") + } + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); ok { + t.Fatalf("duplicate message should be rejected") + } +} + +func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok { + t.Fatalf("first message should be accepted") + } + if len(processed) != 1 { + t.Fatalf("expected map size 1 after first insert, got %d", len(processed)) + } + + // Inserting second unique message exceeds maxEntries and should reset map. + if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok { + t.Fatalf("second unique message should be accepted") + } + if len(processed) != 0 { + t.Fatalf("expected map to be reset after rotation, got size %d", len(processed)) + } + if processed["msg-2"] { + t.Fatalf("expected current message marker to be cleared after rotation") + } +}