feat(wecom-aibot): enhance stream task management with StreamClosedAt and improved cleanup logic

This commit is contained in:
Zhang Rui
2026-02-28 15:45:32 +08:00
parent 4e09c91dda
commit a87e6b0551
+38 -18
View File
@@ -39,18 +39,19 @@ type WeComAIBotChannel struct {
// streamTask represents a streaming task for AI Bot
type streamTask struct {
StreamID string
ChatID string // used by Send() to find this task
ResponseURL string // temporary URL for proactive reply (valid 1 hour, use once)
Question string
CreatedTime time.Time
Deadline time.Time // ~30s, we close the stream here and switch to response_url
StreamClosed bool // stream returned finish:true; waiting for agent to reply via response_url
Finished bool // fully done
mu sync.Mutex
answerCh chan string // receives agent reply from Send()
ctx context.Context // canceled when task is removed; used to interrupt the agent goroutine
cancel context.CancelFunc // call on task removal to cancel ctx
StreamID string
ChatID string // used by Send() to find this task
ResponseURL string // temporary URL for proactive reply (valid 1 hour, use once)
Question string
CreatedTime time.Time
Deadline time.Time // ~30s, we close the stream here and switch to response_url
StreamClosed bool // stream returned finish:true; waiting for agent to reply via response_url
StreamClosedAt time.Time // set when StreamClosed becomes true; used for accelerated cleanup
Finished bool // fully done
mu sync.Mutex
answerCh chan string // receives agent reply from Send()
ctx context.Context // canceled when task is removed; used to interrupt the agent goroutine
cancel context.CancelFunc // call on task removal to cancel ctx
}
// WeComAIBotMessage represents the decrypted JSON message from WeCom AI Bot
@@ -781,6 +782,7 @@ func (c *WeComAIBotChannel) getStreamResponse(task *streamTask, timestamp, nonce
// Only mark stream as closed; keep in chatTasks for Send() to find.
task.mu.Lock()
task.StreamClosed = true
task.StreamClosedAt = time.Now()
task.mu.Unlock()
// Remove from streamTasks (no more stream polls expected).
c.taskMu.Lock()
@@ -1117,13 +1119,24 @@ func (c *WeComAIBotChannel) cleanupLoop() {
}
}
// cleanupOldTasks removes tasks that have been alive longer than 1 hour
// (response_url validity window), which is the absolute maximum lifetime of any task.
// cleanupOldTasks removes tasks that have exceeded their expected lifetime:
// - Active tasks (in streamTasks): cleaned up after 1 hour (response_url validity window).
// - StreamClosed tasks (in chatTasks only): cleaned up after streamClosedGracePeriod.
// These tasks are waiting for the agent to call Send() via response_url. If the agent
// crashes or times out without calling Send(), we must not let them accumulate indefinitely.
// The grace period is generous enough to cover typical LLM latency but far shorter than 1 hour,
// preventing chatTasks from filling up when many requests time out in quick succession.
const (
streamClosedGracePeriod = 10 * time.Minute // max wait for agent after stream closes
taskMaxLifetime = 1 * time.Hour // absolute max (≈ response_url validity)
)
func (c *WeComAIBotChannel) cleanupOldTasks() {
c.taskMu.Lock()
defer c.taskMu.Unlock()
cutoff := time.Now().Add(-1 * time.Hour)
now := time.Now()
cutoff := now.Add(-taskMaxLifetime)
for id, task := range c.streamTasks {
if task.CreatedTime.Before(cutoff) {
delete(c.streamTasks, id)
@@ -1143,12 +1156,19 @@ func (c *WeComAIBotChannel) cleanupOldTasks() {
})
}
}
// Also clean up StreamClosed tasks from chatTasks that are older than 1 hour.
// These were removed from streamTasks earlier but kept alive for response_url delivery.
// Clean up StreamClosed tasks from chatTasks.
// Two expiry conditions are checked:
// 1. Absolute expiry: task was created more than taskMaxLifetime ago.
// 2. Grace expiry: stream closed more than streamClosedGracePeriod ago
// (agent had enough time to reply; it is not coming back).
for chatID, queue := range c.chatTasks {
filtered := queue[:0]
for _, t := range queue {
if !t.Finished && t.CreatedTime.After(cutoff) {
absoluteExpired := t.CreatedTime.Before(cutoff)
graceExpired := t.StreamClosed &&
!t.StreamClosedAt.IsZero() &&
t.StreamClosedAt.Before(now.Add(-streamClosedGracePeriod))
if !t.Finished && !absoluteExpired && !graceExpired {
filtered = append(filtered, t)
} else if !t.Finished {
t.cancel() // cancel any lingering agent goroutine