Files
picoclaw/pkg/channels/whatsapp/whatsapp.go
T
DimonB 6c0798ca3f feat(channels): make Channel.Send return delivered message IDs (#2190)
* feat(channels): Channel.Send and MediaSender.SendMedia return delivered message IDs

Change Channel.Send signature from (ctx, msg) error to (ctx, msg) ([]string, error)
and MediaSender.SendMedia similarly, so callers can capture platform message IDs
for threading, reactions, and history annotation.

Adapters that return real IDs: Telegram (per-chunk MessageID), Discord (Message.ID),
Slack Send (ts), QQ (sentMsg.ID), Matrix (EventID). Slack SendMedia returns nil
because UploadFileV2 does not expose the posted message timestamp in its response.
All other adapters return nil IDs.

preSend and sendWithRetry in manager.go updated to propagate ([]string, bool).
README examples updated for both English and Chinese docs.

* style: apply golangci-lint fixes (golines)

* docs: fix Send migration guide — restore old error-only signature in before/after example
2026-03-31 11:07:32 +08:00

253 lines
5.2 KiB
Go

package whatsapp
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
)
// WhatsAppChannel is a channel adapter that exchanges JSON messages with a
// WhatsApp bridge over a single websocket connection.
type WhatsAppChannel struct {
	*channels.BaseChannel

	// Static configuration captured at construction time.
	config config.WhatsAppConfig
	url    string // bridge websocket URL, taken from config.BridgeURL

	// Lifetime of the background listen goroutine; both are set by Start.
	ctx    context.Context
	cancel context.CancelFunc

	// mu guards conn and connected.
	mu        sync.Mutex
	conn      *websocket.Conn
	connected bool
}
// NewWhatsAppChannel builds a WhatsApp channel from cfg and wires it to the
// given message bus. No connection is opened here; call Start for that.
// The returned error is always nil.
func NewWhatsAppChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus) (*WhatsAppChannel, error) {
	// connected is left at its zero value (false) until Start succeeds.
	ch := &WhatsAppChannel{
		config: cfg,
		url:    cfg.BridgeURL,
	}

	ch.BaseChannel = channels.NewBaseChannel(
		"whatsapp",
		cfg,
		bus,
		cfg.AllowFrom,
		channels.WithMaxMessageLength(65536),
		channels.WithReasoningChannelID(cfg.ReasoningChannelID),
	)

	return ch, nil
}
// Start dials the WhatsApp bridge, marks the channel as running and launches
// the background listen goroutine.
//
// The supplied ctx bounds both the websocket handshake and the lifetime of the
// listener. An error is returned when the bridge cannot be reached; in that
// case the derived context is cancelled and the channel is left stopped.
func (c *WhatsAppChannel) Start(ctx context.Context) error {
	logger.InfoCF("whatsapp", "Starting WhatsApp channel", map[string]any{
		"bridge_url": c.url,
	})

	c.ctx, c.cancel = context.WithCancel(ctx)

	// websocket.DefaultDialer is a shared package-level pointer. Work on a
	// copy so the handshake timeout set here does not leak into every other
	// user of the default dialer in the process.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 10 * time.Second

	// DialContext lets a cancelled ctx abort a handshake that is in flight.
	conn, resp, err := dialer.DialContext(c.ctx, c.url, nil)
	if resp != nil {
		// The handshake response body is not needed on either path.
		resp.Body.Close()
	}
	if err != nil {
		c.cancel()
		return fmt.Errorf("failed to connect to WhatsApp bridge: %w", err)
	}

	c.mu.Lock()
	c.conn = conn
	c.connected = true
	c.mu.Unlock()

	c.SetRunning(true)
	logger.InfoC("whatsapp", "WhatsApp channel connected")

	go c.listen()

	return nil
}
// Stop cancels the listener context, closes the bridge connection if one is
// open, and marks the channel as disconnected and not running. A failure to
// close the connection is logged rather than returned; the result is always
// nil.
func (c *WhatsAppChannel) Stop(ctx context.Context) error {
	logger.InfoC("whatsapp", "Stopping WhatsApp channel...")

	// Signal the listen goroutine first, before the connection goes away.
	if stop := c.cancel; stop != nil {
		stop()
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if conn := c.conn; conn != nil {
		c.conn = nil
		if closeErr := conn.Close(); closeErr != nil {
			logger.ErrorCF("whatsapp", "Error closing WhatsApp connection", map[string]any{
				"error": closeErr.Error(),
			})
		}
	}

	c.connected = false
	c.SetRunning(false)

	return nil
}
// Send writes msg to the bridge as a JSON text frame of the form
// {"type":"message","to":<ChatID>,"content":<Content>}.
//
// No message IDs are returned by this adapter, so the first result is always
// nil. Errors:
//   - channels.ErrNotRunning when the channel has not been started;
//   - ctx.Err() when ctx is already done on entry;
//   - an error wrapping channels.ErrTemporary when there is no connection or
//     the write fails (the underlying write error is included in the text).
func (c *WhatsAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
	if !c.IsRunning() {
		return nil, channels.ErrNotRunning
	}

	// Bail out before contending for the lock if the caller already gave up.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.conn == nil {
		return nil, fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
	}

	data, err := json.Marshal(map[string]any{
		"type":    "message",
		"to":      msg.ChatID,
		"content": msg.Content,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal message: %w", err)
	}

	// Bound the write, and always clear the deadline afterwards so it does not
	// apply to later writes. The deferred reset runs before the unlock above.
	// The SetWriteDeadline result is deliberately ignored, as before.
	conn := c.conn
	_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	defer func() { _ = conn.SetWriteDeadline(time.Time{}) }()

	if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
		// Keep ErrTemporary as the wrapped sentinel for errors.Is, but do not
		// throw away the actual cause of the failure.
		return nil, fmt.Errorf("whatsapp send: %w: %v", channels.ErrTemporary, err)
	}

	return nil, nil
}
// listen is the read loop started by Start. It reads frames from the bridge
// connection, decodes each one as a JSON object and forwards objects whose
// "type" is "message" to handleIncomingMessage.
//
// The loop ends when the channel context is cancelled or when the connection
// reports a read error. A read error on a websocket connection is permanent,
// so the failed connection is closed and cleared instead of being read again;
// Send then reports the connection as not established.
func (c *WhatsAppChannel) listen() {
	// Bind to the context that was installed when this listener was launched.
	ctx := c.ctx

	for {
		if ctx.Err() != nil {
			return
		}

		c.mu.Lock()
		conn := c.conn
		c.mu.Unlock()

		if conn == nil {
			// No connection yet: wait a second, but wake up immediately on
			// cancellation instead of sleeping through it.
			timer := time.NewTimer(1 * time.Second)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
			continue
		}

		_, message, err := conn.ReadMessage()
		if err != nil {
			if ctx.Err() != nil {
				// Stop cancelled the context and closed the connection; the
				// resulting read error is expected and not worth logging.
				return
			}

			logger.ErrorCF("whatsapp", "WhatsApp read error", map[string]any{
				"error": err.Error(),
			})

			// Retire the dead connection. Only clear the field if it still
			// refers to the connection that failed.
			c.mu.Lock()
			if c.conn == conn {
				c.conn = nil
				c.connected = false
			}
			c.mu.Unlock()

			// Best effort: the connection is already unusable, so a close
			// error carries no extra information.
			_ = conn.Close()
			return
		}

		var msg map[string]any
		if err := json.Unmarshal(message, &msg); err != nil {
			// A malformed frame does not invalidate the connection.
			logger.ErrorCF("whatsapp", "Failed to unmarshal WhatsApp message", map[string]any{
				"error": err.Error(),
			})
			continue
		}

		if msgType, ok := msg["type"].(string); ok && msgType == "message" {
			c.handleIncomingMessage(msg)
		}
	}
}
// handleIncomingMessage converts one decoded bridge message into a bus
// message and hands it to HandleMessage.
//
// Fields read from msg: "from" (required string; the message is dropped
// without it), "chat" (defaults to the sender), "content" (defaults to ""),
// "media" (list; only string entries are kept), "id" and "from_name".
// Messages from senders rejected by IsAllowedSender are dropped after being
// logged.
func (c *WhatsAppChannel) handleIncomingMessage(msg map[string]any) {
	senderID, hasSender := msg["from"].(string)
	if !hasSender {
		return
	}

	chatID, hasChat := msg["chat"].(string)
	if !hasChat {
		chatID = senderID
	}

	// A failed assertion already yields the empty string.
	content, _ := msg["content"].(string)
	messageID, _ := msg["id"].(string)

	// mediaPaths stays nil unless a "media" list is present.
	var mediaPaths []string
	if rawMedia, isList := msg["media"].([]any); isList {
		mediaPaths = make([]string, 0, len(rawMedia))
		for _, entry := range rawMedia {
			p, isString := entry.(string)
			if !isString {
				continue
			}
			mediaPaths = append(mediaPaths, p)
		}
	}

	metadata := make(map[string]string)
	userName, hasName := msg["from_name"].(string)
	if hasName {
		metadata["user_name"] = userName
	}

	// A chat whose ID equals the sender's is treated as a direct conversation.
	peer := bus.Peer{Kind: "group", ID: chatID}
	if chatID == senderID {
		peer = bus.Peer{Kind: "direct", ID: senderID}
	}

	logger.InfoCF("whatsapp", "WhatsApp message received", map[string]any{
		"sender":  senderID,
		"preview": utils.Truncate(content, 50),
	})

	sender := bus.SenderInfo{
		Platform:    "whatsapp",
		PlatformID:  senderID,
		CanonicalID: identity.BuildCanonicalID("whatsapp", senderID),
	}
	if hasName {
		sender.DisplayName = userName
	}

	if !c.IsAllowedSender(sender) {
		return
	}

	c.HandleMessage(c.ctx, peer, messageID, senderID, chatID, content, mediaPaths, metadata, sender)
}