mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix: keep Discord typing indicator alive during agent processing (#391)
* fix: keep Discord typing indicator alive during agent processing Discord's ChannelTyping() expires after ~10s, but agent processing (LLM + tool execution) typically takes 30-60s+. Replace single-fire ChannelTyping() with a self-managed typing loop inside DiscordChannel. - startTyping(chatID): goroutine refreshes ChannelTyping every 8s - stopTyping(chatID): called in Send() when response is dispatched - Stop() cleans up all typing goroutines on shutdown - startTyping placed after all early returns to prevent goroutine leaks Typing lifecycle fully contained in channel layer, no interface changes. Fixes #390 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: add goroutine safety to Discord typing indicator - Add 5-minute timeout as safety net to prevent indefinite goroutine leaks when agent produces no outbound message (empty response, panic, etc.) - Listen on c.ctx.Done() so goroutine exits when channel context is cancelled - Log ChannelTyping() errors at debug level for diagnostics (rate limits, session closed) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+63
-6
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/discordgo"
|
||||
@@ -25,6 +26,8 @@ type DiscordChannel struct {
|
||||
config config.DiscordConfig
|
||||
transcriber *voice.GroqTranscriber
|
||||
ctx context.Context
|
||||
typingMu sync.Mutex
|
||||
typingStop map[string]chan struct{} // chatID → stop signal
|
||||
}
|
||||
|
||||
func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordChannel, error) {
|
||||
@@ -41,6 +44,7 @@ func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordC
|
||||
config: cfg,
|
||||
transcriber: nil,
|
||||
ctx: context.Background(),
|
||||
typingStop: make(map[string]chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -83,6 +87,14 @@ func (c *DiscordChannel) Stop(ctx context.Context) error {
|
||||
logger.InfoC("discord", "Stopping Discord bot")
|
||||
c.setRunning(false)
|
||||
|
||||
// Stop all typing goroutines before closing session
|
||||
c.typingMu.Lock()
|
||||
for chatID, stop := range c.typingStop {
|
||||
close(stop)
|
||||
delete(c.typingStop, chatID)
|
||||
}
|
||||
c.typingMu.Unlock()
|
||||
|
||||
if err := c.session.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close discord session: %w", err)
|
||||
}
|
||||
@@ -91,6 +103,8 @@ func (c *DiscordChannel) Stop(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
c.stopTyping(msg.ChatID)
|
||||
|
||||
if !c.IsRunning() {
|
||||
return fmt.Errorf("discord bot not running")
|
||||
}
|
||||
@@ -155,12 +169,6 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.session.ChannelTyping(m.ChannelID); err != nil {
|
||||
logger.ErrorCF("discord", "Failed to send typing indicator", map[string]any{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
// 检查白名单,避免为被拒绝的用户下载附件和转录
|
||||
if !c.IsAllowed(m.Author.ID) {
|
||||
logger.DebugCF("discord", "Message rejected by allowlist", map[string]any{
|
||||
@@ -243,6 +251,9 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
|
||||
content = "[media only]"
|
||||
}
|
||||
|
||||
// Start typing after all early returns — guaranteed to have a matching Send()
|
||||
c.startTyping(m.ChannelID)
|
||||
|
||||
logger.DebugCF("discord", "Received message", map[string]any{
|
||||
"sender_name": senderName,
|
||||
"sender_id": senderID,
|
||||
@@ -271,6 +282,52 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
|
||||
c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata)
|
||||
}
|
||||
|
||||
// startTyping starts a continuous typing indicator loop for the given chatID.
|
||||
// It stops any existing typing loop for that chatID before starting a new one.
|
||||
func (c *DiscordChannel) startTyping(chatID string) {
|
||||
c.typingMu.Lock()
|
||||
// Stop existing loop for this chatID if any
|
||||
if stop, ok := c.typingStop[chatID]; ok {
|
||||
close(stop)
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
c.typingStop[chatID] = stop
|
||||
c.typingMu.Unlock()
|
||||
|
||||
go func() {
|
||||
if err := c.session.ChannelTyping(chatID); err != nil {
|
||||
logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err})
|
||||
}
|
||||
ticker := time.NewTicker(8 * time.Second)
|
||||
defer ticker.Stop()
|
||||
timeout := time.After(5 * time.Minute)
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-timeout:
|
||||
return
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := c.session.ChannelTyping(chatID); err != nil {
|
||||
logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err})
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// stopTyping stops the typing indicator loop for the given chatID.
|
||||
func (c *DiscordChannel) stopTyping(chatID string) {
|
||||
c.typingMu.Lock()
|
||||
defer c.typingMu.Unlock()
|
||||
if stop, ok := c.typingStop[chatID]; ok {
|
||||
close(stop)
|
||||
delete(c.typingStop, chatID)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) downloadAttachment(url, filename string) string {
|
||||
return utils.DownloadFile(url, filename, utils.DownloadOptions{
|
||||
LoggerPrefix: "discord",
|
||||
|
||||
Reference in New Issue
Block a user