diff --git a/pkg/channels/discord.go b/pkg/channels/discord.go index 472b51c53..9ddec662c 100644 --- a/pkg/channels/discord.go +++ b/pkg/channels/discord.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "sync" "time" "github.com/bwmarrin/discordgo" @@ -25,6 +26,8 @@ type DiscordChannel struct { config config.DiscordConfig transcriber *voice.GroqTranscriber ctx context.Context + typingMu sync.Mutex + typingStop map[string]chan struct{} // chatID → stop signal } func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordChannel, error) { @@ -41,6 +44,7 @@ func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordC config: cfg, transcriber: nil, ctx: context.Background(), + typingStop: make(map[string]chan struct{}), }, nil } @@ -83,6 +87,14 @@ func (c *DiscordChannel) Stop(ctx context.Context) error { logger.InfoC("discord", "Stopping Discord bot") c.setRunning(false) + // Stop all typing goroutines before closing session + c.typingMu.Lock() + for chatID, stop := range c.typingStop { + close(stop) + delete(c.typingStop, chatID) + } + c.typingMu.Unlock() + if err := c.session.Close(); err != nil { return fmt.Errorf("failed to close discord session: %w", err) } @@ -91,6 +103,8 @@ func (c *DiscordChannel) Stop(ctx context.Context) error { } func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + c.stopTyping(msg.ChatID) + if !c.IsRunning() { return fmt.Errorf("discord bot not running") } @@ -155,12 +169,6 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag return } - if err := c.session.ChannelTyping(m.ChannelID); err != nil { - logger.ErrorCF("discord", "Failed to send typing indicator", map[string]any{ - "error": err.Error(), - }) - } - // 检查白名单,避免为被拒绝的用户下载附件和转录 if !c.IsAllowed(m.Author.ID) { logger.DebugCF("discord", "Message rejected by allowlist", map[string]any{ @@ -243,6 +251,9 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag content = "[media only]" } + // Start typing after all early returns — guaranteed to have a matching Send() + c.startTyping(m.ChannelID) + logger.DebugCF("discord", "Received message", map[string]any{ "sender_name": senderName, "sender_id": senderID, @@ -271,6 +282,52 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata) } +// startTyping starts a continuous typing indicator loop for the given chatID. +// It stops any existing typing loop for that chatID before starting a new one. +func (c *DiscordChannel) startTyping(chatID string) { + c.typingMu.Lock() + // Stop existing loop for this chatID if any + if stop, ok := c.typingStop[chatID]; ok { + close(stop) + } + stop := make(chan struct{}) + c.typingStop[chatID] = stop + c.typingMu.Unlock() + + go func() { + if err := c.session.ChannelTyping(chatID); err != nil { + logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err}) + } + ticker := time.NewTicker(8 * time.Second) + defer ticker.Stop() + timeout := time.After(5 * time.Minute) + for { + select { + case <-stop: + return + case <-timeout: + return + case <-c.ctx.Done(): + return + case <-ticker.C: + if err := c.session.ChannelTyping(chatID); err != nil { + logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err}) + } + } + } + }() +} + +// stopTyping stops the typing indicator loop for the given chatID. +func (c *DiscordChannel) stopTyping(chatID string) { + c.typingMu.Lock() + defer c.typingMu.Unlock() + if stop, ok := c.typingStop[chatID]; ok { + close(stop) + delete(c.typingStop, chatID) + } +} + func (c *DiscordChannel) downloadAttachment(url, filename string) string { return utils.DownloadFile(url, filename, utils.DownloadOptions{ LoggerPrefix: "discord",