diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go index d26c3388b..467f79979 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom_dedupe_test.go @@ -9,15 +9,46 @@ func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { var mu sync.RWMutex processed := make(map[string]bool) - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); !ok { + if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok { t.Fatalf("first message should be accepted") } - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); ok { + if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok { t.Fatalf("duplicate message should be rejected") } } +func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + const goroutines = 64 + var wg sync.WaitGroup + wg.Add(goroutines) + + results := make(chan bool, goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages) + }() + } + + wg.Wait() + close(results) + + successes := 0 + for ok := range results { + if ok { + successes++ + } + } + + if successes != 1 { + t.Fatalf("expected exactly 1 successful mark, got %d", successes) + } +} + func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { var mu sync.RWMutex processed := make(map[string]bool)