Added a native WhatsApp channel implementation.

This commit is contained in:
Aditya Kalro
2026-02-22 12:29:27 -08:00
parent 779e4dfc38
commit c1ed163e77
13 changed files with 531 additions and 12 deletions
+31 -1
View File
@@ -209,8 +209,15 @@ func (al *AgentLoop) Run(ctx context.Context) error {
ChatID: msg.ChatID,
Content: response,
})
logger.InfoCF("agent", "Published outbound response",
map[string]any{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"content_len": len(response),
})
} else {
logger.DebugCF("agent", "Skipped outbound (message tool already sent)", map[string]any{"channel": msg.Channel})
}
}
}()
}
}
@@ -308,8 +315,14 @@ func (al *AgentLoop) ProcessDirectWithChannel(
// ProcessHeartbeat processes a heartbeat request without session history.
// Each heartbeat is independent and doesn't accumulate context.
// It uses the same mutex as processMessage so heartbeat and user messages never run concurrently.
func (al *AgentLoop) ProcessHeartbeat(ctx context.Context, content, channel, chatID string) (string, error) {
agent := al.registry.GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for heartbeat")
}
al.agentMu.Lock()
defer al.agentMu.Unlock()
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: "heartbeat",
Channel: channel,
@@ -362,6 +375,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
if !ok {
agent = al.registry.GetDefaultAgent()
}
if agent == nil {
return "", fmt.Errorf("no agent available for route (agent_id=%s)", route.AgentID)
}
// Reset message-tool state for this round so we don't skip publishing due to a previous round.
if tool, ok := agent.Tools.Get("message"); ok {
if mt, ok := tool.(tools.ContextualTool); ok {
mt.SetContext(msg.Channel, msg.ChatID)
}
}
// Use routed session key, but honor pre-set agent-scoped keys (for ProcessDirect/cron)
sessionKey := route.SessionKey
@@ -376,6 +399,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
"matched_by": route.MatchedBy,
})
al.agentMu.Lock()
defer al.agentMu.Unlock()
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: msg.Channel,
@@ -428,10 +453,15 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
// Use default agent for system messages
agent := al.registry.GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for system message")
}
// Use the origin session for context
sessionKey := routing.BuildAgentMainSessionKey(agent.ID)
al.agentMu.Lock()
defer al.agentMu.Unlock()
return al.runAgentLoop(ctx, agent, processOptions{
SessionKey: sessionKey,
Channel: originChannel,
+22 -2
View File
@@ -12,6 +12,7 @@ import (
"fmt"
"math"
"net/http"
"path/filepath"
"sync"
"time"
@@ -210,8 +211,27 @@ func (m *Manager) initChannels() error {
m.initChannel("telegram", "Telegram")
}
if m.config.Channels.WhatsApp.Enabled && m.config.Channels.WhatsApp.BridgeURL != "" {
m.initChannel("whatsapp", "WhatsApp")
if m.config.Channels.WhatsApp.Enabled {
waCfg := m.config.Channels.WhatsApp
useNative := waCfg.UseNative
if useNative {
logger.DebugC("channels", "Attempting to initialize WhatsApp native channel (whatsmeow)")
storePath := waCfg.SessionStorePath
if storePath == "" {
storePath = filepath.Join(m.config.WorkspacePath(), "whatsapp")
}
ch, err := NewWhatsAppNativeChannel(waCfg, m.bus, storePath)
if err != nil {
logger.ErrorCF("channels", "Failed to initialize WhatsApp native channel", map[string]any{
"error": err.Error(),
})
} else {
m.channels["whatsapp"] = ch
logger.InfoC("channels", "WhatsApp native channel enabled successfully")
}
} else if waCfg.BridgeURL != "" {
m.initChannel("whatsapp", "WhatsApp")
}
}
if m.config.Channels.Feishu.Enabled {
+235
View File
@@ -0,0 +1,235 @@
// PicoClaw - Ultra-lightweight personal AI agent
// License: MIT
//
// Copyright (c) 2026 PicoClaw contributors
package channels
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"github.com/mdp/qrterminal/v3"
_ "modernc.org/sqlite"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/store/sqlstore"
"go.mau.fi/whatsmeow/types/events"
waLog "go.mau.fi/whatsmeow/util/log"
"google.golang.org/protobuf/proto"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/utils"
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"
)
const (
sqliteDriver = "sqlite"
whatsappDBName = "store.db"
)
// WhatsAppNativeChannel implements the WhatsApp channel using whatsmeow (in-process, no external bridge).
type WhatsAppNativeChannel struct {
*BaseChannel
config config.WhatsAppConfig
storePath string
client *whatsmeow.Client
container *sqlstore.Container
mu sync.Mutex
}
// NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection.
// storePath is the directory for the SQLite session store (e.g. workspace/whatsapp).
func NewWhatsAppNativeChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus, storePath string) (*WhatsAppNativeChannel, error) {
base := NewBaseChannel("whatsapp", cfg, bus, cfg.AllowFrom)
if storePath == "" {
storePath = "whatsapp"
}
return &WhatsAppNativeChannel{
BaseChannel: base,
config: cfg,
storePath: storePath,
}, nil
}
func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
log.Printf("Starting WhatsApp native channel (whatsmeow), store: %s", c.storePath)
if err := os.MkdirAll(c.storePath, 0700); err != nil {
return fmt.Errorf("create session store dir: %w", err)
}
dbPath := filepath.Join(c.storePath, whatsappDBName)
connStr := "file:" + dbPath + "?_foreign_keys=on"
// Open DB and enable foreign keys explicitly (modernc.org/sqlite does not set them from URI).
db, err := sql.Open(sqliteDriver, connStr)
if err != nil {
return fmt.Errorf("open whatsapp store: %w", err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
if _, err = db.ExecContext(ctx, "PRAGMA foreign_keys = ON"); err != nil {
_ = db.Close()
return fmt.Errorf("enable foreign keys: %w", err)
}
waLogger := waLog.Stdout("WhatsApp", "WARN", true)
container := sqlstore.NewWithDB(db, sqliteDriver, waLogger)
if err = container.Upgrade(ctx); err != nil {
_ = db.Close()
return fmt.Errorf("open whatsapp store: %w", err)
}
deviceStore, err := container.GetFirstDevice(ctx)
if err != nil {
_ = container.Close()
return fmt.Errorf("get device store: %w", err)
}
client := whatsmeow.NewClient(deviceStore, waLogger)
client.AddEventHandler(c.eventHandler)
c.mu.Lock()
c.container = container
c.client = client
c.mu.Unlock()
if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(ctx)
if err != nil {
_ = container.Close()
return fmt.Errorf("get QR channel: %w", err)
}
if err := client.Connect(); err != nil {
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
for evt := range qrChan {
if evt.Event == "code" {
log.Println("Scan this QR code with WhatsApp (Linked Devices):")
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
log.Printf("WhatsApp login event: %s", evt.Event)
}
}
} else {
if err := client.Connect(); err != nil {
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
}
c.setRunning(true)
log.Println("WhatsApp native channel connected")
return nil
}
func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
log.Println("Stopping WhatsApp native channel...")
c.mu.Lock()
client := c.client
container := c.container
c.client = nil
c.container = nil
c.mu.Unlock()
if client != nil {
client.Disconnect()
}
if container != nil {
_ = container.Close()
}
c.setRunning(false)
return nil
}
func (c *WhatsAppNativeChannel) eventHandler(evt interface{}) {
switch v := evt.(type) {
case *events.Message:
c.handleIncoming(v)
}
}
func (c *WhatsAppNativeChannel) handleIncoming(evt *events.Message) {
if evt.Message == nil {
return
}
senderID := evt.Info.Sender.String()
chatID := evt.Info.Chat.String()
content := evt.Message.GetConversation()
if content == "" && evt.Message.ExtendedTextMessage != nil {
content = evt.Message.ExtendedTextMessage.GetText()
}
var mediaPaths []string
// Optional: resolve media to local paths if needed; for now we only forward text to the bus.
_ = mediaPaths
metadata := make(map[string]string)
metadata["message_id"] = evt.Info.ID
if evt.Info.PushName != "" {
metadata["user_name"] = evt.Info.PushName
}
if evt.Info.Chat.Server == types.GroupServer {
metadata["peer_kind"] = "group"
metadata["peer_id"] = chatID
} else {
metadata["peer_kind"] = "direct"
metadata["peer_id"] = senderID
}
log.Printf("WhatsApp message from %s: %s...", senderID, utils.Truncate(content, 50))
c.HandleMessage(senderID, chatID, content, mediaPaths, metadata)
}
func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
c.mu.Lock()
client := c.client
c.mu.Unlock()
if client == nil || !client.IsConnected() {
return fmt.Errorf("whatsapp connection not established")
}
to, err := parseJID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)
}
waMsg := &waE2E.Message{
Conversation: proto.String(msg.Content),
}
_, err = client.SendMessage(ctx, to, waMsg)
if err != nil {
return fmt.Errorf("send message: %w", err)
}
return nil
}
// parseJID converts a chat ID (phone number or JID string) to types.JID.
func parseJID(s string) (types.JID, error) {
s = strings.TrimSpace(s)
if s == "" {
return types.JID{}, fmt.Errorf("empty chat id")
}
if strings.Contains(s, "@") {
return types.ParseJID(s)
}
// Assume phone number for user chat.
return types.NewJID(s, types.DefaultUserServer), nil
}
+5 -3
View File
@@ -223,9 +223,11 @@ type PlaceholderConfig struct {
}
type WhatsAppConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_WHATSAPP_ENABLED"`
BridgeURL string `json:"bridge_url" env:"PICOCLAW_CHANNELS_WHATSAPP_BRIDGE_URL"`
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_WHATSAPP_ALLOW_FROM"`
Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_WHATSAPP_ENABLED"`
BridgeURL string `json:"bridge_url" env:"PICOCLAW_CHANNELS_WHATSAPP_BRIDGE_URL"`
UseNative bool `json:"use_native" env:"PICOCLAW_CHANNELS_WHATSAPP_USE_NATIVE"`
SessionStorePath string `json:"session_store_path" env:"PICOCLAW_CHANNELS_WHATSAPP_SESSION_STORE_PATH"`
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_WHATSAPP_ALLOW_FROM"`
}
type TelegramConfig struct {
+5 -3
View File
@@ -25,9 +25,11 @@ func DefaultConfig() *Config {
},
Channels: ChannelsConfig{
WhatsApp: WhatsAppConfig{
Enabled: false,
BridgeURL: "ws://localhost:3001",
AllowFrom: FlexibleStringSlice{},
Enabled: false,
BridgeURL: "ws://localhost:3001",
UseNative: false,
SessionStorePath: "",
AllowFrom: FlexibleStringSlice{},
},
Telegram: TelegramConfig{
Enabled: false,
+6
View File
@@ -165,6 +165,12 @@ func ConvertConfig(data map[string]any) (*config.Config, []string, error) {
if v, ok := getString(cMap, "bridge_url"); ok {
cfg.Channels.WhatsApp.BridgeURL = v
}
if v, ok := getBool(cMap, "use_native"); ok {
cfg.Channels.WhatsApp.UseNative = v
}
if v, ok := getString(cMap, "session_store_path"); ok {
cfg.Channels.WhatsApp.SessionStorePath = v
}
case "feishu":
cfg.Channels.Feishu.Enabled = enabled
cfg.Channels.Feishu.AllowFrom = allowFrom