From 18d89937ad487bd467b64aa798e5ec4560adecb7 Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 14:29:17 +0300 Subject: [PATCH 1/6] fix(wecom): remove message-dedupe data races in bot/app channels Centralize dedupe map access behind a mutex-safe helper and use it in both WeCom bot and WeCom app channels to eliminate concurrent map access races while preserving current dedupe behavior. --- pkg/channels/wecom/app.go | 13 +------------ pkg/channels/wecom/bot.go | 13 +------------ pkg/channels/wecom/dedupe.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 24 deletions(-) create mode 100644 pkg/channels/wecom/dedupe.go diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index b79340315..c550d8b47 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -607,23 +607,12 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag // Message deduplication: Use msg_id to prevent duplicate processing // As per WeCom documentation, use msg_id for deduplication msgID := fmt.Sprintf("%d", msg.MsgId) - c.msgMu.Lock() - if c.processedMsgs[msgID] { - c.msgMu.Unlock() + if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) return } - c.processedMsgs[msgID] = true - // Clean up old messages while still holding the lock to avoid a data race - // on len(). Reset the map but re-insert the current msgID so it remains - // deduplicated. - if len(c.processedMsgs) > 1000 { - c.processedMsgs = make(map[string]bool) - c.processedMsgs[msgID] = true - } - c.msgMu.Unlock() senderID := msg.FromUserName chatID := senderID // WeCom App uses user ID as chat ID for direct messages diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index 0d0426c0d..c1bdf6c25 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -330,23 +330,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag // Message deduplication: Use msg_id to prevent duplicate processing msgID := msg.MsgID - c.msgMu.Lock() - if c.processedMsgs[msgID] { - c.msgMu.Unlock() + if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) return } - c.processedMsgs[msgID] = true - // Clean up old messages while still holding the lock to avoid a data race - // on len(). Reset the map but re-insert the current msgID so it remains - // deduplicated. - if len(c.processedMsgs) > 1000 { - c.processedMsgs = make(map[string]bool) - c.processedMsgs[msgID] = true - } - c.msgMu.Unlock() senderID := msg.From.UserID diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go new file mode 100644 index 000000000..5f2b4cf81 --- /dev/null +++ b/pkg/channels/wecom/dedupe.go @@ -0,0 +1,28 @@ +package wecom + +import "sync" + +const wecomMaxProcessedMessages = 1000 + +// markMessageProcessed marks msgID as processed and returns false for duplicates. +// All map reads/writes (including len) are protected by msgMu to avoid races. +func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool { + if maxEntries <= 0 { + maxEntries = wecomMaxProcessedMessages + } + + msgMu.Lock() + defer msgMu.Unlock() + + if (*processedMsgs)[msgID] { + return false + } + (*processedMsgs)[msgID] = true + + // Keep the newest message marker when rotating to bound memory growth. + if len(*processedMsgs) > maxEntries { + *processedMsgs = map[string]bool{msgID: true} + } + + return true +} From db17cdc86deeb37927ed39e11919dabfcca0a03c Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 14:48:58 +0300 Subject: [PATCH 2/6] test(wecom): align dedupe rotation behavior and add helper tests Match rotation semantics to prior behavior by fully resetting the dedupe map once the size limit is exceeded, and add focused tests for duplicate detection and boundary rotation behavior. --- pkg/channels/wecom/dedupe.go | 4 +-- pkg/channels/wecom_dedupe_test.go | 42 +++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 pkg/channels/wecom_dedupe_test.go diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go index 5f2b4cf81..09f5a8a41 100644 --- a/pkg/channels/wecom/dedupe.go +++ b/pkg/channels/wecom/dedupe.go @@ -19,9 +19,9 @@ func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, m } (*processedMsgs)[msgID] = true - // Keep the newest message marker when rotating to bound memory growth. + // Keep existing behavior: when over limit, reset dedupe map entirely. if len(*processedMsgs) > maxEntries { - *processedMsgs = map[string]bool{msgID: true} + *processedMsgs = make(map[string]bool) } return true diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go new file mode 100644 index 000000000..d26c3388b --- /dev/null +++ b/pkg/channels/wecom_dedupe_test.go @@ -0,0 +1,42 @@ +package channels + +import ( + "sync" + "testing" +) + +func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); !ok { + t.Fatalf("first message should be accepted") + } + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); ok { + t.Fatalf("duplicate message should be rejected") + } +} + +func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok { + t.Fatalf("first message should be accepted") + } + if len(processed) != 1 { + t.Fatalf("expected map size 1 after first insert, got %d", len(processed)) + } + + // Inserting second unique message exceeds maxEntries and should reset map. + if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok { + t.Fatalf("second unique message should be accepted") + } + if len(processed) != 0 { + t.Fatalf("expected map to be reset after rotation, got size %d", len(processed)) + } + if processed["msg-2"] { + t.Fatalf("expected current message marker to be cleared after rotation") + } +} From 1e2ab4a5e51b08536c76ba4b0783ad560b6971c6 Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 14:55:24 +0300 Subject: [PATCH 3/6] test(wecom): add dedupe helper coverage and align constant usage Use wecomMaxProcessedMessages in tests and add a concurrent same-message test to lock in race-safety behavior for markMessageProcessed. --- pkg/channels/wecom_dedupe_test.go | 35 +++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go index d26c3388b..467f79979 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom_dedupe_test.go @@ -9,15 +9,46 @@ func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { var mu sync.RWMutex processed := make(map[string]bool) - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); !ok { + if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok { t.Fatalf("first message should be accepted") } - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1000); ok { + if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok { t.Fatalf("duplicate message should be rejected") } } +func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { + var mu sync.RWMutex + processed := make(map[string]bool) + + const goroutines = 64 + var wg sync.WaitGroup + wg.Add(goroutines) + + results := make(chan bool, goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages) + }() + } + + wg.Wait() + close(results) + + successes := 0 + for ok := range results { + if ok { + successes++ + } + } + + if successes != 1 { + t.Fatalf("expected exactly 1 successful mark, got %d", successes) + } +} + func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { var mu sync.RWMutex processed := make(map[string]bool) From 8640c8177ca544558fa6c89eeb76b72e084c7fd9 Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 15:18:54 +0300 Subject: [PATCH 4/6] fix(wecom): correctly retain boundary message during dedupe map rotation When the dedupe map rotates, the previous logic entirely cleared the map, meaning the message that triggered the rotation was immediately forgotten and could be duplicated immediately. This change seeds the new map with the current message to prevent that. Also adds a defensive nil check. --- pkg/channels/wecom/dedupe.go | 8 ++++++-- pkg/channels/wecom_dedupe_test.go | 15 ++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go index 09f5a8a41..8ca98a30d 100644 --- a/pkg/channels/wecom/dedupe.go +++ b/pkg/channels/wecom/dedupe.go @@ -14,14 +14,18 @@ func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, m msgMu.Lock() defer msgMu.Unlock() + if *processedMsgs == nil { + *processedMsgs = make(map[string]bool) + } + if (*processedMsgs)[msgID] { return false } (*processedMsgs)[msgID] = true - // Keep existing behavior: when over limit, reset dedupe map entirely. + // When over limit, reset dedupe map but keep the current message. if len(*processedMsgs) > maxEntries { - *processedMsgs = make(map[string]bool) + *processedMsgs = map[string]bool{msgID: true} } return true diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go index 467f79979..71f987892 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom_dedupe_test.go @@ -60,14 +60,19 @@ func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { t.Fatalf("expected map size 1 after first insert, got %d", len(processed)) } - // Inserting second unique message exceeds maxEntries and should reset map. + // Inserting second unique message exceeds maxEntries and should reset map, but keep the new message. if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok { t.Fatalf("second unique message should be accepted") } - if len(processed) != 0 { - t.Fatalf("expected map to be reset after rotation, got size %d", len(processed)) + if len(processed) != 1 { + t.Fatalf("expected map to retain current message after rotation, got size %d", len(processed)) } - if processed["msg-2"] { - t.Fatalf("expected current message marker to be cleared after rotation") + if !processed["msg-2"] { + t.Fatalf("expected current message marker to be retained after rotation") + } + + // Because msg-2 was retained, an immediate duplicate should be rejected. + if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); ok { + t.Fatalf("duplicate message immediately after rotation should be rejected") } } From 29e9b6b4b5c1d6055b4e1e695c83faf1324b383e Mon Sep 17 00:00:00 2001 From: esubaalew Date: Tue, 24 Feb 2026 15:56:28 +0300 Subject: [PATCH 5/6] fix(wecom): replace dedupe map rotation with circular queue The previous dedupe map rotation logic completely cleared the map when it reached max size, causing an 'amnesia cliff' where immediately arriving duplicates of just-forgotten messages would be processed. This change replaces that with a MessageDeduplicator struct that uses a circular queue (ring buffer) to track insertions. When the limit is reached, it only evicts the absolute oldest message from the map, completely resolving the cliff issue. This also cleans up the WeCom Bot and App webhook handlers by encapsulating the mutex and map state. --- pkg/channels/wecom/app.go | 7 ++-- pkg/channels/wecom/bot.go | 8 ++-- pkg/channels/wecom/dedupe.go | 50 +++++++++++++++++------- pkg/channels/wecom_dedupe_test.go | 63 +++++++++++++++++-------------- 4 files changed, 76 insertions(+), 52 deletions(-) diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index c550d8b47..717815b9f 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -38,8 +38,7 @@ type WeComAppChannel struct { tokenMu sync.RWMutex ctx context.Context cancel context.CancelFunc - processedMsgs map[string]bool // Message deduplication: msg_id -> processed - msgMu sync.RWMutex + processedMsgs *MessageDeduplicator } // WeComXMLMessage represents the XML message structure from WeCom @@ -144,7 +143,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) ( client: &http.Client{Timeout: clientTimeout}, ctx: ctx, cancel: cancel, - processedMsgs: make(map[string]bool), + processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages), }, nil } @@ -607,7 +606,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag // Message deduplication: Use msg_id to prevent duplicate processing // As per WeCom documentation, use msg_id for deduplication msgID := fmt.Sprintf("%d", msg.MsgId) - if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { + if !c.processedMsgs.MarkMessageProcessed(msgID) { logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index c1bdf6c25..9126a847d 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "strings" - "sync" "time" "github.com/sipeed/picoclaw/pkg/bus" @@ -28,8 +27,7 @@ type WeComBotChannel struct { client *http.Client ctx context.Context cancel context.CancelFunc - processedMsgs map[string]bool // Message deduplication: msg_id -> processed - msgMu sync.RWMutex + processedMsgs *MessageDeduplicator } // WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT) @@ -108,7 +106,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We client: &http.Client{Timeout: clientTimeout}, ctx: ctx, cancel: cancel, - processedMsgs: make(map[string]bool), + processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages), }, nil } @@ -330,7 +328,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag // Message deduplication: Use msg_id to prevent duplicate processing msgID := msg.MsgID - if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) { + if !c.processedMsgs.MarkMessageProcessed(msgID) { logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{ "msg_id": msgID, }) diff --git a/pkg/channels/wecom/dedupe.go b/pkg/channels/wecom/dedupe.go index 8ca98a30d..865be668e 100644 --- a/pkg/channels/wecom/dedupe.go +++ b/pkg/channels/wecom/dedupe.go @@ -4,29 +4,51 @@ import "sync" const wecomMaxProcessedMessages = 1000 -// markMessageProcessed marks msgID as processed and returns false for duplicates. -// All map reads/writes (including len) are protected by msgMu to avoid races. -func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool { +// MessageDeduplicator provides thread-safe message deduplication using a circular queue (ring buffer) +// combined with a hash map. This ensures fast O(1) lookups while naturally evicting the oldest +// messages without causing "amnesia cliffs" when the limit is reached. +type MessageDeduplicator struct { + mu sync.Mutex + msgs map[string]bool + ring []string + idx int + max int +} + +// NewMessageDeduplicator creates a new deduplicator with the specified capacity. +func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator { if maxEntries <= 0 { maxEntries = wecomMaxProcessedMessages } - - msgMu.Lock() - defer msgMu.Unlock() - - if *processedMsgs == nil { - *processedMsgs = make(map[string]bool) + return &MessageDeduplicator{ + msgs: make(map[string]bool, maxEntries), + ring: make([]string, maxEntries), + max: maxEntries, } +} - if (*processedMsgs)[msgID] { +// MarkMessageProcessed marks msgID as processed and returns false for duplicates. +func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool { + d.mu.Lock() + defer d.mu.Unlock() + + // 1. Check for duplicate + if d.msgs[msgID] { return false } - (*processedMsgs)[msgID] = true - // When over limit, reset dedupe map but keep the current message. - if len(*processedMsgs) > maxEntries { - *processedMsgs = map[string]bool{msgID: true} + // 2. Evict the oldest message at our current ring position (if any) + oldestID := d.ring[d.idx] + if oldestID != "" { + delete(d.msgs, oldestID) } + // 3. Store the new message + d.msgs[msgID] = true + d.ring[d.idx] = msgID + + // 4. Advance the circle queue index + d.idx = (d.idx + 1) % d.max + return true } diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom_dedupe_test.go index 71f987892..41a50f7e2 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom_dedupe_test.go @@ -5,22 +5,20 @@ import ( "testing" ) -func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_DuplicateDetection(t *testing.T) { + d := NewMessageDeduplicator(wecomMaxProcessedMessages) - if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok { + if ok := d.MarkMessageProcessed("msg-1"); !ok { t.Fatalf("first message should be accepted") } - if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok { + if ok := d.MarkMessageProcessed("msg-1"); ok { t.Fatalf("duplicate message should be rejected") } } -func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) { + d := NewMessageDeduplicator(wecomMaxProcessedMessages) const goroutines = 64 var wg sync.WaitGroup @@ -30,7 +28,7 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { for i := 0; i < goroutines; i++ { go func() { defer wg.Done() - results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages) + results <- d.MarkMessageProcessed("msg-concurrent") }() } @@ -49,30 +47,37 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) { } } -func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) { - var mu sync.RWMutex - processed := make(map[string]bool) +func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { + // Create a deduplicator with a very small capacity to test eviction easily + capacity := 3 + d := NewMessageDeduplicator(capacity) - if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok { - t.Fatalf("first message should be accepted") - } - if len(processed) != 1 { - t.Fatalf("expected map size 1 after first insert, got %d", len(processed)) + // Fill the queue + d.MarkMessageProcessed("msg-1") + d.MarkMessageProcessed("msg-2") + d.MarkMessageProcessed("msg-3") + + // At this point, the queue is full. msg-1 is the oldest. + if len(d.msgs) != 3 { + t.Fatalf("expected map size to be 3, got %d", len(d.msgs)) } - // Inserting second unique message exceeds maxEntries and should reset map, but keep the new message. - if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok { - t.Fatalf("second unique message should be accepted") - } - if len(processed) != 1 { - t.Fatalf("expected map to retain current message after rotation, got size %d", len(processed)) - } - if !processed["msg-2"] { - t.Fatalf("expected current message marker to be retained after rotation") + // This should evict msg-1 and add msg-4 + if ok := d.MarkMessageProcessed("msg-4"); !ok { + t.Fatalf("msg-4 should be accepted") } - // Because msg-2 was retained, an immediate duplicate should be rejected. - if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); ok { - t.Fatalf("duplicate message immediately after rotation should be rejected") + if len(d.msgs) != 3 { + t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs)) + } + + // msg-1 should now be forgotten (evicted) + if ok := d.MarkMessageProcessed("msg-1"); !ok { + t.Fatalf("msg-1 should be accepted again because it was evicted") + } + + // msg-2 should have been evicted when we added msg-1 back + if ok := d.MarkMessageProcessed("msg-2"); !ok { + t.Fatalf("msg-2 should be accepted again because it was evicted") } } From 2e0be9277660b33b4017b0c0f5e4f388ac54ac75 Mon Sep 17 00:00:00 2001 From: esubaalew Date: Mon, 2 Mar 2026 18:54:11 +0300 Subject: [PATCH 6/6] fix(wecom): resolve upstream rebase conflicts after channel refactor Rebase onto latest upstream/main, keep ring-buffer dedupe behavior, move dedupe tests to pkg/channels/wecom, and ensure wecom/channels race tests pass. --- .../{wecom_dedupe_test.go => wecom/dedupe_test.go} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename pkg/channels/{wecom_dedupe_test.go => wecom/dedupe_test.go} (90%) diff --git a/pkg/channels/wecom_dedupe_test.go b/pkg/channels/wecom/dedupe_test.go similarity index 90% rename from pkg/channels/wecom_dedupe_test.go rename to pkg/channels/wecom/dedupe_test.go index 41a50f7e2..10dff4cfe 100644 --- a/pkg/channels/wecom_dedupe_test.go +++ b/pkg/channels/wecom/dedupe_test.go @@ -1,4 +1,4 @@ -package channels +package wecom import ( "sync" @@ -48,11 +48,11 @@ func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) { } func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { - // Create a deduplicator with a very small capacity to test eviction easily + // Create a deduplicator with a very small capacity to test eviction easily. capacity := 3 d := NewMessageDeduplicator(capacity) - // Fill the queue + // Fill the queue. d.MarkMessageProcessed("msg-1") d.MarkMessageProcessed("msg-2") d.MarkMessageProcessed("msg-3") @@ -62,7 +62,7 @@ func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { t.Fatalf("expected map size to be 3, got %d", len(d.msgs)) } - // This should evict msg-1 and add msg-4 + // This should evict msg-1 and add msg-4. if ok := d.MarkMessageProcessed("msg-4"); !ok { t.Fatalf("msg-4 should be accepted") } @@ -71,12 +71,12 @@ func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs)) } - // msg-1 should now be forgotten (evicted) + // msg-1 should now be forgotten (evicted). if ok := d.MarkMessageProcessed("msg-1"); !ok { t.Fatalf("msg-1 should be accepted again because it was evicted") } - // msg-2 should have been evicted when we added msg-1 back + // msg-2 should have been evicted when we added msg-1 back. if ok := d.MarkMessageProcessed("msg-2"); !ok { t.Fatalf("msg-2 should be accepted again because it was evicted") }