From a87e6b0551593a466c3b3a863b5f4a8ebf7959ba Mon Sep 17 00:00:00 2001
From: Zhang Rui
Date: Sat, 28 Feb 2026 15:45:32 +0800
Subject: [PATCH] feat(wecom-aibot): enhance stream task management with
 StreamClosedAt and improved cleanup logic

---
 pkg/channels/wecom/aibot.go | 59 ++++++++++++++++++++++++++------------
 1 file changed, 41 insertions(+), 18 deletions(-)

diff --git a/pkg/channels/wecom/aibot.go b/pkg/channels/wecom/aibot.go
index b54202ece..4be430626 100644
--- a/pkg/channels/wecom/aibot.go
+++ b/pkg/channels/wecom/aibot.go
@@ -39,18 +39,19 @@ type WeComAIBotChannel struct {
 
 // streamTask represents a streaming task for AI Bot
 type streamTask struct {
-	StreamID     string
-	ChatID       string // used by Send() to find this task
-	ResponseURL  string // temporary URL for proactive reply (valid 1 hour, use once)
-	Question     string
-	CreatedTime  time.Time
-	Deadline     time.Time // ~30s, we close the stream here and switch to response_url
-	StreamClosed bool      // stream returned finish:true; waiting for agent to reply via response_url
-	Finished     bool      // fully done
-	mu           sync.Mutex
-	answerCh     chan string        // receives agent reply from Send()
-	ctx          context.Context    // canceled when task is removed; used to interrupt the agent goroutine
-	cancel       context.CancelFunc // call on task removal to cancel ctx
+	StreamID       string
+	ChatID         string // used by Send() to find this task
+	ResponseURL    string // temporary URL for proactive reply (valid 1 hour, use once)
+	Question       string
+	CreatedTime    time.Time
+	Deadline       time.Time // ~30s, we close the stream here and switch to response_url
+	StreamClosed   bool      // stream returned finish:true; waiting for agent to reply via response_url
+	StreamClosedAt time.Time // set when StreamClosed becomes true; used for accelerated cleanup
+	Finished       bool      // fully done
+	mu             sync.Mutex
+	answerCh       chan string        // receives agent reply from Send()
+	ctx            context.Context    // canceled when task is removed; used to interrupt the agent goroutine
+	cancel         context.CancelFunc // call on task removal to cancel ctx
 }
 
 // WeComAIBotMessage represents the decrypted JSON message from WeCom AI Bot
@@ -781,6 +782,7 @@ func (c *WeComAIBotChannel) getStreamResponse(task *streamTask, timestamp, nonce
 		// Only mark stream as closed; keep in chatTasks for Send() to find.
 		task.mu.Lock()
 		task.StreamClosed = true
+		task.StreamClosedAt = time.Now()
 		task.mu.Unlock()
 		// Remove from streamTasks (no more stream polls expected).
 		c.taskMu.Lock()
@@ -1117,13 +1119,25 @@ func (c *WeComAIBotChannel) cleanupLoop() {
 	}
 }
 
-// cleanupOldTasks removes tasks that have been alive longer than 1 hour
-// (response_url validity window), which is the absolute maximum lifetime of any task.
+// Task lifetime limits enforced by cleanupOldTasks.
+const (
+	streamClosedGracePeriod = 10 * time.Minute // max wait for agent after stream closes
+	taskMaxLifetime         = 1 * time.Hour    // absolute max (≈ response_url validity)
+)
+
+// cleanupOldTasks removes tasks that have exceeded their expected lifetime:
+//   - Active tasks (in streamTasks): cleaned up after 1 hour (response_url validity window).
+//   - StreamClosed tasks (in chatTasks only): cleaned up after streamClosedGracePeriod.
+//     These tasks are waiting for the agent to call Send() via response_url. If the agent
+//     crashes or times out without calling Send(), we must not let them accumulate indefinitely.
+//     The grace period is generous enough to cover typical LLM latency but far shorter than 1 hour,
+//     preventing chatTasks from filling up when many requests time out in quick succession.
 func (c *WeComAIBotChannel) cleanupOldTasks() {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
 
-	cutoff := time.Now().Add(-1 * time.Hour)
+	now := time.Now()
+	cutoff := now.Add(-taskMaxLifetime)
 	for id, task := range c.streamTasks {
 		if task.CreatedTime.Before(cutoff) {
 			delete(c.streamTasks, id)
@@ -1143,12 +1157,21 @@ func (c *WeComAIBotChannel) cleanupOldTasks() {
 			})
 		}
 	}
-	// Also clean up StreamClosed tasks from chatTasks that are older than 1 hour.
-	// These were removed from streamTasks earlier but kept alive for response_url delivery.
+	// Clean up StreamClosed tasks from chatTasks.
+	// Two expiry conditions are checked:
+	//  1. Absolute expiry: task was created more than taskMaxLifetime ago.
+	//  2. Grace expiry: stream closed more than streamClosedGracePeriod ago
+	//     (agent had enough time to reply; it is not coming back).
+	// NOTE(review): StreamClosed/StreamClosedAt are written under task.mu in
+	// getStreamResponse but read here with only c.taskMu held; confirm this is race-free.
 	for chatID, queue := range c.chatTasks {
 		filtered := queue[:0]
 		for _, t := range queue {
-			if !t.Finished && t.CreatedTime.After(cutoff) {
+			absoluteExpired := t.CreatedTime.Before(cutoff)
+			graceExpired := t.StreamClosed &&
+				!t.StreamClosedAt.IsZero() &&
+				t.StreamClosedAt.Before(now.Add(-streamClosedGracePeriod))
+			if !t.Finished && !absoluteExpired && !graceExpired {
 				filtered = append(filtered, t)
 			} else if !t.Finished {
 				t.cancel() // cancel any lingering agent goroutine