feat(channels): make Channel.Send return delivered message IDs (#2190)

* feat(channels): Channel.Send and MediaSender.SendMedia return delivered message IDs

Change Channel.Send signature from (ctx, msg) error to (ctx, msg) ([]string, error)
and MediaSender.SendMedia similarly, so callers can capture platform message IDs
for threading, reactions, and history annotation.

Adapters that return real IDs: Telegram (per-chunk MessageID), Discord (Message.ID),
Slack Send (ts), QQ (sentMsg.ID), Matrix (EventID). Slack SendMedia returns nil
because UploadFileV2 does not expose the posted message timestamp in its response.
All other adapters return nil IDs.

preSend and sendWithRetry in manager.go updated to propagate ([]string, bool).
README examples updated for both English and Chinese docs.

* style: apply golangci-lint fixes (golines)

* docs: fix Send migration guide — restore old error-only signature in before/after example
This commit is contained in:
DimonB
2026-03-31 06:07:32 +03:00
committed by GitHub
parent 2d8556205f
commit 6c0798ca3f
30 changed files with 424 additions and 352 deletions
+12 -10
View File
@@ -25,23 +25,25 @@ import (
type fakeChannel struct{ id string }
func (f *fakeChannel) Name() string { return "fake" }
func (f *fakeChannel) Start(ctx context.Context) error { return nil }
func (f *fakeChannel) Stop(ctx context.Context) error { return nil }
func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { return nil }
func (f *fakeChannel) IsRunning() bool { return true }
func (f *fakeChannel) IsAllowed(string) bool { return true }
func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true }
func (f *fakeChannel) ReasoningChannelID() string { return f.id }
func (f *fakeChannel) Name() string { return "fake" }
func (f *fakeChannel) Start(ctx context.Context) error { return nil }
func (f *fakeChannel) Stop(ctx context.Context) error { return nil }
func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
return nil, nil
}
func (f *fakeChannel) IsRunning() bool { return true }
func (f *fakeChannel) IsAllowed(string) bool { return true }
func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true }
func (f *fakeChannel) ReasoningChannelID() string { return f.id }
type fakeMediaChannel struct {
fakeChannel
sentMedia []bus.OutboundMediaMessage
}
func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (f *fakeMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
f.sentMedia = append(f.sentMedia, msg)
return nil
return nil, nil
}
func newStartedTestChannelManager(
+27 -24
View File
@@ -252,28 +252,28 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
**3e. Send method error returns**
```go
// Old code: returns plain error
// Old code: returned only error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.running { return fmt.Errorf("not running") }
// ...
if err != nil { return err }
}
// New code: must return sentinel errors for Manager to determine retry strategy
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
// New code: return delivered message IDs plus sentinel errors
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning // ← Manager will not retry
return nil, channels.ErrNotRunning // ← Manager will not retry
}
// ...
if err != nil {
// Use ClassifySendError to wrap error based on HTTP status code
return channels.ClassifySendError(statusCode, err)
return nil, channels.ClassifySendError(statusCode, err)
// Or manually wrap:
// return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
}
return nil
return []string{deliveredID}, nil // or return nil, nil if IDs are unavailable
}
```
@@ -502,25 +502,25 @@ func (c *MatrixChannel) Stop(ctx context.Context) error {
return nil
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
// 1. Check running state
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// 2. Send message to Matrix
err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
if err != nil {
// 3. Must use error classification wrapping
// If you have an HTTP status code:
// return channels.ClassifySendError(statusCode, err)
// return nil, channels.ClassifySendError(statusCode, err)
// If it's a network error:
// return channels.ClassifyNetError(err)
// return nil, channels.ClassifyNetError(err)
// If manual classification is needed:
return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
}
return nil
return []string{eventID}, nil
}
// ========== Incoming Message Handling ==========
@@ -580,9 +580,9 @@ func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content st
// ========== Internal Methods ==========
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error {
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) {
// Actual Matrix SDK call
return nil
return "event-id", nil
}
```
@@ -594,16 +594,17 @@ Depending on platform capabilities, your channel can optionally implement the fo
```go
// If the platform supports sending images/files/audio/video
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
@@ -620,8 +621,10 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
default:
// Upload file to Matrix
}
// Append platform IDs here when the API returns them.
// messageIDs = append(messageIDs, uploadedMessageID)
}
return nil
return messageIDs, nil
}
```
@@ -1270,7 +1273,7 @@ type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
@@ -1279,7 +1282,7 @@ type Channel interface {
// ===== Optional =====
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
type TypingCapable interface {
+27 -24
View File
@@ -252,28 +252,28 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
**3e. Send 方法的错误返回**
```go
// 旧代码:返回普通 error
// 旧代码:返回 error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.running { return fmt.Errorf("not running") }
// ...
if err != nil { return err }
}
// 新代码:必须返回哨兵错误,供 Manager 判断重试策略
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
// 新代码:返回投递后的消息 ID,以及供 Manager 判断重试策略的哨兵错误
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning // ← Manager 不会重试
return nil, channels.ErrNotRunning // ← Manager 不会重试
}
// ...
if err != nil {
// 使用 ClassifySendError 根据 HTTP 状态码包装错误
return channels.ClassifySendError(statusCode, err)
return nil, channels.ClassifySendError(statusCode, err)
// 或手动包装:
// return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
}
return nil
return []string{deliveredID}, nil // 如果拿不到 ID,也可以返回 nil, nil
}
```
@@ -502,25 +502,25 @@ func (c *MatrixChannel) Stop(ctx context.Context) error {
return nil
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
// 1. 检查运行状态
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// 2. 发送消息到 Matrix
err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
if err != nil {
// 3. 必须使用错误分类包装
// 如果你有 HTTP 状态码:
// return channels.ClassifySendError(statusCode, err)
// return nil, channels.ClassifySendError(statusCode, err)
// 如果是网络错误:
// return channels.ClassifyNetError(err)
// return nil, channels.ClassifyNetError(err)
// 如果需要手动分类:
return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
}
return nil
return []string{eventID}, nil
}
// ========== 消息接收处理 ==========
@@ -580,9 +580,9 @@ func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content st
// ========== 内部方法 ==========
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error {
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) {
// 实际的 Matrix SDK 调用
return nil
return "event-id", nil
}
```
@@ -594,16 +594,17 @@ func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string
```go
// 如果平台支持发送图片/文件/音频/视频
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
@@ -620,8 +621,10 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
default:
// 上传文件到 Matrix
}
// 如果 API 能返回平台消息 ID,就在这里追加。
// messageIDs = append(messageIDs, uploadedMessageID)
}
return nil
return messageIDs, nil
}
```
@@ -1269,7 +1272,7 @@ type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
@@ -1278,7 +1281,7 @@ type Channel interface {
// ===== 可选实现 =====
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
type TypingCapable interface {
+1 -1
View File
@@ -48,7 +48,7 @@ type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
+5 -5
View File
@@ -104,20 +104,20 @@ func (c *DingTalkChannel) Stop(ctx context.Context) error {
}
// Send sends a message to DingTalk via the chatbot reply API
func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// Get session webhook from storage
sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID)
if !ok {
return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID)
return nil, fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID)
}
sessionWebhook, ok := sessionWebhookRaw.(string)
if !ok {
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
return nil, fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
}
logger.DebugCF("dingtalk", "Sending message", map[string]any{
@@ -126,7 +126,7 @@ func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
})
// Use the session webhook to send the reply
return c.SendDirectReply(ctx, sessionWebhook, msg.Content)
return nil, c.SendDirectReply(ctx, sessionWebhook, msg.Content)
}
// onChatBotMessageReceived implements the IChatBotMessageHandler function signature
+50 -30
View File
@@ -128,37 +128,41 @@ func (c *DiscordChannel) Stop(ctx context.Context) error {
return nil
}
func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
channelID := msg.ChatID
if channelID == "" {
return fmt.Errorf("channel ID is empty")
return nil, fmt.Errorf("channel ID is empty")
}
if len([]rune(msg.Content)) == 0 {
return nil
return nil, nil
}
return c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID)
msgID, err := c.sendChunk(ctx, channelID, msg.Content, msg.ReplyToMessageID)
if err != nil {
return nil, err
}
return []string{msgID}, nil
}
// SendMedia implements the channels.MediaSender interface.
func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
channelID := msg.ChatID
if channelID == "" {
return fmt.Errorf("channel ID is empty")
return nil, fmt.Errorf("channel ID is empty")
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
// Collect all files into a single ChannelMessageSendComplex call
@@ -202,33 +206,41 @@ func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMes
}
if len(files) == 0 {
return nil
return nil, nil
}
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
defer cancel()
done := make(chan error, 1)
type mediaResult struct {
id string
err error
}
done := make(chan mediaResult, 1)
go func() {
_, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
sentMsg, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
Content: caption,
Files: files,
})
done <- err
if err != nil {
done <- mediaResult{err: err}
return
}
done <- mediaResult{id: sentMsg.ID}
}()
select {
case err := <-done:
case r := <-done:
// Close all file readers
for _, f := range files {
if closer, ok := f.Reader.(*os.File); ok {
closer.Close()
}
}
if err != nil {
return fmt.Errorf("discord send media: %w", channels.ErrTemporary)
if r.err != nil {
return nil, fmt.Errorf("discord send media: %w", channels.ErrTemporary)
}
return nil
return []string{r.id}, nil
case <-sendCtx.Done():
// Close all file readers
for _, f := range files {
@@ -236,7 +248,7 @@ func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMes
closer.Close()
}
}
return sendCtx.Err()
return nil, sendCtx.Err()
}
}
@@ -264,18 +276,25 @@ func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (st
return msg.ID, nil
}
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) error {
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, replyToID string) (string, error) {
// Use the passed ctx for timeout control
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
defer cancel()
done := make(chan error, 1)
type result struct {
id string
err error
}
done := make(chan result, 1)
go func() {
var err error
var (
msg *discordgo.Message
err error
)
// If we have an ID, we send the message as "Reply"
if replyToID != "" {
_, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
msg, err = c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
Content: content,
Reference: &discordgo.MessageReference{
MessageID: replyToID,
@@ -284,20 +303,21 @@ func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content, repl
})
} else {
// Otherwise, we send a normal message
_, err = c.session.ChannelMessageSend(channelID, content)
msg, err = c.session.ChannelMessageSend(channelID, content)
}
done <- err
if err != nil {
done <- result{err: fmt.Errorf("discord send: %w", channels.ErrTemporary)}
return
}
done <- result{id: msg.ID}
}()
select {
case err := <-done:
if err != nil {
return fmt.Errorf("discord send: %w", channels.ErrTemporary)
}
return nil
case r := <-done:
return r.id, r.err
case <-sendCtx.Done():
return sendCtx.Err()
return "", sendCtx.Err()
}
}
+4 -4
View File
@@ -36,8 +36,8 @@ func (c *FeishuChannel) Stop(ctx context.Context) error {
}
// Send is a stub method to satisfy the Channel interface
func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
return errUnsupported
func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
return nil, errUnsupported
}
// EditMessage is a stub method to satisfy MessageEditor
@@ -56,6 +56,6 @@ func (c *FeishuChannel) ReactToMessage(ctx context.Context, chatID, messageID st
}
// SendMedia is a stub method to satisfy MediaSender
func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
return errUnsupported
func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
return nil, errUnsupported
}
+14 -14
View File
@@ -131,26 +131,26 @@ func (c *FeishuChannel) Stop(ctx context.Context) error {
// Send sends a message using Interactive Card format for markdown rendering.
// Falls back to plain text message if card sending fails (e.g., table limit exceeded).
func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
if msg.ChatID == "" {
return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
}
// Build interactive card with markdown content
cardContent, err := buildMarkdownCard(msg.Content)
if err != nil {
// If card build fails, fall back to plain text
return c.sendText(ctx, msg.ChatID, msg.Content)
return nil, c.sendText(ctx, msg.ChatID, msg.Content)
}
// First attempt: try sending as interactive card
err = c.sendCard(ctx, msg.ChatID, cardContent)
if err == nil {
return nil
return nil, nil
}
// Check if error is due to card table limit (error code 11310)
@@ -167,14 +167,14 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
// Second attempt: fall back to plain text message
textErr := c.sendText(ctx, msg.ChatID, msg.Content)
if textErr == nil {
return nil
return nil, nil
}
// If text also fails, return the text error
return textErr
return nil, textErr
}
// For other errors, return the original card error
return err
return nil, err
}
// EditMessage implements channels.MessageEditor.
@@ -310,27 +310,27 @@ func (c *FeishuChannel) ReactToMessage(ctx context.Context, chatID, messageID st
// SendMedia implements channels.MediaSender.
// Uploads images/files via Feishu API then sends as messages.
func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *FeishuChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
if msg.ChatID == "" {
return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
for _, part := range msg.Parts {
if err := c.sendMediaPart(ctx, msg.ChatID, part, store); err != nil {
return err
return nil, err
}
}
return nil
return nil, nil
}
// sendMediaPart resolves and sends a single media part.
+5 -5
View File
@@ -130,18 +130,18 @@ func (c *IRCChannel) Stop(ctx context.Context) error {
}
// Send sends a message to an IRC channel or user.
func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
target := msg.ChatID
if target == "" {
return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed)
}
if strings.TrimSpace(msg.Content) == "" {
return nil
return nil, nil
}
// Send each line separately (IRC is line-oriented)
@@ -158,7 +158,7 @@ func (c *IRCChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
"target": target,
"lines": len(lines),
})
return nil
return nil, nil
}
// StartTyping implements channels.TypingCapable using IRCv3 +typing client tag.
+9 -9
View File
@@ -496,9 +496,9 @@ func (c *LINEChannel) resolveChatID(source lineSource) string {
// Send sends a message to LINE. It first tries the Reply API (free)
// using a cached reply token, then falls back to the Push API.
func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// Load and consume quote token for this chat
@@ -516,28 +516,28 @@ func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
"chat_id": msg.ChatID,
"quoted": quoteToken != "",
})
return nil
return nil, nil
}
logger.DebugC("line", "Reply API failed, falling back to Push API")
}
}
// Fall back to Push API
return c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken)
return nil, c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken)
}
// SendMedia implements the channels.MediaSender interface.
// LINE requires media to be accessible via public URL; since we only have local files,
// we fall back to sending a text message with the filename/caption.
// For full support, an external file hosting service would be needed.
func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
// LINE Messaging API requires publicly accessible URLs for media messages.
@@ -549,11 +549,11 @@ func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessag
}
if err := c.sendPush(ctx, msg.ChatID, caption, ""); err != nil {
return err
return nil, err
}
}
return nil
return nil, nil
}
// buildTextMessage creates a text message object, optionally with quoteToken.
+6 -6
View File
@@ -240,15 +240,15 @@ func (c *MaixCamChannel) Stop(ctx context.Context) error {
return nil
}
func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// Check ctx before entering write path
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
@@ -257,7 +257,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
if len(c.clients) == 0 {
logger.WarnC("maixcam", "No MaixCam devices connected")
return fmt.Errorf("no connected MaixCam devices")
return nil, fmt.Errorf("no connected MaixCam devices")
}
response := map[string]any{
@@ -269,7 +269,7 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
data, err := json.Marshal(response)
if err != nil {
return fmt.Errorf("failed to marshal response: %w", err)
return nil, fmt.Errorf("failed to marshal response: %w", err)
}
var sendErr error
@@ -285,5 +285,5 @@ func (c *MaixCamChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
_ = conn.SetWriteDeadline(time.Time{})
}
return sendErr
return nil, sendErr
}
+37 -26
View File
@@ -158,8 +158,8 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
}
// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
// Returns true if the message was already delivered (skip Send).
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
// Returns the delivered message IDs and true when delivery completed before a normal Send.
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) ([]string, bool) {
key := name + ":" + msg.ChatID
// 1. Stop typing
@@ -188,7 +188,7 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess
}
}
}
return true
return nil, true
}
// 4. Try editing placeholder
@@ -196,14 +196,14 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
if editor, ok := ch.(MessageEditor); ok {
if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil {
return true // edited successfully, skip Send
return []string{entry.id}, true
}
// edit failed → fall through to normal Send
}
}
}
return false
return nil, false
}
// preSendMedia handles typing stop, reaction undo, and placeholder cleanup
@@ -699,23 +699,29 @@ func splitByLength(content string, maxLen int) []string {
// - ErrNotRunning / ErrSendFailed: permanent, no retry
// - ErrRateLimit: fixed delay retry
// - ErrTemporary / unknown: exponential backoff retry
func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) {
func (m *Manager) sendWithRetry(
ctx context.Context,
name string,
w *channelWorker,
msg bus.OutboundMessage,
) ([]string, bool) {
// Rate limit: wait for token
if err := w.limiter.Wait(ctx); err != nil {
// ctx canceled, shutting down
return
return nil, false
}
// Pre-send: stop typing and try to edit placeholder
if m.preSend(ctx, name, msg, w.ch) {
return // placeholder was edited successfully, skip Send
if msgIDs, handled := m.preSend(ctx, name, msg, w.ch); handled {
return msgIDs, true
}
var lastErr error
var msgIDs []string
for attempt := 0; attempt <= maxRetries; attempt++ {
lastErr = w.ch.Send(ctx, msg)
msgIDs, lastErr = w.ch.Send(ctx, msg)
if lastErr == nil {
return
return msgIDs, true
}
// Permanent failures — don't retry
@@ -734,7 +740,7 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork
case <-time.After(rateLimitDelay):
continue
case <-ctx.Done():
return
return nil, false
}
}
@@ -743,7 +749,7 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork
select {
case <-time.After(backoff):
case <-ctx.Done():
return
return nil, false
}
}
@@ -754,6 +760,8 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork
"error": lastErr.Error(),
"retries": maxRetries,
})
return nil, false
}
func dispatchLoop[M any](
@@ -855,7 +863,7 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor
if !ok {
return
}
_ = m.sendMediaWithRetry(ctx, name, w, msg)
_, _ = m.sendMediaWithRetry(ctx, name, w, msg)
case <-ctx.Done():
return
}
@@ -863,14 +871,14 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor
}
// sendMediaWithRetry sends a media message through the channel with rate limiting and
// retry logic. It returns nil on success, or the last error after retries,
// including when the channel does not support MediaSender.
// retry logic. It returns the message IDs and nil on success, or nil and the last error
// after retries, including when the channel does not support MediaSender.
func (m *Manager) sendMediaWithRetry(
ctx context.Context,
name string,
w *channelWorker,
msg bus.OutboundMediaMessage,
) error {
) ([]string, error) {
ms, ok := w.ch.(MediaSender)
if !ok {
err := fmt.Errorf("channel %q does not support media sending", name)
@@ -878,22 +886,23 @@ func (m *Manager) sendMediaWithRetry(
"channel": name,
"error": err.Error(),
})
return err
return nil, err
}
// Rate limit: wait for token
if err := w.limiter.Wait(ctx); err != nil {
return err
return nil, err
}
// Pre-send: stop typing and clean up any placeholder before sending media.
m.preSendMedia(ctx, name, msg, w.ch)
var lastErr error
var msgIDs []string
for attempt := 0; attempt <= maxRetries; attempt++ {
lastErr = ms.SendMedia(ctx, msg)
msgIDs, lastErr = ms.SendMedia(ctx, msg)
if lastErr == nil {
return nil
return msgIDs, nil
}
// Permanent failures — don't retry
@@ -912,7 +921,7 @@ func (m *Manager) sendMediaWithRetry(
case <-time.After(rateLimitDelay):
continue
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}
@@ -921,7 +930,7 @@ func (m *Manager) sendMediaWithRetry(
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}
@@ -932,7 +941,7 @@ func (m *Manager) sendMediaWithRetry(
"error": lastErr.Error(),
"retries": maxRetries,
})
return lastErr
return nil, lastErr
}
// runTTLJanitor periodically scans the typingStops and placeholders maps
@@ -1166,7 +1175,8 @@ func (m *Manager) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) e
return fmt.Errorf("channel %s has no active worker", msg.Channel)
}
return m.sendMediaWithRetry(ctx, msg.Channel, w, msg)
_, err := m.sendMediaWithRetry(ctx, msg.Channel, w, msg)
return err
}
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
@@ -1196,5 +1206,6 @@ func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, conten
// Fallback: direct send (should not happen)
channel, _ := m.channels[channelName]
return channel.Send(ctx, msg)
_, err := channel.Send(ctx, msg)
return err
}
+19 -16
View File
@@ -25,9 +25,12 @@ type mockChannel struct {
lastPlaceholderID string
}
func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (m *mockChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
m.sentMessages = append(m.sentMessages, msg)
return m.sendFn(ctx, msg)
if m.sendFn == nil {
return nil, nil
}
return nil, m.sendFn(ctx, msg)
}
func (m *mockChannel) Start(ctx context.Context) error { return nil }
@@ -46,16 +49,16 @@ func (m *mockChannel) EditMessage(ctx context.Context, chatID, messageID, conten
type mockMediaChannel struct {
mockChannel
sendMediaFn func(ctx context.Context, msg bus.OutboundMediaMessage) error
sendMediaFn func(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
sentMediaMessages []bus.OutboundMediaMessage
}
func (m *mockMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (m *mockMediaChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
m.sentMediaMessages = append(m.sentMediaMessages, msg)
if m.sendMediaFn != nil {
return m.sendMediaFn(ctx, msg)
}
return nil
return nil, nil
}
type mockDeletingMediaChannel struct {
@@ -247,9 +250,9 @@ func TestSendMedia_Success(t *testing.T) {
m := newTestManager()
var callCount int
ch := &mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error {
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
callCount++
return nil
return nil, nil
},
}
w := &channelWorker{
@@ -275,8 +278,8 @@ func TestSendMedia_Success(t *testing.T) {
func TestSendMedia_PropagatesFailure(t *testing.T) {
m := newTestManager()
ch := &mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error {
return fmt.Errorf("bad upload: %w", ErrSendFailed)
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
return nil, fmt.Errorf("bad upload: %w", ErrSendFailed)
},
}
w := &channelWorker{
@@ -330,8 +333,8 @@ func TestSendMedia_DeletesPlaceholderBeforeSending(t *testing.T) {
m := newTestManager()
ch := &mockDeletingMediaChannel{
mockMediaChannel: mockMediaChannel{
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) error {
return nil
sendMediaFn: func(_ context.Context, _ bus.OutboundMediaMessage) ([]string, error) {
return nil, nil
},
},
}
@@ -628,7 +631,7 @@ func TestPreSend_PlaceholderEditSuccess(t *testing.T) {
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
edited := m.preSend(context.Background(), "test", msg, ch)
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !edited {
t.Fatal("expected preSend to return true (placeholder edited)")
@@ -658,7 +661,7 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) {
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
edited := m.preSend(context.Background(), "test", msg, ch)
_, edited := m.preSend(context.Background(), "test", msg, ch)
if edited {
t.Fatal("expected preSend to return false when edit fails")
@@ -734,7 +737,7 @@ func TestPreSend_NoRegisteredState(t *testing.T) {
}
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
edited := m.preSend(context.Background(), "test", msg, ch)
_, edited := m.preSend(context.Background(), "test", msg, ch)
if edited {
t.Fatal("expected preSend to return false with no registered state")
@@ -764,7 +767,7 @@ func TestPreSend_TypingAndPlaceholder(t *testing.T) {
m.RecordPlaceholder("test", "123", "456")
msg := bus.OutboundMessage{Channel: "test", ChatID: "123", Content: "hello"}
edited := m.preSend(context.Background(), "test", msg, ch)
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !stopCalled {
t.Fatal("expected typing stop to be called")
@@ -1025,7 +1028,7 @@ func TestPreSendStillWorksWithWrappedTypes(t *testing.T) {
m.RecordPlaceholder("test", "chat1", "ph_id")
msg := bus.OutboundMessage{Channel: "test", ChatID: "chat1", Content: "response"}
edited := m.preSend(context.Background(), "test", msg, ch)
_, edited := m.preSend(context.Background(), "test", msg, ch)
if !stopCalled {
t.Fatal("expected typing stop to be called via wrapped type")
+21 -16
View File
@@ -380,26 +380,26 @@ func markdownToHTML(md string) string {
return strings.TrimSpace(string(markdown.ToHTML([]byte(md), p, renderer)))
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
roomID := id.RoomID(strings.TrimSpace(msg.ChatID))
if roomID == "" {
return fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed)
}
content := strings.TrimSpace(msg.Content)
if content == "" {
return nil
return nil, nil
}
_, err := c.client.SendMessageEvent(ctx, roomID, event.EventMessage, c.messageContent(content))
resp, err := c.client.SendMessageEvent(ctx, roomID, event.EventMessage, c.messageContent(content))
if err != nil {
return fmt.Errorf("matrix send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("matrix send: %w", channels.ErrTemporary)
}
return nil
return []string{resp.EventID.String()}, nil
}
func (c *MatrixChannel) messageContent(text string) *event.MessageEventContent {
@@ -412,9 +412,9 @@ func (c *MatrixChannel) messageContent(text string) *event.MessageEventContent {
}
// SendMedia implements channels.MediaSender.
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
sendCtx := ctx
if sendCtx == nil {
@@ -423,17 +423,18 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
roomID := id.RoomID(strings.TrimSpace(msg.ChatID))
if roomID == "" {
return fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("matrix room ID is empty: %w", channels.ErrSendFailed)
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
var eventIDs []string
for _, part := range msg.Parts {
if err := sendCtx.Err(); err != nil {
return err
return nil, err
}
localPath, meta, err := store.ResolveWithMeta(part.Ref)
@@ -498,7 +499,7 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
"type": part.Type,
"error": err.Error(),
})
return fmt.Errorf("matrix upload media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("matrix upload media: %w", channels.ErrTemporary)
}
msgType := matrixOutboundMsgType(part.Type, filename, contentType)
@@ -511,17 +512,21 @@ func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
uploadResp.ContentURI.CUString(),
)
if _, err := c.client.SendMessageEvent(sendCtx, roomID, event.EventMessage, content); err != nil {
sendResp, err := c.client.SendMessageEvent(sendCtx, roomID, event.EventMessage, content)
if err != nil {
logger.ErrorCF("matrix", "Failed to send media message", map[string]any{
"room_id": roomID.String(),
"type": msgType,
"error": err.Error(),
})
return fmt.Errorf("matrix send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("matrix send media: %w", channels.ErrTemporary)
}
if sendResp != nil {
eventIDs = append(eventIDs, sendResp.EventID.String())
}
}
return nil
return eventIDs, nil
}
// StartTyping implements channels.TypingCapable.
+1 -1
View File
@@ -11,5 +11,5 @@ import (
// Manager discovers channels implementing this interface via type
// assertion and routes OutboundMediaMessage to them.
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
+18 -18
View File
@@ -391,15 +391,15 @@ func (c *OneBotChannel) Stop(ctx context.Context) error {
return nil
}
func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// Check ctx before entering write path
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
@@ -408,12 +408,12 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
c.mu.Unlock()
if conn == nil {
return fmt.Errorf("OneBot WebSocket not connected")
return nil, fmt.Errorf("OneBot WebSocket not connected")
}
action, params, err := c.buildSendRequest(msg)
if err != nil {
return err
return nil, err
}
echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1))
@@ -426,7 +426,7 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to marshal OneBot request: %w", err)
return nil, fmt.Errorf("failed to marshal OneBot request: %w", err)
}
c.writeMu.Lock()
@@ -439,21 +439,21 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
logger.ErrorCF("onebot", "Failed to send message", map[string]any{
"error": err.Error(),
})
return fmt.Errorf("onebot send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("onebot send: %w", channels.ErrTemporary)
}
return nil
return nil, nil
}
// SendMedia implements the channels.MediaSender interface.
func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
@@ -462,12 +462,12 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
c.mu.Unlock()
if conn == nil {
return fmt.Errorf("OneBot WebSocket not connected")
return nil, fmt.Errorf("OneBot WebSocket not connected")
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
// Build media segments
@@ -508,7 +508,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
}
if len(segments) == 0 {
return nil
return nil, nil
}
chatID := msg.ChatID
@@ -524,7 +524,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
id, err := strconv.ParseInt(rawID, 10, 64)
if err != nil {
return fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed)
return nil, fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed)
}
echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1))
@@ -537,7 +537,7 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to marshal OneBot request: %w", err)
return nil, fmt.Errorf("failed to marshal OneBot request: %w", err)
}
c.writeMu.Lock()
@@ -550,10 +550,10 @@ func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
logger.ErrorCF("onebot", "Failed to send media message", map[string]any{
"error": err.Error(),
})
return fmt.Errorf("onebot send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("onebot send media: %w", channels.ErrTemporary)
}
return nil
return nil, nil
}
func (c *OneBotChannel) buildMessageSegments(chatID, content string) []oneBotMessageSegment {
+4 -4
View File
@@ -273,22 +273,22 @@ func (c *PicoClientChannel) handleServerMessage(pc *picoConn, msg PicoMessage) {
}
// Send sends a message to the remote server.
func (c *PicoClientChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *PicoClientChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
c.mu.Lock()
pc := c.conn
c.mu.Unlock()
if pc == nil || pc.closed.Load() {
return channels.ErrSendFailed
return nil, channels.ErrSendFailed
}
outMsg := newMessage(TypeMessageSend, map[string]any{
"content": msg.Content,
})
outMsg.SessionID = strings.TrimPrefix(msg.ChatID, "pico_client:")
return pc.writeJSON(outMsg)
return nil, pc.writeJSON(outMsg)
}
// StartTyping implements channels.TypingCapable.
+4 -4
View File
@@ -46,7 +46,7 @@ func TestSend_NotRunning(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = ch.Send(context.Background(), bus.OutboundMessage{Content: "hi"})
_, err = ch.Send(context.Background(), bus.OutboundMessage{Content: "hi"})
if !errors.Is(err, channels.ErrNotRunning) {
t.Fatalf("expected ErrNotRunning, got %v", err)
}
@@ -124,7 +124,7 @@ func TestClientChannel_ConnectAndSend(t *testing.T) {
defer ch.Stop(ctx)
// Send a message
err = ch.Send(ctx, bus.OutboundMessage{
_, err = ch.Send(ctx, bus.OutboundMessage{
ChatID: "pico_client:sess-1",
Content: "hello",
})
@@ -179,7 +179,7 @@ func TestClientChannel_ReceivesServerMessage(t *testing.T) {
defer ch.Stop(ctx)
// Send a message; the echo server replies with message.create
err = ch.Send(ctx, bus.OutboundMessage{
_, err = ch.Send(ctx, bus.OutboundMessage{
ChatID: "pico_client:sess-echo",
Content: "ping",
})
@@ -252,7 +252,7 @@ func TestSend_ClosedConnection(t *testing.T) {
ch.conn.close()
ch.mu.Unlock()
err = ch.Send(ctx, bus.OutboundMessage{
_, err = ch.Send(ctx, bus.OutboundMessage{
ChatID: "pico_client:sess-close",
Content: "should fail",
})
+3 -3
View File
@@ -234,16 +234,16 @@ func (c *PicoChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Send implements Channel — sends a message to the appropriate WebSocket connection.
func (c *PicoChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *PicoChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
outMsg := newMessage(TypeMessageCreate, map[string]any{
"content": msg.Content,
})
return c.broadcastToSession(msg.ChatID, outMsg)
return nil, c.broadcastToSession(msg.ChatID, outMsg)
}
// EditMessage implements channels.MessageEditor.
+30 -19
View File
@@ -200,9 +200,9 @@ func (c *QQChannel) getChatKind(chatID string) string {
return "group"
}
func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
chatKind := c.getChatKind(msg.ChatID)
@@ -236,11 +236,14 @@ func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
}
// Route to group or C2C.
var err error
var (
sentMsg *dto.Message
err error
)
if chatKind == "group" {
_, err = c.api.PostGroupMessage(ctx, msg.ChatID, msgToCreate)
sentMsg, err = c.api.PostGroupMessage(ctx, msg.ChatID, msgToCreate)
} else {
_, err = c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate)
sentMsg, err = c.api.PostC2CMessage(ctx, msg.ChatID, msgToCreate)
}
if err != nil {
@@ -249,10 +252,13 @@ func (c *QQChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
"chat_kind": chatKind,
"error": err.Error(),
})
return fmt.Errorf("qq send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("qq send: %w", channels.ErrTemporary)
}
return nil
if sentMsg == nil {
return nil, nil
}
return []string{sentMsg.ID}, nil
}
// StartTyping implements channels.TypingCapable.
@@ -319,13 +325,14 @@ func (c *QQChannel) StartTyping(ctx context.Context, chatID string) (func(), err
// QQ group/C2C media sending is a two-step flow:
// 1. Upload media to /files using a remote URL or base64-encoded local bytes.
// 2. Send a msg_type=7 message using the returned file_info.
func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
chatKind := c.getChatKind(msg.ChatID)
var messageIDs []string
for _, part := range msg.Parts {
fileInfo, err := c.uploadMedia(ctx, chatKind, msg.ChatID, part)
if err != nil {
@@ -335,22 +342,26 @@ func (c *QQChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage)
"error": err.Error(),
})
if errors.Is(err, channels.ErrSendFailed) {
return err
return nil, err
}
return fmt.Errorf("qq send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("qq send media: %w", channels.ErrTemporary)
}
if err := c.sendUploadedMedia(ctx, chatKind, msg.ChatID, part, fileInfo); err != nil {
sentMsg, err := c.sendUploadedMedia(ctx, chatKind, msg.ChatID, part, fileInfo)
if err != nil {
logger.ErrorCF("qq", "Failed to send media", map[string]any{
"type": part.Type,
"chat_id": msg.ChatID,
"error": err.Error(),
})
return fmt.Errorf("qq send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("qq send media: %w", channels.ErrTemporary)
}
if sentMsg != nil && sentMsg.ID != "" {
messageIDs = append(messageIDs, sentMsg.ID)
}
}
return nil
return messageIDs, nil
}
type qqMediaUpload struct {
@@ -517,7 +528,7 @@ func (c *QQChannel) sendUploadedMedia(
chatKind, chatID string,
part bus.MediaPart,
fileInfo []byte,
) error {
) (*dto.Message, error) {
msg := &dto.MessageToCreate{
Content: part.Caption,
MsgType: dto.RichMediaMsg,
@@ -532,11 +543,11 @@ func (c *QQChannel) sendUploadedMedia(
}
if chatKind == "group" {
_, err := c.api.PostGroupMessage(ctx, chatID, msg)
return err
sentMsg, err := c.api.PostGroupMessage(ctx, chatID, msg)
return sentMsg, err
}
_, err := c.api.PostC2CMessage(ctx, chatID, msg)
return err
sentMsg, err := c.api.PostC2CMessage(ctx, chatID, msg)
return sentMsg, err
}
func (c *QQChannel) applyPassiveReplyMetadata(chatID string, msg *dto.MessageToCreate) {
+8 -8
View File
@@ -209,7 +209,7 @@ func TestSendMedia_UploadsLocalFileAsBase64(t *testing.T) {
ch.lastMsgID.Store("group-1", "msg-1")
ch.msgSeqCounters.Store("group-1", new(atomic.Uint64))
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "group-1",
Parts: []bus.MediaPart{{
Type: "image",
@@ -303,7 +303,7 @@ func assertAudioWAVUploadType(t *testing.T, duration time.Duration, wantFileType
ch.SetMediaStore(store)
ch.chatType.Store("group-1", "group")
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "group-1",
Parts: []bus.MediaPart{{
Type: "audio",
@@ -337,7 +337,7 @@ func TestSendMedia_RemoteAudioFallsBackToFileUpload(t *testing.T) {
ch.SetRunning(true)
ch.chatType.Store("user-1", "direct")
err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "user-1",
Parts: []bus.MediaPart{{
Type: "audio",
@@ -383,7 +383,7 @@ func TestSendMedia_LocalAudioWithUnknownDurationFallsBackToFileUpload(t *testing
ch.SetMediaStore(store)
ch.chatType.Store("group-1", "group")
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "group-1",
Parts: []bus.MediaPart{{
Type: "audio",
@@ -417,7 +417,7 @@ func TestSendMedia_UsesRemoteURLUploadForC2C(t *testing.T) {
ch.SetRunning(true)
ch.chatType.Store("user-1", "direct")
err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "user-1",
Parts: []bus.MediaPart{{
Type: "file",
@@ -490,7 +490,7 @@ func TestSendMedia_LocalFileUploadIncludesStoredFilename(t *testing.T) {
ch.SetMediaStore(store)
ch.chatType.Store("user-1", "direct")
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "user-1",
Parts: []bus.MediaPart{{
Type: "file",
@@ -528,7 +528,7 @@ func TestSendMedia_ReturnsSendFailedWithoutMediaStore(t *testing.T) {
ch.SetRunning(true)
ch.chatType.Store("group-1", "group")
err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err := ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "group-1",
Parts: []bus.MediaPart{{
Type: "image",
@@ -578,7 +578,7 @@ func TestSendMedia_ReturnsSendFailedWhenLocalFileExceedsBase64MiBLimit(t *testin
ch.SetMediaStore(store)
ch.chatType.Store("group-1", "group")
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "group-1",
Parts: []bus.MediaPart{{
Type: "file",
+14 -12
View File
@@ -108,14 +108,14 @@ func (c *SlackChannel) Stop(ctx context.Context) error {
return nil
}
func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
channelID, threadTS := parseSlackChatID(msg.ChatID)
if channelID == "" {
return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
return nil, fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
}
opts := []slack.MsgOption{
@@ -130,9 +130,9 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
opts = append(opts, slack.MsgOptionTS(threadTS))
}
_, _, err := c.api.PostMessageContext(ctx, channelID, opts...)
_, ts, err := c.api.PostMessageContext(ctx, channelID, opts...)
if err != nil {
return fmt.Errorf("slack send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("slack send: %w", channels.ErrTemporary)
}
if ref, ok := c.pendingAcks.LoadAndDelete(msg.ChatID); ok {
@@ -148,23 +148,23 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
"thread_ts": threadTS,
})
return nil
return []string{ts}, nil
}
// SendMedia implements the channels.MediaSender interface.
func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
channelID, _ := parseSlackChatID(msg.ChatID)
if channelID == "" {
return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
return nil, fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
for _, part := range msg.Parts {
@@ -198,11 +198,13 @@ func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa
"filename": filename,
"error": err.Error(),
})
return fmt.Errorf("slack send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("slack send media: %w", channels.ErrTemporary)
}
}
return nil
// UploadFileV2 does not expose the posted message timestamp in its
// response; returning nil avoids conflating file IDs with message IDs.
return nil, nil
}
// ReactToMessage implements channels.ReactionCapable.
+41 -29
View File
@@ -168,26 +168,27 @@ func (c *TelegramChannel) Stop(ctx context.Context) error {
return nil
}
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
useMarkdownV2 := c.config.Channels.Telegram.UseMarkdownV2
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
return nil, fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
}
if msg.Content == "" {
return nil
return nil, nil
}
// The Manager already splits messages to ≤4000 chars (WithMaxMessageLength),
// so msg.Content is guaranteed to be within that limit. We still need to
// check if HTML expansion pushes it beyond Telegram's 4096-char API limit.
replyToID := msg.ReplyToMessageID
var messageIDs []string
queue := []string{msg.Content}
for len(queue) > 0 {
chunk := queue[0]
@@ -206,16 +207,18 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
}
if smallerLen <= 0 {
if err := c.sendChunk(ctx, sendChunkParams{
msgID, err := c.sendChunk(ctx, sendChunkParams{
chatID: chatID,
threadID: threadID,
content: content,
replyToID: replyToID,
mdFallback: chunk,
useMarkdownV2: useMarkdownV2,
}); err != nil {
return err
})
if err != nil {
return nil, err
}
messageIDs = append(messageIDs, msgID)
replyToID = ""
continue
}
@@ -244,21 +247,23 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
continue
}
if err := c.sendChunk(ctx, sendChunkParams{
msgID, err := c.sendChunk(ctx, sendChunkParams{
chatID: chatID,
threadID: threadID,
content: content,
replyToID: replyToID,
mdFallback: chunk,
useMarkdownV2: useMarkdownV2,
}); err != nil {
return err
})
if err != nil {
return nil, err
}
messageIDs = append(messageIDs, msgID)
// Only the first chunk should be a reply; subsequent chunks are normal messages.
replyToID = ""
}
return nil
return messageIDs, nil
}
type sendChunkParams struct {
@@ -275,7 +280,7 @@ type sendChunkParams struct {
func (c *TelegramChannel) sendChunk(
ctx context.Context,
params sendChunkParams,
) error {
) (string, error) {
tgMsg := tu.Message(tu.ID(params.chatID), params.content)
tgMsg.MessageThreadID = params.threadID
if params.useMarkdownV2 {
@@ -292,17 +297,19 @@ func (c *TelegramChannel) sendChunk(
}
}
if _, err := c.bot.SendMessage(ctx, tgMsg); err != nil {
pMsg, err := c.bot.SendMessage(ctx, tgMsg)
if err != nil {
logParseFailed(err, params.useMarkdownV2)
tgMsg.Text = params.mdFallback
tgMsg.ParseMode = ""
if _, err = c.bot.SendMessage(ctx, tgMsg); err != nil {
return fmt.Errorf("telegram send: %w", channels.ErrTemporary)
pMsg, err = c.bot.SendMessage(ctx, tgMsg)
if err != nil {
return "", fmt.Errorf("telegram send: %w", channels.ErrTemporary)
}
}
return nil
return strconv.Itoa(pMsg.MessageID), nil
}
// maxTypingDuration limits how long the typing indicator can run.
@@ -420,21 +427,22 @@ func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (s
}
// SendMedia implements the channels.MediaSender interface.
func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
chatID, threadID, err := parseTelegramChatID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
return nil, fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
}
store := c.GetMediaStore()
if store == nil {
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
@@ -454,6 +462,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
continue
}
var tgResult *telego.Message
switch part.Type {
case "image":
params := &telego.SendPhotoParams{
@@ -462,11 +471,11 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Photo: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendPhoto(ctx, params)
tgResult, err = c.bot.SendPhoto(ctx, params)
if err != nil && strings.Contains(err.Error(), "PHOTO_INVALID_DIMENSIONS") {
if _, seekErr := file.Seek(0, io.SeekStart); seekErr != nil {
file.Close()
return fmt.Errorf("telegram rewind media after photo failure: %w", channels.ErrTemporary)
return nil, fmt.Errorf("telegram rewind media after photo failure: %w", channels.ErrTemporary)
}
docParams := &telego.SendDocumentParams{
@@ -475,7 +484,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Document: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendDocument(ctx, docParams)
tgResult, err = c.bot.SendDocument(ctx, docParams)
}
case "audio":
// Send OGG files with "voice" in the filename as Telegram voice
@@ -488,7 +497,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Voice: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendVoice(ctx, vparams)
tgResult, err = c.bot.SendVoice(ctx, vparams)
} else {
params := &telego.SendAudioParams{
ChatID: tu.ID(chatID),
@@ -496,7 +505,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Audio: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendAudio(ctx, params)
tgResult, err = c.bot.SendAudio(ctx, params)
}
case "video":
params := &telego.SendVideoParams{
@@ -505,7 +514,7 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Video: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendVideo(ctx, params)
tgResult, err = c.bot.SendVideo(ctx, params)
default: // "file" or unknown types
params := &telego.SendDocumentParams{
ChatID: tu.ID(chatID),
@@ -513,9 +522,12 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
Document: telego.InputFile{File: file},
Caption: part.Caption,
}
_, err = c.bot.SendDocument(ctx, params)
tgResult, err = c.bot.SendDocument(ctx, params)
}
if tgResult != nil {
messageIDs = append(messageIDs, strconv.Itoa(tgResult.MessageID))
}
file.Close()
if err != nil {
@@ -523,11 +535,11 @@ func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMe
"type": part.Type,
"error": err.Error(),
})
return fmt.Errorf("telegram send media: %w", channels.ErrTemporary)
return nil, fmt.Errorf("telegram send media: %w", channels.ErrTemporary)
}
}
return nil
return messageIDs, nil
}
func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error {
+13 -13
View File
@@ -176,7 +176,7 @@ func TestSendMedia_ImageFallbacksToDocumentOnInvalidDimensions(t *testing.T) {
)
require.NoError(t, err)
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "12345",
Parts: []bus.MediaPart{{
Type: "image",
@@ -214,7 +214,7 @@ func TestSendMedia_ImageNonDimensionErrorDoesNotFallback(t *testing.T) {
ref, err := store.Store(localPath, media.MediaMeta{Filename: "image.png", ContentType: "image/png"}, "scope-1")
require.NoError(t, err)
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
ChatID: "12345",
Parts: []bus.MediaPart{{
Type: "image",
@@ -239,7 +239,7 @@ func TestSend_EmptyContent(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: "",
})
@@ -256,7 +256,7 @@ func TestSend_ShortMessage_SingleCall(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: "Hello, world!",
})
@@ -279,7 +279,7 @@ func TestSend_LongMessage_SingleCall(t *testing.T) {
longContent := strings.Repeat("a", 4000)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: longContent,
})
@@ -302,7 +302,7 @@ func TestSend_HTMLFallback_PerChunk(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: "Hello **world**",
})
@@ -320,7 +320,7 @@ func TestSend_HTMLFallback_BothFail(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: "Hello",
})
@@ -342,7 +342,7 @@ func TestSend_LongMessage_HTMLFallback_StopsOnError(t *testing.T) {
longContent := strings.Repeat("x", 4001)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: longContent,
})
@@ -372,7 +372,7 @@ func TestSend_MarkdownShortButHTMLLong_MultipleCalls(t *testing.T) {
"HTML expansion must exceed Telegram limit for this test to be meaningful",
)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: markdownContent,
})
@@ -407,7 +407,7 @@ func TestSend_HTMLOverflow_WordBoundary(t *testing.T) {
// Ensure the test content matches the intended boundary conditions.
assert.LessOrEqual(t, len([]rune(content)), 4000, "markdown content must not exceed chunk size for this test")
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "123456",
Content: content,
})
@@ -443,7 +443,7 @@ func TestSend_NotRunning(t *testing.T) {
ch := newTestChannel(t, caller)
ch.SetRunning(false)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "12345",
Content: "Hello",
})
@@ -461,7 +461,7 @@ func TestSend_InvalidChatID(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "not-a-number",
Content: "Hello",
})
@@ -518,7 +518,7 @@ func TestSend_WithForumThreadID(t *testing.T) {
}
ch := newTestChannel(t, caller)
err := ch.Send(context.Background(), bus.OutboundMessage{
_, err := ch.Send(context.Background(), bus.OutboundMessage{
ChatID: "-1001234567890/42",
Content: "Hello from topic",
})
+14 -14
View File
@@ -184,20 +184,20 @@ func (c *WeComChannel) BeginStream(_ context.Context, chatID string) (channels.S
}, nil
}
func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
content := strings.TrimSpace(msg.Content)
if content == "" {
return nil
return nil, nil
}
if turn, ok := c.getTurn(msg.ChatID); ok {
if time.Since(turn.CreatedAt) <= wecomStreamMaxDuration {
if err := c.sendStreamReply(turn, content); err == nil {
c.consumeTurn(msg.ChatID, turn)
return nil
return nil, nil
}
}
c.consumeTurn(msg.ChatID, turn)
@@ -205,20 +205,20 @@ func (c *WeComChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
if route, ok := c.routes.Get(msg.ChatID); ok {
if err := c.sendActivePush(route.ChatID, route.ChatType, content); err != nil {
return err
return nil, err
}
return nil
return nil, nil
}
if err := c.sendActivePush(msg.ChatID, 0, content); err != nil {
return err
return nil, err
}
return nil
return nil, nil
}
func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
route, chatType, hasTurn := c.resolveMediaRoute(msg.ChatID)
@@ -231,7 +231,7 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa
if strings.TrimSpace(part.Ref) == "" {
if caption := strings.TrimSpace(part.Caption); caption != "" {
if err := c.sendActivePush(chatID, chatType, caption); err != nil {
return err
return nil, err
}
}
continue
@@ -239,7 +239,7 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa
localPath, filename, contentType, cleanup, err := c.resolveOutboundPart(ctx, part)
if err != nil {
return fmt.Errorf("wecom resolve media %q: %v: %w", part.Ref, err, channels.ErrSendFailed)
return nil, fmt.Errorf("wecom resolve media %q: %v: %w", part.Ref, err, channels.ErrSendFailed)
}
func() {
@@ -283,11 +283,11 @@ func (c *WeComChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa
}
}()
if err != nil {
return err
return nil, err
}
}
return nil
return nil, nil
}
func (c *WeComChannel) connectLoop() {
+6 -6
View File
@@ -190,7 +190,7 @@ func TestSend_StreamFailureFallsBackToActualChatID(t *testing.T) {
return wecomTestAck(nil), nil
}
if err := ch.Send(context.Background(), bus.OutboundMessage{
if _, err := ch.Send(context.Background(), bus.OutboundMessage{
Channel: "wecom",
ChatID: "chat-1",
Content: "hello",
@@ -247,7 +247,7 @@ func TestSend_DoesNotSplitStreamReply(t *testing.T) {
}
content := strings.Repeat("\u4e2d", 30000)
if err := ch.Send(context.Background(), bus.OutboundMessage{
if _, err := ch.Send(context.Background(), bus.OutboundMessage{
Channel: "wecom",
ChatID: "chat-1",
Content: content,
@@ -283,7 +283,7 @@ func TestSend_DoesNotSplitActivePush(t *testing.T) {
}
content := strings.Repeat("a", 30000)
if err := ch.Send(context.Background(), bus.OutboundMessage{
if _, err := ch.Send(context.Background(), bus.OutboundMessage{
Channel: "wecom",
ChatID: "chat-1",
Content: content,
@@ -346,7 +346,7 @@ func TestSendMedia_SendsActiveImage(t *testing.T) {
}
}
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "wecom",
ChatID: "chat-1",
Parts: []bus.MediaPart{{
@@ -457,7 +457,7 @@ func TestSendMedia_UsesTurnImageAndFinishesStream(t *testing.T) {
}
}
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "wecom",
ChatID: "chat-1",
Parts: []bus.MediaPart{{
@@ -553,7 +553,7 @@ func TestSendMedia_SendsActiveFile(t *testing.T) {
}
}
err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
_, err = ch.SendMedia(context.Background(), bus.OutboundMediaMessage{
Channel: "wecom",
ChatID: "chat-2",
Parts: []bus.MediaPart{{
+8 -8
View File
@@ -1097,12 +1097,12 @@ func (c *WeixinChannel) StartTyping(ctx context.Context, chatID string) (func(),
}
// SendMedia implements channels.MediaSender.
func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return basechannels.ErrNotRunning
return nil, basechannels.ErrNotRunning
}
if err := c.ensureSessionActive(); err != nil {
return err
return nil, err
}
contextToken := ""
@@ -1110,7 +1110,7 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
contextToken, _ = v.(string)
}
if contextToken == "" {
return fmt.Errorf(
return nil, fmt.Errorf(
"weixin send media: missing context token for chat %s: %w",
msg.ChatID,
basechannels.ErrSendFailed,
@@ -1125,7 +1125,7 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
"ref": part.Ref,
"error": err.Error(),
})
return fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed)
return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed)
}
func() {
if cleanup != nil {
@@ -1147,11 +1147,11 @@ func (c *WeixinChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMess
"error": err.Error(),
})
if c.remainingPause() > 0 {
return fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed)
return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrSendFailed)
}
return fmt.Errorf("weixin send media: %w", basechannels.ErrTemporary)
return nil, fmt.Errorf("weixin send media: %w", basechannels.ErrTemporary)
}
}
return nil
return nil, nil
}
+8 -8
View File
@@ -358,16 +358,16 @@ func (c *WeixinChannel) handleInboundMessage(ctx context.Context, msg WeixinMess
}
// Send implements channels.Channel by sending a text message to the WeChat user.
func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
if err := c.ensureSessionActive(); err != nil {
return err
return nil, err
}
if msg.Content == "" {
return nil
return nil, nil
}
// We need a context_token to send a reply. It should be stored in the conversation metadata.
@@ -386,7 +386,7 @@ func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
logger.ErrorCF("weixin", "Missing context token, cannot send message", map[string]any{
"to_user_id": toUserID,
})
return fmt.Errorf("weixin send: %w: missing context token for chat %s", channels.ErrSendFailed, toUserID)
return nil, fmt.Errorf("weixin send: %w: missing context token for chat %s", channels.ErrSendFailed, toUserID)
}
if err := c.sendTextMessage(ctx, toUserID, contextToken, msg.Content); err != nil {
@@ -395,10 +395,10 @@ func (c *WeixinChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
"error": err.Error(),
})
if c.remainingPause() > 0 {
return fmt.Errorf("weixin send: %w", channels.ErrSendFailed)
return nil, fmt.Errorf("weixin send: %w", channels.ErrSendFailed)
}
return fmt.Errorf("weixin send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("weixin send: %w", channels.ErrTemporary)
}
return nil
return nil, nil
}
+7 -7
View File
@@ -104,15 +104,15 @@ func (c *WhatsAppChannel) Stop(ctx context.Context) error {
return nil
}
func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
// Check ctx before acquiring lock
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
@@ -120,7 +120,7 @@ func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
defer c.mu.Unlock()
if c.conn == nil {
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
return nil, fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
}
payload := map[string]any{
@@ -131,17 +131,17 @@ func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
return nil, fmt.Errorf("failed to marshal message: %w", err)
}
_ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
_ = c.conn.SetWriteDeadline(time.Time{})
return fmt.Errorf("whatsapp send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("whatsapp send: %w", channels.ErrTemporary)
}
_ = c.conn.SetWriteDeadline(time.Time{})
return nil
return nil, nil
}
func (c *WhatsAppChannel) listen() {
@@ -396,13 +396,13 @@ func (c *WhatsAppNativeChannel) handleIncoming(evt *events.Message) {
c.HandleMessage(c.runCtx, peer, messageID, senderID, chatID, content, mediaPaths, metadata, sender)
}
func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return channels.ErrNotRunning
return nil, channels.ErrNotRunning
}
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
@@ -411,18 +411,18 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
c.mu.Unlock()
if client == nil || !client.IsConnected() {
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
return nil, fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
}
// Detect unpaired state: the client is connected (to WhatsApp servers)
// but has not completed QR-login yet, so sending would fail.
if client.Store.ID == nil {
return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary)
return nil, fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary)
}
to, err := parseJID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)
return nil, fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)
}
waMsg := &waE2E.Message{
@@ -430,9 +430,9 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
}
if _, err = client.SendMessage(ctx, to, waMsg); err != nil {
return fmt.Errorf("whatsapp send: %w", channels.ErrTemporary)
return nil, fmt.Errorf("whatsapp send: %w", channels.ErrTemporary)
}
return nil
return nil, nil
}
// parseJID converts a chat ID (phone number or JID string) to types.JID.