diff --git a/docs/channels/wecom/wecom_aibot/README.zh.md b/docs/channels/wecom/wecom_aibot/README.zh.md index 200a83a69..8470fe16f 100644 --- a/docs/channels/wecom/wecom_aibot/README.zh.md +++ b/docs/channels/wecom/wecom_aibot/README.zh.md @@ -75,9 +75,9 @@ PicoClaw 立即返回 {finish: false}(Agent 开始处理) └─ Agent 完成 → 返回 {finish: true, content: "回答内容"} ``` -**超时处理**(任务超过 5 分 30 秒): +**超时处理**(任务超过 30 秒): -若 Agent 处理时间超过约 5 分 30 秒(企业微信最大轮询窗口为 6 分钟),PicoClaw 会: +若 Agent 处理时间超过约 30 秒(企业微信最大轮询窗口为 6 分钟),PicoClaw 会: 1. 立即关闭流,向用户显示「⏳ 正在处理中,请稍候,结果将稍后发送。」 2. Agent 继续在后台运行 diff --git a/pkg/channels/wecom/aibot.go b/pkg/channels/wecom/aibot.go index de1a50c4a..2962623e1 100644 --- a/pkg/channels/wecom/aibot.go +++ b/pkg/channels/wecom/aibot.go @@ -37,21 +37,28 @@ type WeComAIBotChannel struct { taskMu sync.RWMutex } -// streamTask represents a streaming task for AI Bot +// streamTask represents a streaming task for AI Bot. +// +// Mutable fields (Finished, StreamClosed, StreamClosedAt) must be read/written +// while holding WeComAIBotChannel.taskMu. Immutable fields (StreamID, ChatID, +// ResponseURL, Question, CreatedTime, Deadline, answerCh, ctx, cancel) are set +// once at creation and never modified, so they are safe to read without a lock. type streamTask struct { - StreamID string - ChatID string // used by Send() to find this task - ResponseURL string // temporary URL for proactive reply (valid 1 hour, use once) - Question string - CreatedTime time.Time - Deadline time.Time // ~30s, we close the stream here and switch to response_url + // immutable after creation + StreamID string + ChatID string // used by Send() to find this task + ResponseURL string // temporary URL for proactive reply (valid 1 hour, use once) + Question string + CreatedTime time.Time + Deadline time.Time // ~30s, we close the stream here and switch to response_url + answerCh chan string // receives agent reply from Send() + ctx context.Context // canceled when task is removed; used to interrupt the agent goroutine + cancel context.CancelFunc // call on task removal to cancel ctx + + // mutable — guarded by WeComAIBotChannel.taskMu StreamClosed bool // stream returned finish:true; waiting for agent to reply via response_url StreamClosedAt time.Time // set when StreamClosed becomes true; used for accelerated cleanup Finished bool // fully done - mu sync.Mutex - answerCh chan string // receives agent reply from Send() - ctx context.Context // canceled when task is removed; used to interrupt the agent goroutine - cancel context.CancelFunc // call on task removal to cancel ctx } // WeComAIBotMessage represents the decrypted JSON message from WeCom AI Bot @@ -194,8 +201,13 @@ func (c *WeComAIBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) e } c.chatTasks[msg.ChatID] = queue var task *streamTask + var streamClosed bool + var responseURL string if len(queue) > 0 { task = queue[0] + // Read mutable fields while holding c.taskMu to avoid data races. + streamClosed = task.StreamClosed + responseURL = task.ResponseURL } c.taskMu.Unlock() @@ -210,13 +222,9 @@ func (c *WeComAIBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) e return nil } - task.mu.Lock() - streamClosed := task.StreamClosed - responseURL := task.ResponseURL - task.mu.Unlock() - if streamClosed { // Stream already ended with a "please wait" notice; send the real reply via response_url. + // Note: task.StreamID and task.ChatID are immutable, safe to read without a lock. logger.InfoCF("wecom_aibot", "Sending reply via response_url", map[string]any{ "stream_id": task.StreamID, "chat_id": msg.ChatID, @@ -779,13 +787,11 @@ func (c *WeComAIBotChannel) getStreamResponse(task *streamTask, timestamp, nonce // Normal finish: remove from all maps. c.removeTask(task) } else if closeStreamOnly { - // Only mark stream as closed; keep in chatTasks for Send() to find. - task.mu.Lock() + // Mark stream as closed and remove from streamTasks under a single lock + // to keep StreamClosed/StreamClosedAt consistent with map membership. + c.taskMu.Lock() task.StreamClosed = true task.StreamClosedAt = time.Now() - task.mu.Unlock() - // Remove from streamTasks (no more stream polls expected). - c.taskMu.Lock() delete(c.streamTasks, task.StreamID) c.taskMu.Unlock() } @@ -816,12 +822,12 @@ func (c *WeComAIBotChannel) getStreamResponse(task *streamTask, timestamp, nonce // removeTask removes a task from both streamTasks and chatTasks, marks it finished, // and cancels its context to interrupt the associated agent goroutine. func (c *WeComAIBotChannel) removeTask(task *streamTask) { - task.mu.Lock() - task.Finished = true - task.mu.Unlock() - task.cancel() // interrupt agent goroutine bound to this task + // Cancel first so the agent goroutine stops as soon as possible, + // before we acquire the write lock. + task.cancel() c.taskMu.Lock() + task.Finished = true // written under c.taskMu, consistent with all readers delete(c.streamTasks, task.StreamID) queue := c.chatTasks[task.ChatID] for i, t := range queue {