Complete the whatsapp native channel implementation based on the new channel interface

This commit is contained in:
Aditya Kalro
2026-02-26 22:35:52 -08:00
parent a8644ca1c5
commit 42ee9ab1e3
6 changed files with 69 additions and 53 deletions
+2 -22
View File
@@ -12,7 +12,6 @@ import (
"fmt"
"math"
"net/http"
"path/filepath"
"sync"
"time"
@@ -213,27 +212,8 @@ func (m *Manager) initChannels() error {
if m.config.Channels.WhatsApp.Enabled {
waCfg := m.config.Channels.WhatsApp
useNative := waCfg.UseNative
if useNative {
logger.DebugC("channels", "Attempting to initialize WhatsApp native channel (whatsmeow)")
storePath := waCfg.SessionStorePath
if storePath == "" {
storePath = filepath.Join(m.config.WorkspacePath(), "whatsapp")
}
newNative := getWhatsAppNativeFactory()
if newNative == nil {
logger.ErrorCF("channels", "WhatsApp native not linked; import _ github.com/sipeed/picoclaw/pkg/channels/whatsapp or build with -tags whatsapp_native", nil)
} else {
ch, err := newNative(waCfg, m.bus, storePath)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize WhatsApp native channel", map[string]any{
"error": err.Error(),
})
} else {
m.channels["whatsapp"] = ch
logger.InfoC("channels", "WhatsApp native channel enabled successfully")
}
}
if waCfg.UseNative {
m.initChannel("whatsapp_native", "WhatsApp Native")
} else if waCfg.BridgeURL != "" {
m.initChannel("whatsapp", "WhatsApp")
}
+20
View File
@@ -0,0 +1,20 @@
package whatsapp
import (
"path/filepath"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory("whatsapp_native", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
waCfg := cfg.Channels.WhatsApp
storePath := waCfg.SessionStorePath
if storePath == "" {
storePath = filepath.Join(cfg.WorkspacePath(), "whatsapp")
}
return NewWhatsAppNativeChannel(waCfg, b, storePath)
})
}
@@ -5,7 +5,7 @@
//
// Copyright (c) 2026 PicoClaw contributors
package channels
package whatsapp
import (
"context"
@@ -24,29 +24,30 @@ import (
"go.mau.fi/whatsmeow/store/sqlstore"
"go.mau.fi/whatsmeow/types/events"
waLog "go.mau.fi/whatsmeow/util/log"
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"
"google.golang.org/protobuf/proto"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"
)
const (
sqliteDriver = "sqlite"
whatsappDBName = "store.db"
reconnectInitial = 5 * time.Second
reconnectMax = 5 * time.Minute
reconnectInitial = 5 * time.Second
reconnectMax = 5 * time.Minute
reconnectMultiplier = 2.0
)
// WhatsAppNativeChannel implements the WhatsApp channel using whatsmeow (in-process, no external bridge).
type WhatsAppNativeChannel struct {
*BaseChannel
*channels.BaseChannel
config config.WhatsAppConfig
storePath string
client *whatsmeow.Client
@@ -60,8 +61,8 @@ type WhatsAppNativeChannel struct {
// NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection.
// storePath is the directory for the SQLite session store (e.g. workspace/whatsapp).
func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, storePath string) (Channel, error) {
base := NewBaseChannel("whatsapp", cfg, bus, cfg.AllowFrom)
func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, storePath string) (channels.Channel, error) {
base := channels.NewBaseChannel("whatsapp_native", cfg, bus, cfg.AllowFrom, channels.WithMaxMessageLength(65536))
if storePath == "" {
storePath = "whatsapp"
}
@@ -74,7 +75,7 @@ func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, st
}
func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
logger.InfoCF("channels", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath})
logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath})
if err := os.MkdirAll(c.storePath, 0700); err != nil {
return fmt.Errorf("create session store dir: %w", err)
@@ -83,7 +84,6 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
dbPath := filepath.Join(c.storePath, whatsappDBName)
connStr := "file:" + dbPath + "?_foreign_keys=on"
// Open DB and enable foreign keys explicitly (modernc.org/sqlite does not set them from URI).
db, err := sql.Open(sqliteDriver, connStr)
if err != nil {
return fmt.Errorf("open whatsapp store: %w", err)
@@ -128,14 +128,14 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
}
for evt := range qrChan {
if evt.Event == "code" {
logger.InfoCF("channels", "Scan this QR code with WhatsApp (Linked Devices):", nil)
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
logger.InfoCF("channels", "WhatsApp login event", map[string]any{"event": evt.Event})
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
}
}
} else {
@@ -147,12 +147,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
c.runCtx, c.runCancel = context.WithCancel(ctx)
c.SetRunning(true)
logger.InfoCF("channels", "WhatsApp native channel connected", nil)
logger.InfoC("whatsapp", "WhatsApp native channel connected")
return nil
}
func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
logger.InfoCF("channels", "Stopping WhatsApp native channel", nil)
logger.InfoC("whatsapp", "Stopping WhatsApp native channel")
if c.runCancel != nil {
c.runCancel()
}
@@ -178,7 +178,7 @@ func (c *WhatsAppNativeChannel) eventHandler(evt interface{}) {
case *events.Message:
c.handleIncoming(evt.(*events.Message))
case *events.Disconnected:
logger.InfoCF("channels", "WhatsApp disconnected, will attempt reconnection", nil)
logger.InfoCF("whatsapp", "WhatsApp disconnected, will attempt reconnection", nil)
c.reconnectMu.Lock()
if c.reconnecting {
c.reconnectMu.Unlock()
@@ -212,14 +212,14 @@ func (c *WhatsAppNativeChannel) reconnectWithBackoff() {
return
}
logger.InfoCF("channels", "WhatsApp reconnecting", map[string]any{"backoff": backoff.String()})
logger.InfoCF("whatsapp", "WhatsApp reconnecting", map[string]any{"backoff": backoff.String()})
err := client.Connect()
if err == nil {
logger.InfoCF("channels", "WhatsApp reconnected", nil)
logger.InfoC("whatsapp", "WhatsApp reconnected")
return
}
logger.WarnCF("channels", "WhatsApp reconnect failed", map[string]any{"error": err.Error()})
logger.WarnCF("whatsapp", "WhatsApp reconnect failed", map[string]any{"error": err.Error()})
select {
case <-c.runCtx.Done():
@@ -248,11 +248,11 @@ func (c *WhatsAppNativeChannel) handleIncoming(evt *events.Message) {
}
content = utils.SanitizeMessageContent(content)
if content == "" { return } // ignore empty messages
if content == "" {
return
}
var mediaPaths []string
// Optional: resolve media to local paths if needed; for now we only forward text to the bus.
_ = mediaPaths
metadata := make(map[string]string)
metadata["message_id"] = evt.Info.ID
@@ -276,21 +276,34 @@ func (c *WhatsAppNativeChannel) handleIncoming(evt *events.Message) {
sender := bus.SenderInfo{
Platform: "whatsapp",
PlatformID: senderID,
CanonicalID: "whatsapp:" + senderID,
CanonicalID: identity.BuildCanonicalID("whatsapp", senderID),
DisplayName: evt.Info.PushName,
}
logger.DebugCF("channels", "WhatsApp message received", map[string]any{"sender_id": senderID, "content_preview": utils.Truncate(content, 50)})
if !c.IsAllowedSender(sender) {
return
}
logger.DebugCF("whatsapp", "WhatsApp message received", map[string]any{"sender_id": senderID, "content_preview": utils.Truncate(content, 50)})
c.HandleMessage(c.runCtx, peer, messageID, senderID, chatID, content, mediaPaths, metadata, sender)
}
func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.IsRunning() {
return channels.ErrNotRunning
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
c.mu.Lock()
client := c.client
c.mu.Unlock()
if client == nil || !client.IsConnected() {
return fmt.Errorf("whatsapp connection not established")
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
}
to, err := parseJID(msg.ChatID)
@@ -302,9 +315,8 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
Conversation: proto.String(msg.Content),
}
_, err = client.SendMessage(ctx, to, waMsg)
if err != nil {
return fmt.Errorf("send message: %w", err)
if _, err = client.SendMessage(ctx, to, waMsg); err != nil {
return fmt.Errorf("whatsapp send: %w", channels.ErrTemporary)
}
return nil
}
@@ -318,6 +330,5 @@ func parseJID(s string) (types.JID, error) {
if strings.Contains(s, "@") {
return types.ParseJID(s)
}
// Assume phone number for user chat.
return types.NewJID(s, types.DefaultUserServer), nil
}
@@ -1,16 +1,17 @@
//go:build !whatsapp_native
package channels
package whatsapp
import (
"fmt"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
// NewWhatsAppNativeChannel returns an error when the binary was not built with -tags whatsapp_native.
// Build with: go build -tags whatsapp_native ./cmd/...
func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, storePath string) (Channel, error) {
func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, storePath string) (channels.Channel, error) {
return nil, fmt.Errorf("whatsapp native not compiled in; build with -tags whatsapp_native")
}