mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(wecom): replace dedupe map rotation with circular queue
The previous dedupe map rotation logic completely cleared the map when it reached max size, causing an 'amnesia cliff' where immediately arriving duplicates of just-forgotten messages would be processed. This change replaces that with a MessageDeduplicator struct that uses a circular queue (ring buffer) to track insertions. When the limit is reached, it only evicts the absolute oldest message from the map, completely resolving the cliff issue. This also cleans up the WeCom Bot and App webhook handlers by encapsulating the mutex and map state.
This commit is contained in:
@@ -38,8 +38,7 @@ type WeComAppChannel struct {
|
||||
tokenMu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
|
||||
msgMu sync.RWMutex
|
||||
processedMsgs *MessageDeduplicator
|
||||
}
|
||||
|
||||
// WeComXMLMessage represents the XML message structure from WeCom
|
||||
@@ -144,7 +143,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) (
|
||||
client: &http.Client{Timeout: clientTimeout},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
processedMsgs: make(map[string]bool),
|
||||
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -607,7 +606,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
|
||||
// Message deduplication: Use msg_id to prevent duplicate processing
|
||||
// As per WeCom documentation, use msg_id for deduplication
|
||||
msgID := fmt.Sprintf("%d", msg.MsgId)
|
||||
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
|
||||
if !c.processedMsgs.MarkMessageProcessed(msgID) {
|
||||
logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{
|
||||
"msg_id": msgID,
|
||||
})
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
@@ -28,8 +27,7 @@ type WeComBotChannel struct {
|
||||
client *http.Client
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
|
||||
msgMu sync.RWMutex
|
||||
processedMsgs *MessageDeduplicator
|
||||
}
|
||||
|
||||
// WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT)
|
||||
@@ -108,7 +106,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We
|
||||
client: &http.Client{Timeout: clientTimeout},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
processedMsgs: make(map[string]bool),
|
||||
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -330,7 +328,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
|
||||
|
||||
// Message deduplication: Use msg_id to prevent duplicate processing
|
||||
msgID := msg.MsgID
|
||||
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
|
||||
if !c.processedMsgs.MarkMessageProcessed(msgID) {
|
||||
logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{
|
||||
"msg_id": msgID,
|
||||
})
|
||||
|
||||
@@ -4,29 +4,51 @@ import "sync"
|
||||
|
||||
const wecomMaxProcessedMessages = 1000
|
||||
|
||||
// markMessageProcessed marks msgID as processed and returns false for duplicates.
|
||||
// All map reads/writes (including len) are protected by msgMu to avoid races.
|
||||
func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool {
|
||||
// MessageDeduplicator provides thread-safe message deduplication using a circular queue (ring buffer)
|
||||
// combined with a hash map. This ensures fast O(1) lookups while naturally evicting the oldest
|
||||
// messages without causing "amnesia cliffs" when the limit is reached.
|
||||
type MessageDeduplicator struct {
|
||||
mu sync.Mutex
|
||||
msgs map[string]bool
|
||||
ring []string
|
||||
idx int
|
||||
max int
|
||||
}
|
||||
|
||||
// NewMessageDeduplicator creates a new deduplicator with the specified capacity.
|
||||
func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator {
|
||||
if maxEntries <= 0 {
|
||||
maxEntries = wecomMaxProcessedMessages
|
||||
}
|
||||
|
||||
msgMu.Lock()
|
||||
defer msgMu.Unlock()
|
||||
|
||||
if *processedMsgs == nil {
|
||||
*processedMsgs = make(map[string]bool)
|
||||
return &MessageDeduplicator{
|
||||
msgs: make(map[string]bool, maxEntries),
|
||||
ring: make([]string, maxEntries),
|
||||
max: maxEntries,
|
||||
}
|
||||
}
|
||||
|
||||
if (*processedMsgs)[msgID] {
|
||||
// MarkMessageProcessed marks msgID as processed and returns false for duplicates.
|
||||
func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
// 1. Check for duplicate
|
||||
if d.msgs[msgID] {
|
||||
return false
|
||||
}
|
||||
(*processedMsgs)[msgID] = true
|
||||
|
||||
// When over limit, reset dedupe map but keep the current message.
|
||||
if len(*processedMsgs) > maxEntries {
|
||||
*processedMsgs = map[string]bool{msgID: true}
|
||||
// 2. Evict the oldest message at our current ring position (if any)
|
||||
oldestID := d.ring[d.idx]
|
||||
if oldestID != "" {
|
||||
delete(d.msgs, oldestID)
|
||||
}
|
||||
|
||||
// 3. Store the new message
|
||||
d.msgs[msgID] = true
|
||||
d.ring[d.idx] = msgID
|
||||
|
||||
// 4. Advance the circle queue index
|
||||
d.idx = (d.idx + 1) % d.max
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -5,22 +5,20 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) {
|
||||
var mu sync.RWMutex
|
||||
processed := make(map[string]bool)
|
||||
func TestMessageDeduplicator_DuplicateDetection(t *testing.T) {
|
||||
d := NewMessageDeduplicator(wecomMaxProcessedMessages)
|
||||
|
||||
if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok {
|
||||
if ok := d.MarkMessageProcessed("msg-1"); !ok {
|
||||
t.Fatalf("first message should be accepted")
|
||||
}
|
||||
|
||||
if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok {
|
||||
if ok := d.MarkMessageProcessed("msg-1"); ok {
|
||||
t.Fatalf("duplicate message should be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
|
||||
var mu sync.RWMutex
|
||||
processed := make(map[string]bool)
|
||||
func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) {
|
||||
d := NewMessageDeduplicator(wecomMaxProcessedMessages)
|
||||
|
||||
const goroutines = 64
|
||||
var wg sync.WaitGroup
|
||||
@@ -30,7 +28,7 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages)
|
||||
results <- d.MarkMessageProcessed("msg-concurrent")
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -49,30 +47,37 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) {
|
||||
var mu sync.RWMutex
|
||||
processed := make(map[string]bool)
|
||||
func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) {
|
||||
// Create a deduplicator with a very small capacity to test eviction easily
|
||||
capacity := 3
|
||||
d := NewMessageDeduplicator(capacity)
|
||||
|
||||
if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok {
|
||||
t.Fatalf("first message should be accepted")
|
||||
}
|
||||
if len(processed) != 1 {
|
||||
t.Fatalf("expected map size 1 after first insert, got %d", len(processed))
|
||||
// Fill the queue
|
||||
d.MarkMessageProcessed("msg-1")
|
||||
d.MarkMessageProcessed("msg-2")
|
||||
d.MarkMessageProcessed("msg-3")
|
||||
|
||||
// At this point, the queue is full. msg-1 is the oldest.
|
||||
if len(d.msgs) != 3 {
|
||||
t.Fatalf("expected map size to be 3, got %d", len(d.msgs))
|
||||
}
|
||||
|
||||
// Inserting second unique message exceeds maxEntries and should reset map, but keep the new message.
|
||||
if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok {
|
||||
t.Fatalf("second unique message should be accepted")
|
||||
}
|
||||
if len(processed) != 1 {
|
||||
t.Fatalf("expected map to retain current message after rotation, got size %d", len(processed))
|
||||
}
|
||||
if !processed["msg-2"] {
|
||||
t.Fatalf("expected current message marker to be retained after rotation")
|
||||
// This should evict msg-1 and add msg-4
|
||||
if ok := d.MarkMessageProcessed("msg-4"); !ok {
|
||||
t.Fatalf("msg-4 should be accepted")
|
||||
}
|
||||
|
||||
// Because msg-2 was retained, an immediate duplicate should be rejected.
|
||||
if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); ok {
|
||||
t.Fatalf("duplicate message immediately after rotation should be rejected")
|
||||
if len(d.msgs) != 3 {
|
||||
t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs))
|
||||
}
|
||||
|
||||
// msg-1 should now be forgotten (evicted)
|
||||
if ok := d.MarkMessageProcessed("msg-1"); !ok {
|
||||
t.Fatalf("msg-1 should be accepted again because it was evicted")
|
||||
}
|
||||
|
||||
// msg-2 should have been evicted when we added msg-1 back
|
||||
if ok := d.MarkMessageProcessed("msg-2"); !ok {
|
||||
t.Fatalf("msg-2 should be accepted again because it was evicted")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user