diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index 58149f92c..9513d8aca 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -25,23 +25,25 @@ import ( type fakeChannel struct{ id string } -func (f *fakeChannel) Name() string { return "fake" } -func (f *fakeChannel) Start(ctx context.Context) error { return nil } -func (f *fakeChannel) Stop(ctx context.Context) error { return nil } -func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { return nil } -func (f *fakeChannel) IsRunning() bool { return true } -func (f *fakeChannel) IsAllowed(string) bool { return true } -func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true } -func (f *fakeChannel) ReasoningChannelID() string { return f.id } +func (f *fakeChannel) Name() string { return "fake" } +func (f *fakeChannel) Start(ctx context.Context) error { return nil } +func (f *fakeChannel) Stop(ctx context.Context) error { return nil } +func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { + return nil, nil +} +func (f *fakeChannel) IsRunning() bool { return true } +func (f *fakeChannel) IsAllowed(string) bool { return true } +func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true } +func (f *fakeChannel) ReasoningChannelID() string { return f.id } type fakeMediaChannel struct { fakeChannel sentMedia []bus.OutboundMediaMessage } -func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { f.sentMedia = append(f.sentMedia, msg) - return nil + return nil, nil } func newStartedTestChannelManager( diff --git a/pkg/channels/README.md b/pkg/channels/README.md index 7f238ece5..c4d12ef59 100644 --- a/pkg/channels/README.md +++ b/pkg/channels/README.md @@ -252,28 +252,28 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { **3e. Send method error returns** ```go -// Old code: returns plain error +// Old code: returned only error func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.running { return fmt.Errorf("not running") } // ... if err != nil { return err } } -// New code: must return sentinel errors for Manager to determine retry strategy -func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +// New code: return delivered message IDs plus sentinel errors +func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning // ← Manager will not retry + return nil, channels.ErrNotRunning // ← Manager will not retry } // ... if err != nil { // Use ClassifySendError to wrap error based on HTTP status code - return channels.ClassifySendError(statusCode, err) + return nil, channels.ClassifySendError(statusCode, err) // Or manually wrap: - // return fmt.Errorf("%w: %v", channels.ErrTemporary, err) - // return fmt.Errorf("%w: %v", channels.ErrRateLimit, err) - // return fmt.Errorf("%w: %v", channels.ErrSendFailed, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err) } - return nil + return []string{deliveredID}, nil // or return nil, nil if IDs are unavailable } ``` @@ -502,25 +502,25 @@ func (c *MatrixChannel) Stop(ctx context.Context) error { return nil } -func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { // 1. Check running state if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // 2. Send message to Matrix - err := c.sendToMatrix(ctx, msg.ChatID, msg.Content) + eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content) if err != nil { // 3. Must use error classification wrapping // If you have an HTTP status code: - // return channels.ClassifySendError(statusCode, err) + // return nil, channels.ClassifySendError(statusCode, err) // If it's a network error: - // return channels.ClassifyNetError(err) + // return nil, channels.ClassifyNetError(err) // If manual classification is needed: - return fmt.Errorf("%w: %v", channels.ErrTemporary, err) + return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err) } - return nil + return []string{eventID}, nil } // ========== Incoming Message Handling ========== @@ -580,9 +580,9 @@ func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content st // ========== Internal Methods ========== -func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error { +func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) { // Actual Matrix SDK call - return nil + return "event-id", nil } ``` @@ -594,16 +594,17 @@ Depending on platform capabilities, your channel can optionally implement the fo ```go // If the platform supports sending images/files/audio/video -func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed) } + var messageIDs []string for _, part := range msg.Parts { localPath, err := store.Resolve(part.Ref) if err != nil { @@ -620,8 +621,10 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess default: // Upload file to Matrix } + // Append platform IDs here when the API returns them. + // messageIDs = append(messageIDs, uploadedMessageID) } - return nil + return messageIDs, nil } ``` @@ -1270,7 +1273,7 @@ type Channel interface { Name() string Start(ctx context.Context) error Stop(ctx context.Context) error - Send(ctx context.Context, msg bus.OutboundMessage) error + Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) IsRunning() bool IsAllowed(senderID string) bool IsAllowedSender(sender bus.SenderInfo) bool @@ -1279,7 +1282,7 @@ type Channel interface { // ===== Optional ===== type MediaSender interface { - SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error + SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) } type TypingCapable interface { diff --git a/pkg/channels/README.zh.md b/pkg/channels/README.zh.md index 8bc8c8dbc..3edc5cb6b 100644 --- a/pkg/channels/README.zh.md +++ b/pkg/channels/README.zh.md @@ -252,28 +252,28 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { **3e. Send 方法的错误返回** ```go -// 旧代码:返回普通 error +// 旧代码:只返回 error func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if !c.running { return fmt.Errorf("not running") } // ... if err != nil { return err } } -// 新代码:必须返回哨兵错误,供 Manager 判断重试策略 -func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +// 新代码:返回投递后的消息 ID,以及供 Manager 判断重试策略的哨兵错误 +func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning // ← Manager 不会重试 + return nil, channels.ErrNotRunning // ← Manager 不会重试 } // ... if err != nil { // 使用 ClassifySendError 根据 HTTP 状态码包装错误 - return channels.ClassifySendError(statusCode, err) + return nil, channels.ClassifySendError(statusCode, err) // 或手动包装: - // return fmt.Errorf("%w: %v", channels.ErrTemporary, err) - // return fmt.Errorf("%w: %v", channels.ErrRateLimit, err) - // return fmt.Errorf("%w: %v", channels.ErrSendFailed, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err) + // return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err) } - return nil + return []string{deliveredID}, nil // 如果拿不到 ID,也可以返回 nil, nil } ``` @@ -502,25 +502,25 @@ func (c *MatrixChannel) Stop(ctx context.Context) error { return nil } -func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { // 1. 检查运行状态 if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // 2. 发送消息到 Matrix - err := c.sendToMatrix(ctx, msg.ChatID, msg.Content) + eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content) if err != nil { // 3. 必须使用错误分类包装 // 如果你有 HTTP 状态码: - // return channels.ClassifySendError(statusCode, err) + // return nil, channels.ClassifySendError(statusCode, err) // 如果是网络错误: - // return channels.ClassifyNetError(err) + // return nil, channels.ClassifyNetError(err) // 如果需要手动分类: - return fmt.Errorf("%w: %v", channels.ErrTemporary, err) + return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err) } - return nil + return []string{eventID}, nil } // ========== 消息接收处理 ========== @@ -580,9 +580,9 @@ func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content st // ========== 内部方法 ========== -func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error { +func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) { // 实际的 Matrix SDK 调用 - return nil + return "event-id", nil } ``` @@ -594,16 +594,17 @@ func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string ```go // 如果平台支持发送图片/文件/音频/视频 -func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed) } + var messageIDs []string for _, part := range msg.Parts { localPath, err := store.Resolve(part.Ref) if err != nil { @@ -620,8 +621,10 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess default: // 上传文件到 Matrix } + // 如果 API 能返回平台消息 ID,就在这里追加。 + // messageIDs = append(messageIDs, uploadedMessageID) } - return nil + return messageIDs, nil } ``` @@ -1269,7 +1272,7 @@ type Channel interface { Name() string Start(ctx context.Context) error Stop(ctx context.Context) error - Send(ctx context.Context, msg bus.OutboundMessage) error + Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) IsRunning() bool IsAllowed(senderID string) bool IsAllowedSender(sender bus.SenderInfo) bool @@ -1278,7 +1281,7 @@ type Channel interface { // ===== 可选实现 ===== type MediaSender interface { - SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error + SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) } type TypingCapable interface { diff --git a/pkg/channels/base.go b/pkg/channels/base.go index f6ea691e7..bd4ced849 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -48,7 +48,7 @@ type Channel interface { Name() string Start(ctx context.Context) error Stop(ctx context.Context) error - Send(ctx context.Context, msg bus.OutboundMessage) error + Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) IsRunning() bool IsAllowed(senderID string) bool IsAllowedSender(sender bus.SenderInfo) bool diff --git a/pkg/channels/dingtalk/dingtalk.go b/pkg/channels/dingtalk/dingtalk.go index 450dcce54..04ccec8a2 100644 --- a/pkg/channels/dingtalk/dingtalk.go +++ b/pkg/channels/dingtalk/dingtalk.go @@ -104,20 +104,20 @@ func (c *DingTalkChannel) Stop(ctx context.Context) error { } // Send sends a message to DingTalk via the chatbot reply API -func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // Get session webhook from storage sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID) if !ok { - return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID) + return nil, fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID) } sessionWebhook, ok := sessionWebhookRaw.(string) if !ok { - return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID) + return nil, fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID) } logger.DebugCF("dingtalk", "Sending message", map[string]any{ @@ -126,7 +126,7 @@ func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) err }) // Use the session webhook to send the reply - return c.SendDirectReply(ctx, sessionWebhook, msg.Content) + return nil, c.SendDirectReply(ctx, sessionWebhook, msg.Content) } // onChatBotMessageReceived implements the IChatBotMessageHandler function signature diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go index cc0ef4ffe..b3070a822 100644 --- a/pkg/channels/discord/discord.go +++ b/pkg/channels/discord/discord.go @@ -128,37 +128,41 @@ func (c *DiscordChannel) Stop(ctx context.Context) error { return nil } -func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } channelID := msg.ChatID if channelID == "" { - return fmt.Errorf("channel ID is empty") + return nil, fmt.Errorf("channel ID is empty") } if len([]rune(msg.Content)) == 0 { - return nil + return nil, nil } - return c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID) + msgID, err := c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID) + if err != nil { + return nil, err + } + return []string{msgID}, nil } // SendMedia implements the channels.MediaSender interface. -func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } channelID := msg.ChatID if channelID == "" { - return fmt.Errorf("channel ID is empty") + return nil, fmt.Errorf("channel ID is empty") } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } // Collect all files into a single ChannelMessageSendComplex call @@ -202,33 +206,41 @@ func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMes } if len(files) == 0 { - return nil + return nil, nil } sendCtx, cancel := context.WithTimeout(ctx, sendTimeout) defer cancel() - done := make(chan error, 1) + type mediaResult struct { + id string + err error + } + done := make(chan mediaResult, 1) go func() { - _, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{ + sentMsg, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{ Content: caption, Files: files, }) - done <- err + if err != nil { + done <- mediaResult{err: err} + return + } + done <- mediaResult{id: sentMsg.ID} }() select { - case err := <-done: + case r := <-done: // Close all file readers for _, f := range files { if closer, ok := f.Reader.(*os.File); ok { closer.Close() } } - if err != nil { - return fmt.Errorf("discord send media: %w", channels.ErrTemporary) + if r.err != nil { + return nil, fmt.Errorf("discord send media: %w", channels.ErrTemporary) } - return nil + return []string{r.id}, nil case <-sendCtx.Done(): // Close all file readers for _, f := range files { @@ -236,7 +248,7 @@ func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMes closer.Close() } } - return sendCtx.Err() + return nil, sendCtx.Err() } } @@ -264,18 +276,25 @@ func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (st return msg.ID, nil } -func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) error { +func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) (string, error) { // Use the passed ctx for timeout control sendCtx, cancel := context.WithTimeout(ctx, sendTimeout) defer cancel() - done := make(chan error, 1) + type result struct { + id string + err error + } + done := make(chan result, 1) go func() { - var err error + var ( + msg *discordgo.Message + err error + ) // If we have an ID, we send the message as "Reply" if replyToID != "" { - _, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{ + msg, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{ Content: content, Reference: &discordgo.MessageReference{ MessageID: replyToID, @@ -284,20 +303,21 @@ func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, repl }) } else { // Otherwise, we send a normal message - _, err = c.session.ChannelMessageSend(channelID, content) + msg, err = c.session.ChannelMessageSend(channelID, content) } - done <- err + if err != nil { + done <- result{err: fmt.Errorf("discord send: %w", channels.ErrTemporary)} + return + } + done <- result{id: msg.ID} }() select { - case err := <-done: - if err != nil { - return fmt.Errorf("discord send: %w", channels.ErrTemporary) - } - return nil + case r := <-done: + return r.id, r.err case <-sendCtx.Done(): - return sendCtx.Err() + return "", sendCtx.Err() } } diff --git a/pkg/channels/feishu/feishu_32.go b/pkg/channels/feishu/feishu_32.go index f5e3aa224..f3fe2a6cb 100644 --- a/pkg/channels/feishu/feishu_32.go +++ b/pkg/channels/feishu/feishu_32.go @@ -36,8 +36,8 @@ func (c *FeishuChannel) Stop(ctx context.Context) error { } // Send is a stub method to satisfy the Channel interface -func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { - return errUnsupported +func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { + return nil, errUnsupported } // EditMessage is a stub method to satisfy MessageEditor @@ -56,6 +56,6 @@ func (c *FeishuChannel) ReactToMessage(ctx context.Context, chatID, messageID st } // SendMedia is a stub method to satisfy MediaSender -func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { - return errUnsupported +func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { + return nil, errUnsupported } diff --git a/pkg/channels/feishu/feishu_64.go b/pkg/channels/feishu/feishu_64.go index d0c351119..b0b231d09 100644 --- a/pkg/channels/feishu/feishu_64.go +++ b/pkg/channels/feishu/feishu_64.go @@ -131,26 +131,26 @@ func (c *FeishuChannel) Stop(ctx context.Context) error { // Send sends a message using Interactive Card format for markdown rendering. // Falls back to plain text message if card sending fails (e.g., table limit exceeded). -func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } if msg.ChatID == "" { - return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) } // Build interactive card with markdown content cardContent, err := buildMarkdownCard(msg.Content) if err != nil { // If card build fails, fall back to plain text - return c.sendText(ctx, msg.ChatID, msg.Content) + return nil, c.sendText(ctx, msg.ChatID, msg.Content) } // First attempt: try sending as interactive card err = c.sendCard(ctx, msg.ChatID, cardContent) if err == nil { - return nil + return nil, nil } // Check if error is due to card table limit (error code 11310) @@ -167,14 +167,14 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error // Second attempt: fall back to plain text message textErr := c.sendText(ctx, msg.ChatID, msg.Content) if textErr == nil { - return nil + return nil, nil } // If text also fails, return the text error - return textErr + return nil, textErr } // For other errors, return the original card error - return err + return nil, err } // EditMessage implements channels.MessageEditor. @@ -310,27 +310,27 @@ func (c *FeishuChannel) ReactToMessage(ctx context.Context, chatID, messageID st // SendMedia implements channels.MediaSender. // Uploads images/files via Feishu API then sends as messages. -func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } if msg.ChatID == "" { - return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } for _, part := range msg.Parts { if err := c.sendMediaPart(ctx, msg.ChatID, part, store); err != nil { - return err + return nil, err } } - return nil + return nil, nil } // sendMediaPart resolves and sends a single media part. diff --git a/pkg/channels/irc/irc.go b/pkg/channels/irc/irc.go index 3a4f213ca..e8a70923f 100644 --- a/pkg/channels/irc/irc.go +++ b/pkg/channels/irc/irc.go @@ -130,18 +130,18 @@ func (c *IRCChannel) Stop(ctx context.Context) error { } // Send sends a message to an IRC channel or user. -func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } target := msg.ChatID if target == "" { - return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) } if strings.TrimSpace(msg.Content) == "" { - return nil + return nil, nil } // Send each line separately (IRC is line-oriented) @@ -158,7 +158,7 @@ func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { "target": target, "lines": len(lines), }) - return nil + return nil, nil } // StartTyping implements channels.TypingCapable using IRCv3 +typing client tag. diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index 867ab24ee..e29896389 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -496,9 +496,9 @@ func (c *LINEChannel) resolveChatID(source lineSource) string { // Send sends a message to LINE. It first tries the Reply API (free) // using a cached reply token, then falls back to the Push API. -func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // Load and consume quote token for this chat @@ -516,28 +516,28 @@ func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { "chat_id": msg.ChatID, "quoted": quoteToken != "", }) - return nil + return nil, nil } logger.DebugC("line", "Reply API failed, falling back to Push API") } } // Fall back to Push API - return c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken) + return nil, c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken) } // SendMedia implements the channels.MediaSender interface. // LINE requires media to be accessible via public URL; since we only have local files, // we fall back to sending a text message with the filename/caption. // For full support, an external file hosting service would be needed. -func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } // LINE Messaging API requires publicly accessible URLs for media messages. @@ -549,11 +549,11 @@ func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessag } if err := c.sendPush(ctx, msg.ChatID, caption, ""); err != nil { - return err + return nil, err } } - return nil + return nil, nil } // buildTextMessage creates a text message object, optionally with quoteToken. diff --git a/pkg/channels/maixcam/maixcam.go b/pkg/channels/maixcam/maixcam.go index ff9a3ed1a..bbbf2da56 100644 --- a/pkg/channels/maixcam/maixcam.go +++ b/pkg/channels/maixcam/maixcam.go @@ -240,15 +240,15 @@ func (c *MaixCamChannel) Stop(ctx context.Context) error { return nil } -func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // Check ctx before entering write path select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } @@ -257,7 +257,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro if len(c.clients) == 0 { logger.WarnC("maixcam", "No MaixCam devices connected") - return fmt.Errorf("no connected MaixCam devices") + return nil, fmt.Errorf("no connected MaixCam devices") } response := map[string]any{ @@ -269,7 +269,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro data, err := json.Marshal(response) if err != nil { - return fmt.Errorf("failed to marshal response: %w", err) + return nil, fmt.Errorf("failed to marshal response: %w", err) } var sendErr error @@ -285,5 +285,5 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro _ = conn.SetWriteDeadline(time.Time{}) } - return sendErr + return nil, sendErr } diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 4e8074189..5fbf35ebf 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -158,8 +158,8 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) { } // preSend handles typing stop, reaction undo, and placeholder editing before sending a message. -// Returns true if the message was already delivered (skip Send). -func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool { +// Returns the delivered message IDs and true when delivery completed before a normal Send. +func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) ([]string, bool) { key := name + ":" + msg.ChatID // 1. Stop typing @@ -188,7 +188,7 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess } } } - return true + return nil, true } // 4. Try editing placeholder @@ -196,14 +196,14 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess if entry, ok := v.(placeholderEntry); ok && entry.id != "" { if editor, ok := ch.(MessageEditor); ok { if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil { - return true // edited successfully, skip Send + return []string{entry.id}, true } // edit failed → fall through to normal Send } } } - return false + return nil, false } // preSendMedia handles typing stop, reaction undo, and placeholder cleanup @@ -699,23 +699,29 @@ func splitByLength(content string, maxLen int) []string { // - ErrNotRunning / ErrSendFailed: permanent, no retry // - ErrRateLimit: fixed delay retry // - ErrTemporary / unknown: exponential backoff retry -func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) { +func (m *Manager) sendWithRetry( + ctx context.Context, + name string, + w *channelWorker, + msg bus.OutboundMessage, +) ([]string, bool) { // Rate limit: wait for token if err := w.limiter.Wait(ctx); err != nil { // ctx canceled, shutting down - return + return nil, false } // Pre-send: stop typing and try to edit placeholder - if m.preSend(ctx, name, msg, w.ch) { - return // placeholder was edited successfully, skip Send + if msgIDs, handled := m.preSend(ctx, name, msg, w.ch); handled { + return msgIDs, true } var lastErr error + var msgIDs []string for attempt := 0; attempt <= maxRetries; attempt++ { - lastErr = w.ch.Send(ctx, msg) + msgIDs, lastErr = w.ch.Send(ctx, msg) if lastErr == nil { - return + return msgIDs, true } // Permanent failures — don't retry @@ -734,7 +740,7 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork case <-time.After(rateLimitDelay): continue case <-ctx.Done(): - return + return nil, false } } @@ -743,7 +749,7 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork select { case <-time.After(backoff): case <-ctx.Done(): - return + return nil, false } } @@ -754,6 +760,8 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork "error": lastErr.Error(), "retries": maxRetries, }) + + return nil, false } func dispatchLoop[M any]( @@ -855,7 +863,7 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor if !ok { return } - _ = m.sendMediaWithRetry(ctx, name, w, msg) + _, _ = m.sendMediaWithRetry(ctx, name, w, msg) case <-ctx.Done(): return } @@ -863,14 +871,14 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor } // sendMediaWithRetry sends a media message through the channel with rate limiting and -// retry logic. It returns nil on success, or the last error after retries, -// including when the channel does not support MediaSender. +// retry logic. It returns the message IDs and nil on success, or nil and the last error +// after retries, including when the channel does not support MediaSender. func (m *Manager) sendMediaWithRetry( ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage, -) error { +) ([]string, error) { ms, ok := w.ch.(MediaSender) if !ok { err := fmt.Errorf("channel %q does not support media sending", name) @@ -878,22 +886,23 @@ func (m *Manager) sendMediaWithRetry( "channel": name, "error": err.Error(), }) - return err + return nil, err } // Rate limit: wait for token if err := w.limiter.Wait(ctx); err != nil { - return err + return nil, err } // Pre-send: stop typing and clean up any placeholder before sending media. m.preSendMedia(ctx, name, msg, w.ch) var lastErr error + var msgIDs []string for attempt := 0; attempt <= maxRetries; attempt++ { - lastErr = ms.SendMedia(ctx, msg) + msgIDs, lastErr = ms.SendMedia(ctx, msg) if lastErr == nil { - return nil + return msgIDs, nil } // Permanent failures — don't retry @@ -912,7 +921,7 @@ func (m *Manager) sendMediaWithRetry( case <-time.After(rateLimitDelay): continue case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() } } @@ -921,7 +930,7 @@ func (m *Manager) sendMediaWithRetry( select { case <-time.After(backoff): case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() } } @@ -932,7 +941,7 @@ func (m *Manager) sendMediaWithRetry( "error": lastErr.Error(), "retries": maxRetries, }) - return lastErr + return nil, lastErr } // runTTLJanitor periodically scans the typingStops and placeholders maps @@ -1166,7 +1175,8 @@ func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) e return fmt.Errorf("channel %s has no active worker", msg.Channel) } - return m.sendMediaWithRetry(ctx, msg.Channel, w, msg) + _, err := m.sendMediaWithRetry(ctx, msg.Channel, w, msg) + return err } func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error { @@ -1196,5 +1206,6 @@ func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, conten // Fallback: direct send (should not happen) channel, _ := m.channels[channelName] - return channel.Send(ctx, msg) + _, err := channel.Send(ctx, msg) + return err } diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index b4fd2ba3d..e76212905 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -25,9 +25,12 @@ type mockChannel struct { lastPlaceholderID string } -func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { m.sentMessages = append(m.sentMessages, msg) - return m.sendFn(ctx, msg) + if m.sendFn == nil { + return nil, nil + } + return nil, m.sendFn(ctx, msg) } func (m *mockChannel) Start(ctx context.Context) error { return nil } @@ -46,16 +49,16 @@ func (m *mockChannel) EditMessage(ctx context.Context, chatID, messageID, conten type mockMediaChannel struct { mockChannel - sendMediaFn func(ctx context.Context, msg bus.OutboundMediaMessage) error + sendMediaFn func(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) sentMediaMessages []bus.OutboundMediaMessage } -func (m *mockMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (m *mockMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { m.sentMediaMessages = append(m.sentMediaMessages, msg) if m.sendMediaFn != nil { return m.sendMediaFn(ctx, msg) } - return nil + return nil, nil } type mockDeletingMediaChannel struct { @@ -247,9 +250,9 @@ func TestSendMedia_Success(t *testing.T) { m := newTestManager() var callCount int ch := &mockMediaChannel{ - sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error { + sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) { callCount++ - return nil + return nil, nil }, } w := &channelWorker{ @@ -275,8 +278,8 @@ func TestSendMedia_Success(t *testing.T) { func TestSendMedia_PropagatesFailure(t *testing.T) { m := newTestManager() ch := &mockMediaChannel{ - sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error { - return fmt.Errorf("bad upload: %w", ErrSendFailed) + sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) { + return nil, fmt.Errorf("bad upload: %w", ErrSendFailed) }, } w := &channelWorker{ @@ -330,8 +333,8 @@ func TestSendMedia_DeletesPlaceholderBeforeSending(t *testing.T) { m := newTestManager() ch := &mockDeletingMediaChannel{ mockMediaChannel: mockMediaChannel{ - sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error { - return nil + sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) { + return nil, nil }, }, } @@ -628,7 +631,7 @@ func TestPreSend_PlaceholderEditSuccess(t *testing.T) { m.RecordPlaceholder("test", "123", "456") msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"} - edited := m.preSend(context.Background(), "test", msg, ch) + _, edited := m.preSend(context.Background(), "test", msg, ch) if !edited { t.Fatal("expected preSend to return true (placeholder edited)") @@ -658,7 +661,7 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) { m.RecordPlaceholder("test", "123", "456") msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"} - edited := m.preSend(context.Background(), "test", msg, ch) + _, edited := m.preSend(context.Background(), "test", msg, ch) if edited { t.Fatal("expected preSend to return false when edit fails") @@ -734,7 +737,7 @@ func TestPreSend_NoRegisteredState(t *testing.T) { } msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"} - edited := m.preSend(context.Background(), "test", msg, ch) + _, edited := m.preSend(context.Background(), "test", msg, ch) if edited { t.Fatal("expected preSend to return false with no registered state") @@ -764,7 +767,7 @@ func TestPreSend_TypingAndPlaceholder(t *testing.T) { m.RecordPlaceholder("test", "123", "456") msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"} - edited := m.preSend(context.Background(), "test", msg, ch) + _, edited := m.preSend(context.Background(), "test", msg, ch) if !stopCalled { t.Fatal("expected typing stop to be called") @@ -1025,7 +1028,7 @@ func TestPreSendStillWorksWithWrappedTypes(t *testing.T) { m.RecordPlaceholder("test", "chat1", "ph_id") msg := bus.OutboundMessage{Channel: "test", ChatID: "chat1", Content: "response"} - edited := m.preSend(context.Background(), "test", msg, ch) + _, edited := m.preSend(context.Background(), "test", msg, ch) if !stopCalled { t.Fatal("expected typing stop to be called via wrapped type") diff --git a/pkg/channels/matrix/matrix.go b/pkg/channels/matrix/matrix.go index b0009e482..96db964cf 100644 --- a/pkg/channels/matrix/matrix.go +++ b/pkg/channels/matrix/matrix.go @@ -380,26 +380,26 @@ func markdownToHTML(md string) string { return strings.TrimSpace(string(markdown.ToHTML([]byte(md), p, renderer))) } -func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } roomID := id.RoomID(strings.TrimSpace(msg.ChatID)) if roomID == "" { - return fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed) } content := strings.TrimSpace(msg.Content) if content == "" { - return nil + return nil, nil } - _, err := c.client.SendMessageEvent(ctx, roomID, event.EventMessage, c.messageContent(content)) + resp, err := c.client.SendMessageEvent(ctx, roomID, event.EventMessage, c.messageContent(content)) if err != nil { - return fmt.Errorf("matrix send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("matrix send: %w", channels.ErrTemporary) } - return nil + return []string{resp.EventID.String()}, nil } func (c *MatrixChannel) messageContent(text string) *event.MessageEventContent { @@ -412,9 +412,9 @@ func (c *MatrixChannel) messageContent(text string) *event.MessageEventContent { } // SendMedia implements channels.MediaSender. -func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } sendCtx := ctx if sendCtx == nil { @@ -423,17 +423,18 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess roomID := id.RoomID(strings.TrimSpace(msg.ChatID)) if roomID == "" { - return fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed) } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } + var eventIDs []string for _, part := range msg.Parts { if err := sendCtx.Err(); err != nil { - return err + return nil, err } localPath, meta, err := store.ResolveWithMeta(part.Ref) @@ -498,7 +499,7 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess "type": part.Type, "error": err.Error(), }) - return fmt.Errorf("matrix upload media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("matrix upload media: %w", channels.ErrTemporary) } msgType := matrixOutboundMsgType(part.Type, filename, contentType) @@ -511,17 +512,21 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess uploadResp.ContentURI.CUString(), ) - if _, err := c.client.SendMessageEvent(sendCtx, roomID, event.EventMessage, content); err != nil { + sendResp, err := c.client.SendMessageEvent(sendCtx, roomID, event.EventMessage, content) + if err != nil { logger.ErrorCF("matrix", "Failed to send media message", map[string]any{ "room_id": roomID.String(), "type": msgType, "error": err.Error(), }) - return fmt.Errorf("matrix send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("matrix send media: %w", channels.ErrTemporary) + } + if sendResp != nil { + eventIDs = append(eventIDs, sendResp.EventID.String()) } } - return nil + return eventIDs, nil } // StartTyping implements channels.TypingCapable. diff --git a/pkg/channels/media.go b/pkg/channels/media.go index c645a6180..95905ae00 100644 --- a/pkg/channels/media.go +++ b/pkg/channels/media.go @@ -11,5 +11,5 @@ import ( // Manager discovers channels implementing this interface via type // assertion and routes OutboundMediaMessage to them. type MediaSender interface { - SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error + SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) } diff --git a/pkg/channels/onebot/onebot.go b/pkg/channels/onebot/onebot.go index e5f8b8fd1..a9b95c20f 100644 --- a/pkg/channels/onebot/onebot.go +++ b/pkg/channels/onebot/onebot.go @@ -391,15 +391,15 @@ func (c *OneBotChannel) Stop(ctx context.Context) error { return nil } -func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // Check ctx before entering write path select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } @@ -408,12 +408,12 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error c.mu.Unlock() if conn == nil { - return fmt.Errorf("OneBot WebSocket not connected") + return nil, fmt.Errorf("OneBot WebSocket not connected") } action, params, err := c.buildSendRequest(msg) if err != nil { - return err + return nil, err } echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1)) @@ -426,7 +426,7 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error data, err := json.Marshal(req) if err != nil { - return fmt.Errorf("failed to marshal OneBot request: %w", err) + return nil, fmt.Errorf("failed to marshal OneBot request: %w", err) } c.writeMu.Lock() @@ -439,21 +439,21 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error logger.ErrorCF("onebot", "Failed to send message", map[string]any{ "error": err.Error(), }) - return fmt.Errorf("onebot send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("onebot send: %w", channels.ErrTemporary) } - return nil + return nil, nil } // SendMedia implements the channels.MediaSender interface. -func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } @@ -462,12 +462,12 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess c.mu.Unlock() if conn == nil { - return fmt.Errorf("OneBot WebSocket not connected") + return nil, fmt.Errorf("OneBot WebSocket not connected") } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } // Build media segments @@ -508,7 +508,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess } if len(segments) == 0 { - return nil + return nil, nil } chatID := msg.ChatID @@ -524,7 +524,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess id, err := strconv.ParseInt(rawID, 10, 64) if err != nil { - return fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed) + return nil, fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed) } echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1)) @@ -537,7 +537,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess data, err := json.Marshal(req) if err != nil { - return fmt.Errorf("failed to marshal OneBot request: %w", err) + return nil, fmt.Errorf("failed to marshal OneBot request: %w", err) } c.writeMu.Lock() @@ -550,10 +550,10 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess logger.ErrorCF("onebot", "Failed to send media message", map[string]any{ "error": err.Error(), }) - return fmt.Errorf("onebot send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("onebot send media: %w", channels.ErrTemporary) } - return nil + return nil, nil } func (c *OneBotChannel) buildMessageSegments(chatID, content string) []oneBotMessageSegment { diff --git a/pkg/channels/pico/client.go b/pkg/channels/pico/client.go index 4fdcbbf39..b4bfd09e5 100644 --- a/pkg/channels/pico/client.go +++ b/pkg/channels/pico/client.go @@ -273,22 +273,22 @@ func (c *PicoClientChannel) handleServerMessage(pc *picoConn, msg PicoMessage) { } // Send sends a message to the remote server. -func (c *PicoClientChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *PicoClientChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } c.mu.Lock() pc := c.conn c.mu.Unlock() if pc == nil || pc.closed.Load() { - return channels.ErrSendFailed + return nil, channels.ErrSendFailed } outMsg := newMessage(TypeMessageSend, map[string]any{ "content": msg.Content, }) outMsg.SessionID = strings.TrimPrefix(msg.ChatID, "pico_client:") - return pc.writeJSON(outMsg) + return nil, pc.writeJSON(outMsg) } // StartTyping implements channels.TypingCapable. diff --git a/pkg/channels/pico/client_test.go b/pkg/channels/pico/client_test.go index 7f2719e7d..7c5a62801 100644 --- a/pkg/channels/pico/client_test.go +++ b/pkg/channels/pico/client_test.go @@ -46,7 +46,7 @@ func TestSend_NotRunning(t *testing.T) { if err != nil { t.Fatal(err) } - err = ch.Send(context.Background(), bus.OutboundMessage{Content: "hi"}) + _, err = ch.Send(context.Background(), bus.OutboundMessage{Content: "hi"}) if !errors.Is(err, channels.ErrNotRunning) { t.Fatalf("expected ErrNotRunning, got %v", err) } @@ -124,7 +124,7 @@ func TestClientChannel_ConnectAndSend(t *testing.T) { defer ch.Stop(ctx) // Send a message - err = ch.Send(ctx, bus.OutboundMessage{ + _, err = ch.Send(ctx, bus.OutboundMessage{ ChatID: "pico_client:sess-1", Content: "hello", }) @@ -179,7 +179,7 @@ func TestClientChannel_ReceivesServerMessage(t *testing.T) { defer ch.Stop(ctx) // Send a message; the echo server replies with message.create - err = ch.Send(ctx, bus.OutboundMessage{ + _, err = ch.Send(ctx, bus.OutboundMessage{ ChatID: "pico_client:sess-echo", Content: "ping", }) @@ -252,7 +252,7 @@ func TestSend_ClosedConnection(t *testing.T) { ch.conn.close() ch.mu.Unlock() - err = ch.Send(ctx, bus.OutboundMessage{ + _, err = ch.Send(ctx, bus.OutboundMessage{ ChatID: "pico_client:sess-close", Content: "should fail", }) diff --git a/pkg/channels/pico/pico.go b/pkg/channels/pico/pico.go index 0e2bea67c..0a7bf15a4 100644 --- a/pkg/channels/pico/pico.go +++ b/pkg/channels/pico/pico.go @@ -234,16 +234,16 @@ func (c *PicoChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Send implements Channel — sends a message to the appropriate WebSocket connection. -func (c *PicoChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *PicoChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } outMsg := newMessage(TypeMessageCreate, map[string]any{ "content": msg.Content, }) - return c.broadcastToSession(msg.ChatID, outMsg) + return nil, c.broadcastToSession(msg.ChatID, outMsg) } // EditMessage implements channels.MessageEditor. diff --git a/pkg/channels/qq/qq.go b/pkg/channels/qq/qq.go index dfea85ba3..3a8cf9652 100644 --- a/pkg/channels/qq/qq.go +++ b/pkg/channels/qq/qq.go @@ -200,9 +200,9 @@ func (c *QQChannel) getChatKind(chatID string) string { return "group" } -func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } chatKind := c.getChatKind(msg.ChatID) @@ -236,11 +236,14 @@ func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { } // Route to group or C2C. - var err error + var ( + sentMsg *dto.Message + err error + ) if chatKind == "group" { - _, err = c.api.PostGroupMessage(ctx, msg.ChatID, msgToCreate) + sentMsg, err = c.api.PostGroupMessage(ctx, msg.ChatID, msgToCreate) } else { - _, err = c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate) + sentMsg, err = c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate) } if err != nil { @@ -249,10 +252,13 @@ func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { "chat_kind": chatKind, "error": err.Error(), }) - return fmt.Errorf("qq send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("qq send: %w", channels.ErrTemporary) } - return nil + if sentMsg == nil { + return nil, nil + } + return []string{sentMsg.ID}, nil } // StartTyping implements channels.TypingCapable. @@ -319,13 +325,14 @@ func (c *QQChannel) StartTyping(ctx context.Context, chatID string) (func(), err // QQ group/C2C media sending is a two-step flow: // 1. Upload media to /files using a remote URL or base64-encoded local bytes. // 2. Send a msg_type=7 message using the returned file_info. -func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } chatKind := c.getChatKind(msg.ChatID) + var messageIDs []string for _, part := range msg.Parts { fileInfo, err := c.uploadMedia(ctx, chatKind, msg.ChatID, part) if err != nil { @@ -335,22 +342,26 @@ func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) "error": err.Error(), }) if errors.Is(err, channels.ErrSendFailed) { - return err + return nil, err } - return fmt.Errorf("qq send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("qq send media: %w", channels.ErrTemporary) } - if err := c.sendUploadedMedia(ctx, chatKind, msg.ChatID, part, fileInfo); err != nil { + sentMsg, err := c.sendUploadedMedia(ctx, chatKind, msg.ChatID, part, fileInfo) + if err != nil { logger.ErrorCF("qq", "Failed to send media", map[string]any{ "type": part.Type, "chat_id": msg.ChatID, "error": err.Error(), }) - return fmt.Errorf("qq send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("qq send media: %w", channels.ErrTemporary) + } + if sentMsg != nil && sentMsg.ID != "" { + messageIDs = append(messageIDs, sentMsg.ID) } } - return nil + return messageIDs, nil } type qqMediaUpload struct { @@ -517,7 +528,7 @@ func (c *QQChannel) sendUploadedMedia( chatKind, chatID string, part bus.MediaPart, fileInfo []byte, -) error { +) (*dto.Message, error) { msg := &dto.MessageToCreate{ Content: part.Caption, MsgType: dto.RichMediaMsg, @@ -532,11 +543,11 @@ func (c *QQChannel) sendUploadedMedia( } if chatKind == "group" { - _, err := c.api.PostGroupMessage(ctx, chatID, msg) - return err + sentMsg, err := c.api.PostGroupMessage(ctx, chatID, msg) + return sentMsg, err } - _, err := c.api.PostC2CMessage(ctx, chatID, msg) - return err + sentMsg, err := c.api.PostC2CMessage(ctx, chatID, msg) + return sentMsg, err } func (c *QQChannel) applyPassiveReplyMetadata(chatID string, msg *dto.MessageToCreate) { diff --git a/pkg/channels/qq/qq_test.go b/pkg/channels/qq/qq_test.go index 7ed736827..83a912cd7 100644 --- a/pkg/channels/qq/qq_test.go +++ b/pkg/channels/qq/qq_test.go @@ -209,7 +209,7 @@ func TestSendMedia_UploadsLocalFileAsBase64(t *testing.T) { ch.lastMsgID.Store("group-1", "msg-1") ch.msgSeqCounters.Store("group-1", new(atomic.Uint64)) - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "group-1", Parts: []bus.MediaPart{{ Type: "image", @@ -303,7 +303,7 @@ func assertAudioWAVUploadType(t *testing.T, duration time.Duration, wantFileType ch.SetMediaStore(store) ch.chatType.Store("group-1", "group") - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "group-1", Parts: []bus.MediaPart{{ Type: "audio", @@ -337,7 +337,7 @@ func TestSendMedia_RemoteAudioFallsBackToFileUpload(t *testing.T) { ch.SetRunning(true) ch.chatType.Store("user-1", "direct") - err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "user-1", Parts: []bus.MediaPart{{ Type: "audio", @@ -383,7 +383,7 @@ func TestSendMedia_LocalAudioWithUnknownDurationFallsBackToFileUpload(t *testing ch.SetMediaStore(store) ch.chatType.Store("group-1", "group") - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "group-1", Parts: []bus.MediaPart{{ Type: "audio", @@ -417,7 +417,7 @@ func TestSendMedia_UsesRemoteURLUploadForC2C(t *testing.T) { ch.SetRunning(true) ch.chatType.Store("user-1", "direct") - err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "user-1", Parts: []bus.MediaPart{{ Type: "file", @@ -490,7 +490,7 @@ func TestSendMedia_LocalFileUploadIncludesStoredFilename(t *testing.T) { ch.SetMediaStore(store) ch.chatType.Store("user-1", "direct") - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "user-1", Parts: []bus.MediaPart{{ Type: "file", @@ -528,7 +528,7 @@ func TestSendMedia_ReturnsSendFailedWithoutMediaStore(t *testing.T) { ch.SetRunning(true) ch.chatType.Store("group-1", "group") - err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "group-1", Parts: []bus.MediaPart{{ Type: "image", @@ -578,7 +578,7 @@ func TestSendMedia_ReturnsSendFailedWhenLocalFileExceedsBase64MiBLimit(t *testin ch.SetMediaStore(store) ch.chatType.Store("group-1", "group") - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "group-1", Parts: []bus.MediaPart{{ Type: "file", diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go index acd857a06..1e4a4fef5 100644 --- a/pkg/channels/slack/slack.go +++ b/pkg/channels/slack/slack.go @@ -108,14 +108,14 @@ func (c *SlackChannel) Stop(ctx context.Context) error { return nil } -func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } channelID, threadTS := parseSlackChatID(msg.ChatID) if channelID == "" { - return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) + return nil, fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) } opts := []slack.MsgOption{ @@ -130,9 +130,9 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error opts = append(opts, slack.MsgOptionTS(threadTS)) } - _, _, err := c.api.PostMessageContext(ctx, channelID, opts...) + _, ts, err := c.api.PostMessageContext(ctx, channelID, opts...) if err != nil { - return fmt.Errorf("slack send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("slack send: %w", channels.ErrTemporary) } if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok { @@ -148,23 +148,23 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error "thread_ts": threadTS, }) - return nil + return []string{ts}, nil } // SendMedia implements the channels.MediaSender interface. -func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } channelID, _ := parseSlackChatID(msg.ChatID) if channelID == "" { - return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) + return nil, fmt.Errorf("invalid slack chat ID: %s", msg.ChatID) } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } for _, part := range msg.Parts { @@ -198,11 +198,13 @@ func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa "filename": filename, "error": err.Error(), }) - return fmt.Errorf("slack send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("slack send media: %w", channels.ErrTemporary) } } - return nil + // UploadFileV2 does not expose the posted message timestamp in its + // response; returning nil avoids conflating file IDs with message IDs. + return nil, nil } // ReactToMessage implements channels.ReactionCapable. diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index f76d625fb..831eb43cc 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -168,26 +168,27 @@ func (c *TelegramChannel) Stop(ctx context.Context) error { return nil } -func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } useMarkdownV2 := c.config.Channels.Telegram.UseMarkdownV2 chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { - return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) + return nil, fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } if msg.Content == "" { - return nil + return nil, nil } // The Manager already splits messages to ≤4000 chars (WithMaxMessageLength), // so msg.Content is guaranteed to be within that limit. We still need to // check if HTML expansion pushes it beyond Telegram's 4096-char API limit. replyToID := msg.ReplyToMessageID + var messageIDs []string queue := []string{msg.Content} for len(queue) > 0 { chunk := queue[0] @@ -206,16 +207,18 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err } if smallerLen <= 0 { - if err := c.sendChunk(ctx, sendChunkParams{ + msgID, err := c.sendChunk(ctx, sendChunkParams{ chatID: chatID, threadID: threadID, content: content, replyToID: replyToID, mdFallback: chunk, useMarkdownV2: useMarkdownV2, - }); err != nil { - return err + }) + if err != nil { + return nil, err } + messageIDs = append(messageIDs, msgID) replyToID = "" continue } @@ -244,21 +247,23 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err continue } - if err := c.sendChunk(ctx, sendChunkParams{ + msgID, err := c.sendChunk(ctx, sendChunkParams{ chatID: chatID, threadID: threadID, content: content, replyToID: replyToID, mdFallback: chunk, useMarkdownV2: useMarkdownV2, - }); err != nil { - return err + }) + if err != nil { + return nil, err } + messageIDs = append(messageIDs, msgID) // Only the first chunk should be a reply; subsequent chunks are normal messages. replyToID = "" } - return nil + return messageIDs, nil } type sendChunkParams struct { @@ -275,7 +280,7 @@ type sendChunkParams struct { func (c *TelegramChannel) sendChunk( ctx context.Context, params sendChunkParams, -) error { +) (string, error) { tgMsg := tu.Message(tu.ID(params.chatID), params.content) tgMsg.MessageThreadID = params.threadID if params.useMarkdownV2 { @@ -292,17 +297,19 @@ func (c *TelegramChannel) sendChunk( } } - if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil { + pMsg, err := c.bot.SendMessage(ctx, tgMsg) + if err != nil { logParseFailed(err, params.useMarkdownV2) tgMsg.Text = params.mdFallback tgMsg.ParseMode = "" - if _, err = c.bot.SendMessage(ctx, tgMsg); err != nil { - return fmt.Errorf("telegram send: %w", channels.ErrTemporary) + pMsg, err = c.bot.SendMessage(ctx, tgMsg) + if err != nil { + return "", fmt.Errorf("telegram send: %w", channels.ErrTemporary) } } - return nil + return strconv.Itoa(pMsg.MessageID), nil } // maxTypingDuration limits how long the typing indicator can run. @@ -420,21 +427,22 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s } // SendMedia implements the channels.MediaSender interface. -func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } chatID, threadID, err := parseTelegramChatID(msg.ChatID) if err != nil { - return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) + return nil, fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed) } store := c.GetMediaStore() if store == nil { - return fmt.Errorf("no media store available: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed) } + var messageIDs []string for _, part := range msg.Parts { localPath, err := store.Resolve(part.Ref) if err != nil { @@ -454,6 +462,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe continue } + var tgResult *telego.Message switch part.Type { case "image": params := &telego.SendPhotoParams{ @@ -462,11 +471,11 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Photo: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendPhoto(ctx, params) + tgResult, err = c.bot.SendPhoto(ctx, params) if err != nil && strings.Contains(err.Error(), "PHOTO_INVALID_DIMENSIONS") { if _, seekErr := file.Seek(0, io.SeekStart); seekErr != nil { file.Close() - return fmt.Errorf("telegram rewind media after photo failure: %w", channels.ErrTemporary) + return nil, fmt.Errorf("telegram rewind media after photo failure: %w", channels.ErrTemporary) } docParams := &telego.SendDocumentParams{ @@ -475,7 +484,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Document: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendDocument(ctx, docParams) + tgResult, err = c.bot.SendDocument(ctx, docParams) } case "audio": // Send OGG files with "voice" in the filename as Telegram voice @@ -488,7 +497,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Voice: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendVoice(ctx, vparams) + tgResult, err = c.bot.SendVoice(ctx, vparams) } else { params := &telego.SendAudioParams{ ChatID: tu.ID(chatID), @@ -496,7 +505,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Audio: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendAudio(ctx, params) + tgResult, err = c.bot.SendAudio(ctx, params) } case "video": params := &telego.SendVideoParams{ @@ -505,7 +514,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Video: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendVideo(ctx, params) + tgResult, err = c.bot.SendVideo(ctx, params) default: // "file" or unknown types params := &telego.SendDocumentParams{ ChatID: tu.ID(chatID), @@ -513,9 +522,12 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe Document: telego.InputFile{File: file}, Caption: part.Caption, } - _, err = c.bot.SendDocument(ctx, params) + tgResult, err = c.bot.SendDocument(ctx, params) } + if tgResult != nil { + messageIDs = append(messageIDs, strconv.Itoa(tgResult.MessageID)) + } file.Close() if err != nil { @@ -523,11 +535,11 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe "type": part.Type, "error": err.Error(), }) - return fmt.Errorf("telegram send media: %w", channels.ErrTemporary) + return nil, fmt.Errorf("telegram send media: %w", channels.ErrTemporary) } } - return nil + return messageIDs, nil } func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error { diff --git a/pkg/channels/telegram/telegram_test.go b/pkg/channels/telegram/telegram_test.go index 1be51abdc..4f7a2600b 100644 --- a/pkg/channels/telegram/telegram_test.go +++ b/pkg/channels/telegram/telegram_test.go @@ -176,7 +176,7 @@ func TestSendMedia_ImageFallbacksToDocumentOnInvalidDimensions(t *testing.T) { ) require.NoError(t, err) - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "12345", Parts: []bus.MediaPart{{ Type: "image", @@ -214,7 +214,7 @@ func TestSendMedia_ImageNonDimensionErrorDoesNotFallback(t *testing.T) { ref, err := store.Store(localPath, media.MediaMeta{Filename: "image.png", ContentType: "image/png"}, "scope-1") require.NoError(t, err) - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ ChatID: "12345", Parts: []bus.MediaPart{{ Type: "image", @@ -239,7 +239,7 @@ func TestSend_EmptyContent(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: "", }) @@ -256,7 +256,7 @@ func TestSend_ShortMessage_SingleCall(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: "Hello, world!", }) @@ -279,7 +279,7 @@ func TestSend_LongMessage_SingleCall(t *testing.T) { longContent := strings.Repeat("a", 4000) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: longContent, }) @@ -302,7 +302,7 @@ func TestSend_HTMLFallback_PerChunk(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: "Hello **world**", }) @@ -320,7 +320,7 @@ func TestSend_HTMLFallback_BothFail(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: "Hello", }) @@ -342,7 +342,7 @@ func TestSend_LongMessage_HTMLFallback_StopsOnError(t *testing.T) { longContent := strings.Repeat("x", 4001) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: longContent, }) @@ -372,7 +372,7 @@ func TestSend_MarkdownShortButHTMLLong_MultipleCalls(t *testing.T) { "HTML expansion must exceed Telegram limit for this test to be meaningful", ) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: markdownContent, }) @@ -407,7 +407,7 @@ func TestSend_HTMLOverflow_WordBoundary(t *testing.T) { // Ensure the test content matches the intended boundary conditions. assert.LessOrEqual(t, len([]rune(content)), 4000, "markdown content must not exceed chunk size for this test") - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "123456", Content: content, }) @@ -443,7 +443,7 @@ func TestSend_NotRunning(t *testing.T) { ch := newTestChannel(t, caller) ch.SetRunning(false) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "12345", Content: "Hello", }) @@ -461,7 +461,7 @@ func TestSend_InvalidChatID(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "not-a-number", Content: "Hello", }) @@ -518,7 +518,7 @@ func TestSend_WithForumThreadID(t *testing.T) { } ch := newTestChannel(t, caller) - err := ch.Send(context.Background(), bus.OutboundMessage{ + _, err := ch.Send(context.Background(), bus.OutboundMessage{ ChatID: "-1001234567890/42", Content: "Hello from topic", }) diff --git a/pkg/channels/wecom/wecom.go b/pkg/channels/wecom/wecom.go index 6096b7db3..9689d5171 100644 --- a/pkg/channels/wecom/wecom.go +++ b/pkg/channels/wecom/wecom.go @@ -184,20 +184,20 @@ func (c *WeComChannel) BeginStream(_ context.Context, chatID string) (channels.S }, nil } -func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } content := strings.TrimSpace(msg.Content) if content == "" { - return nil + return nil, nil } if turn, ok := c.getTurn(msg.ChatID); ok { if time.Since(turn.CreatedAt) <= wecomStreamMaxDuration { if err := c.sendStreamReply(turn, content); err == nil { c.consumeTurn(msg.ChatID, turn) - return nil + return nil, nil } } c.consumeTurn(msg.ChatID, turn) @@ -205,20 +205,20 @@ func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) error if route, ok := c.routes.Get(msg.ChatID); ok { if err := c.sendActivePush(route.ChatID, route.ChatType, content); err != nil { - return err + return nil, err } - return nil + return nil, nil } if err := c.sendActivePush(msg.ChatID, 0, content); err != nil { - return err + return nil, err } - return nil + return nil, nil } -func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } route, chatType, hasTurn := c.resolveMediaRoute(msg.ChatID) @@ -231,7 +231,7 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa if strings.TrimSpace(part.Ref) == "" { if caption := strings.TrimSpace(part.Caption); caption != "" { if err := c.sendActivePush(chatID, chatType, caption); err != nil { - return err + return nil, err } } continue @@ -239,7 +239,7 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa localPath, filename, contentType, cleanup, err := c.resolveOutboundPart(ctx, part) if err != nil { - return fmt.Errorf("wecom resolve media %q: %v: %w", part.Ref, err, channels.ErrSendFailed) + return nil, fmt.Errorf("wecom resolve media %q: %v: %w", part.Ref, err, channels.ErrSendFailed) } func() { @@ -283,11 +283,11 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa } }() if err != nil { - return err + return nil, err } } - return nil + return nil, nil } func (c *WeComChannel) connectLoop() { diff --git a/pkg/channels/wecom/wecom_test.go b/pkg/channels/wecom/wecom_test.go index c7a4adfc0..b3a87e246 100644 --- a/pkg/channels/wecom/wecom_test.go +++ b/pkg/channels/wecom/wecom_test.go @@ -190,7 +190,7 @@ func TestSend_StreamFailureFallsBackToActualChatID(t *testing.T) { return wecomTestAck(nil), nil } - if err := ch.Send(context.Background(), bus.OutboundMessage{ + if _, err := ch.Send(context.Background(), bus.OutboundMessage{ Channel: "wecom", ChatID: "chat-1", Content: "hello", @@ -247,7 +247,7 @@ func TestSend_DoesNotSplitStreamReply(t *testing.T) { } content := strings.Repeat("\u4e2d", 30000) - if err := ch.Send(context.Background(), bus.OutboundMessage{ + if _, err := ch.Send(context.Background(), bus.OutboundMessage{ Channel: "wecom", ChatID: "chat-1", Content: content, @@ -283,7 +283,7 @@ func TestSend_DoesNotSplitActivePush(t *testing.T) { } content := strings.Repeat("a", 30000) - if err := ch.Send(context.Background(), bus.OutboundMessage{ + if _, err := ch.Send(context.Background(), bus.OutboundMessage{ Channel: "wecom", ChatID: "chat-1", Content: content, @@ -346,7 +346,7 @@ func TestSendMedia_SendsActiveImage(t *testing.T) { } } - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ Channel: "wecom", ChatID: "chat-1", Parts: []bus.MediaPart{{ @@ -457,7 +457,7 @@ func TestSendMedia_UsesTurnImageAndFinishesStream(t *testing.T) { } } - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ Channel: "wecom", ChatID: "chat-1", Parts: []bus.MediaPart{{ @@ -553,7 +553,7 @@ func TestSendMedia_SendsActiveFile(t *testing.T) { } } - err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ + _, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{ Channel: "wecom", ChatID: "chat-2", Parts: []bus.MediaPart{{ diff --git a/pkg/channels/weixin/media.go b/pkg/channels/weixin/media.go index 4da7f0db9..cf1b45612 100644 --- a/pkg/channels/weixin/media.go +++ b/pkg/channels/weixin/media.go @@ -1097,12 +1097,12 @@ func (c *WeixinChannel) StartTyping(ctx context.Context, chatID string) (func(), } // SendMedia implements channels.MediaSender. -func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { +func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) { if !c.IsRunning() { - return basechannels.ErrNotRunning + return nil, basechannels.ErrNotRunning } if err := c.ensureSessionActive(); err != nil { - return err + return nil, err } contextToken := "" @@ -1110,7 +1110,7 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess contextToken, _ = v.(string) } if contextToken == "" { - return fmt.Errorf( + return nil, fmt.Errorf( "weixin send media: missing context token for chat %s: %w", msg.ChatID, basechannels.ErrSendFailed, @@ -1125,7 +1125,7 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess "ref": part.Ref, "error": err.Error(), }) - return fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed) + return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed) } func() { if cleanup != nil { @@ -1147,11 +1147,11 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess "error": err.Error(), }) if c.remainingPause() > 0 { - return fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed) + return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed) } - return fmt.Errorf("weixin send media: %w", basechannels.ErrTemporary) + return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrTemporary) } } - return nil + return nil, nil } diff --git a/pkg/channels/weixin/weixin.go b/pkg/channels/weixin/weixin.go index 65fabe399..0e9010131 100644 --- a/pkg/channels/weixin/weixin.go +++ b/pkg/channels/weixin/weixin.go @@ -358,16 +358,16 @@ func (c *WeixinChannel) handleInboundMessage(ctx context.Context, msg WeixinMess } // Send implements channels.Channel by sending a text message to the WeChat user. -func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } if err := c.ensureSessionActive(); err != nil { - return err + return nil, err } if msg.Content == "" { - return nil + return nil, nil } // We need a context_token to send a reply. It should be stored in the conversation metadata. @@ -386,7 +386,7 @@ func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error logger.ErrorCF("weixin", "Missing context token, cannot send message", map[string]any{ "to_user_id": toUserID, }) - return fmt.Errorf("weixin send: %w: missing context token for chat %s", channels.ErrSendFailed, toUserID) + return nil, fmt.Errorf("weixin send: %w: missing context token for chat %s", channels.ErrSendFailed, toUserID) } if err := c.sendTextMessage(ctx, toUserID, contextToken, msg.Content); err != nil { @@ -395,10 +395,10 @@ func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error "error": err.Error(), }) if c.remainingPause() > 0 { - return fmt.Errorf("weixin send: %w", channels.ErrSendFailed) + return nil, fmt.Errorf("weixin send: %w", channels.ErrSendFailed) } - return fmt.Errorf("weixin send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("weixin send: %w", channels.ErrTemporary) } - return nil + return nil, nil } diff --git a/pkg/channels/whatsapp/whatsapp.go b/pkg/channels/whatsapp/whatsapp.go index 70b3e02bf..98622fe37 100644 --- a/pkg/channels/whatsapp/whatsapp.go +++ b/pkg/channels/whatsapp/whatsapp.go @@ -104,15 +104,15 @@ func (c *WhatsAppChannel) Stop(ctx context.Context) error { return nil } -func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } // Check ctx before acquiring lock select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } @@ -120,7 +120,7 @@ func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err defer c.mu.Unlock() if c.conn == nil { - return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) + return nil, fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) } payload := map[string]any{ @@ -131,17 +131,17 @@ func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err data, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("failed to marshal message: %w", err) + return nil, fmt.Errorf("failed to marshal message: %w", err) } _ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil { _ = c.conn.SetWriteDeadline(time.Time{}) - return fmt.Errorf("whatsapp send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("whatsapp send: %w", channels.ErrTemporary) } _ = c.conn.SetWriteDeadline(time.Time{}) - return nil + return nil, nil } func (c *WhatsAppChannel) listen() { diff --git a/pkg/channels/whatsapp_native/whatsapp_native.go b/pkg/channels/whatsapp_native/whatsapp_native.go index 188a7c8fa..d0a74a405 100644 --- a/pkg/channels/whatsapp_native/whatsapp_native.go +++ b/pkg/channels/whatsapp_native/whatsapp_native.go @@ -396,13 +396,13 @@ func (c *WhatsAppNativeChannel) handleIncoming(evt *events.Message) { c.HandleMessage(c.runCtx, peer, messageID, senderID, chatID, content, mediaPaths, metadata, sender) } -func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { +func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) { if !c.IsRunning() { - return channels.ErrNotRunning + return nil, channels.ErrNotRunning } select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } @@ -411,18 +411,18 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag c.mu.Unlock() if client == nil || !client.IsConnected() { - return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) + return nil, fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary) } // Detect unpaired state: the client is connected (to WhatsApp servers) // but has not completed QR-login yet, so sending would fail. if client.Store.ID == nil { - return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary) + return nil, fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary) } to, err := parseJID(msg.ChatID) if err != nil { - return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err) + return nil, fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err) } waMsg := &waE2E.Message{ @@ -430,9 +430,9 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag } if _, err = client.SendMessage(ctx, to, waMsg); err != nil { - return fmt.Errorf("whatsapp send: %w", channels.ErrTemporary) + return nil, fmt.Errorf("whatsapp send: %w", channels.ErrTemporary) } - return nil + return nil, nil } // parseJID converts a chat ID (phone number or JID string) to types.JID.