mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
213274002a
* fix: keep Discord typing indicator alive during agent processing Discord's ChannelTyping() expires after ~10s, but agent processing (LLM + tool execution) typically takes 30-60s+. Replace single-fire ChannelTyping() with a self-managed typing loop inside DiscordChannel. - startTyping(chatID): goroutine refreshes ChannelTyping every 8s - stopTyping(chatID): called in Send() when response is dispatched - Stop() cleans up all typing goroutines on shutdown - startTyping placed after all early returns to prevent goroutine leaks Typing lifecycle fully contained in channel layer, no interface changes. Fixes #390 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: add goroutine safety to Discord typing indicator - Add 5-minute timeout as safety net to prevent indefinite goroutine leaks when agent produces no outbound message (empty response, panic, etc.) - Listen on c.ctx.Done() so goroutine exits when channel context is cancelled - Log ChannelTyping() errors at debug level for diagnostics (rate limits, session closed) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
336 lines
8.5 KiB
Go
336 lines
8.5 KiB
Go
package channels
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bwmarrin/discordgo"
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
"github.com/sipeed/picoclaw/pkg/voice"
|
|
)
|
|
|
|
// transcriptionTimeout bounds a single voice-attachment transcription request.
const transcriptionTimeout = 30 * time.Second

// sendTimeout bounds the delivery of a single outbound message chunk.
const sendTimeout = 10 * time.Second
|
|
|
|
type DiscordChannel struct {
|
|
*BaseChannel
|
|
session *discordgo.Session
|
|
config config.DiscordConfig
|
|
transcriber *voice.GroqTranscriber
|
|
ctx context.Context
|
|
typingMu sync.Mutex
|
|
typingStop map[string]chan struct{} // chatID → stop signal
|
|
}
|
|
|
|
func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordChannel, error) {
|
|
session, err := discordgo.New("Bot " + cfg.Token)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create discord session: %w", err)
|
|
}
|
|
|
|
base := NewBaseChannel("discord", cfg, bus, cfg.AllowFrom)
|
|
|
|
return &DiscordChannel{
|
|
BaseChannel: base,
|
|
session: session,
|
|
config: cfg,
|
|
transcriber: nil,
|
|
ctx: context.Background(),
|
|
typingStop: make(map[string]chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
func (c *DiscordChannel) SetTranscriber(transcriber *voice.GroqTranscriber) {
|
|
c.transcriber = transcriber
|
|
}
|
|
|
|
func (c *DiscordChannel) getContext() context.Context {
|
|
if c.ctx == nil {
|
|
return context.Background()
|
|
}
|
|
return c.ctx
|
|
}
|
|
|
|
func (c *DiscordChannel) Start(ctx context.Context) error {
|
|
logger.InfoC("discord", "Starting Discord bot")
|
|
|
|
c.ctx = ctx
|
|
c.session.AddHandler(c.handleMessage)
|
|
|
|
if err := c.session.Open(); err != nil {
|
|
return fmt.Errorf("failed to open discord session: %w", err)
|
|
}
|
|
|
|
c.setRunning(true)
|
|
|
|
botUser, err := c.session.User("@me")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get bot user: %w", err)
|
|
}
|
|
logger.InfoCF("discord", "Discord bot connected", map[string]any{
|
|
"username": botUser.Username,
|
|
"user_id": botUser.ID,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *DiscordChannel) Stop(ctx context.Context) error {
|
|
logger.InfoC("discord", "Stopping Discord bot")
|
|
c.setRunning(false)
|
|
|
|
// Stop all typing goroutines before closing session
|
|
c.typingMu.Lock()
|
|
for chatID, stop := range c.typingStop {
|
|
close(stop)
|
|
delete(c.typingStop, chatID)
|
|
}
|
|
c.typingMu.Unlock()
|
|
|
|
if err := c.session.Close(); err != nil {
|
|
return fmt.Errorf("failed to close discord session: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
|
c.stopTyping(msg.ChatID)
|
|
|
|
if !c.IsRunning() {
|
|
return fmt.Errorf("discord bot not running")
|
|
}
|
|
|
|
channelID := msg.ChatID
|
|
if channelID == "" {
|
|
return fmt.Errorf("channel ID is empty")
|
|
}
|
|
|
|
runes := []rune(msg.Content)
|
|
if len(runes) == 0 {
|
|
return nil
|
|
}
|
|
|
|
chunks := utils.SplitMessage(msg.Content, 2000) // Split messages into chunks, Discord length limit: 2000 chars
|
|
|
|
for _, chunk := range chunks {
|
|
if err := c.sendChunk(ctx, channelID, chunk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
|
|
// 使用传入的 ctx 进行超时控制
|
|
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
|
|
defer cancel()
|
|
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
_, err := c.session.ChannelMessageSend(channelID, content)
|
|
done <- err
|
|
}()
|
|
|
|
select {
|
|
case err := <-done:
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send discord message: %w", err)
|
|
}
|
|
return nil
|
|
case <-sendCtx.Done():
|
|
return fmt.Errorf("send message timeout: %w", sendCtx.Err())
|
|
}
|
|
}
|
|
|
|
// appendContent appends suffix to content on a new line. When content is
// empty, suffix is returned unchanged so the result never starts with a
// newline.
func appendContent(content, suffix string) string {
	switch content {
	case "":
		return suffix
	default:
		return content + "\n" + suffix
	}
}
|
|
|
|
func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.MessageCreate) {
|
|
if m == nil || m.Author == nil {
|
|
return
|
|
}
|
|
|
|
if m.Author.ID == s.State.User.ID {
|
|
return
|
|
}
|
|
|
|
// 检查白名单,避免为被拒绝的用户下载附件和转录
|
|
if !c.IsAllowed(m.Author.ID) {
|
|
logger.DebugCF("discord", "Message rejected by allowlist", map[string]any{
|
|
"user_id": m.Author.ID,
|
|
})
|
|
return
|
|
}
|
|
|
|
senderID := m.Author.ID
|
|
senderName := m.Author.Username
|
|
if m.Author.Discriminator != "" && m.Author.Discriminator != "0" {
|
|
senderName += "#" + m.Author.Discriminator
|
|
}
|
|
|
|
content := m.Content
|
|
mediaPaths := make([]string, 0, len(m.Attachments))
|
|
localFiles := make([]string, 0, len(m.Attachments))
|
|
|
|
// 确保临时文件在函数返回时被清理
|
|
defer func() {
|
|
for _, file := range localFiles {
|
|
if err := os.Remove(file); err != nil {
|
|
logger.DebugCF("discord", "Failed to cleanup temp file", map[string]any{
|
|
"file": file,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
}()
|
|
|
|
for _, attachment := range m.Attachments {
|
|
isAudio := utils.IsAudioFile(attachment.Filename, attachment.ContentType)
|
|
|
|
if isAudio {
|
|
localPath := c.downloadAttachment(attachment.URL, attachment.Filename)
|
|
if localPath != "" {
|
|
localFiles = append(localFiles, localPath)
|
|
|
|
transcribedText := ""
|
|
if c.transcriber != nil && c.transcriber.IsAvailable() {
|
|
ctx, cancel := context.WithTimeout(c.getContext(), transcriptionTimeout)
|
|
result, err := c.transcriber.Transcribe(ctx, localPath)
|
|
cancel() // 立即释放context资源,避免在for循环中泄漏
|
|
|
|
if err != nil {
|
|
logger.ErrorCF("discord", "Voice transcription failed", map[string]any{
|
|
"error": err.Error(),
|
|
})
|
|
transcribedText = fmt.Sprintf("[audio: %s (transcription failed)]", attachment.Filename)
|
|
} else {
|
|
transcribedText = fmt.Sprintf("[audio transcription: %s]", result.Text)
|
|
logger.DebugCF("discord", "Audio transcribed successfully", map[string]any{
|
|
"text": result.Text,
|
|
})
|
|
}
|
|
} else {
|
|
transcribedText = fmt.Sprintf("[audio: %s]", attachment.Filename)
|
|
}
|
|
|
|
content = appendContent(content, transcribedText)
|
|
} else {
|
|
logger.WarnCF("discord", "Failed to download audio attachment", map[string]any{
|
|
"url": attachment.URL,
|
|
"filename": attachment.Filename,
|
|
})
|
|
mediaPaths = append(mediaPaths, attachment.URL)
|
|
content = appendContent(content, fmt.Sprintf("[attachment: %s]", attachment.URL))
|
|
}
|
|
} else {
|
|
mediaPaths = append(mediaPaths, attachment.URL)
|
|
content = appendContent(content, fmt.Sprintf("[attachment: %s]", attachment.URL))
|
|
}
|
|
}
|
|
|
|
if content == "" && len(mediaPaths) == 0 {
|
|
return
|
|
}
|
|
|
|
if content == "" {
|
|
content = "[media only]"
|
|
}
|
|
|
|
// Start typing after all early returns — guaranteed to have a matching Send()
|
|
c.startTyping(m.ChannelID)
|
|
|
|
logger.DebugCF("discord", "Received message", map[string]any{
|
|
"sender_name": senderName,
|
|
"sender_id": senderID,
|
|
"preview": utils.Truncate(content, 50),
|
|
})
|
|
|
|
peerKind := "channel"
|
|
peerID := m.ChannelID
|
|
if m.GuildID == "" {
|
|
peerKind = "direct"
|
|
peerID = senderID
|
|
}
|
|
|
|
metadata := map[string]string{
|
|
"message_id": m.ID,
|
|
"user_id": senderID,
|
|
"username": m.Author.Username,
|
|
"display_name": senderName,
|
|
"guild_id": m.GuildID,
|
|
"channel_id": m.ChannelID,
|
|
"is_dm": fmt.Sprintf("%t", m.GuildID == ""),
|
|
"peer_kind": peerKind,
|
|
"peer_id": peerID,
|
|
}
|
|
|
|
c.HandleMessage(senderID, m.ChannelID, content, mediaPaths, metadata)
|
|
}
|
|
|
|
// startTyping starts a continuous typing indicator loop for the given chatID.
|
|
// It stops any existing typing loop for that chatID before starting a new one.
|
|
func (c *DiscordChannel) startTyping(chatID string) {
|
|
c.typingMu.Lock()
|
|
// Stop existing loop for this chatID if any
|
|
if stop, ok := c.typingStop[chatID]; ok {
|
|
close(stop)
|
|
}
|
|
stop := make(chan struct{})
|
|
c.typingStop[chatID] = stop
|
|
c.typingMu.Unlock()
|
|
|
|
go func() {
|
|
if err := c.session.ChannelTyping(chatID); err != nil {
|
|
logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err})
|
|
}
|
|
ticker := time.NewTicker(8 * time.Second)
|
|
defer ticker.Stop()
|
|
timeout := time.After(5 * time.Minute)
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
return
|
|
case <-timeout:
|
|
return
|
|
case <-c.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := c.session.ChannelTyping(chatID); err != nil {
|
|
logger.DebugCF("discord", "ChannelTyping error", map[string]interface{}{"chatID": chatID, "err": err})
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// stopTyping stops the typing indicator loop for the given chatID.
|
|
func (c *DiscordChannel) stopTyping(chatID string) {
|
|
c.typingMu.Lock()
|
|
defer c.typingMu.Unlock()
|
|
if stop, ok := c.typingStop[chatID]; ok {
|
|
close(stop)
|
|
delete(c.typingStop, chatID)
|
|
}
|
|
}
|
|
|
|
func (c *DiscordChannel) downloadAttachment(url, filename string) string {
|
|
return utils.DownloadFile(url, filename, utils.DownloadOptions{
|
|
LoggerPrefix: "discord",
|
|
})
|
|
}
|