Merge pull request #2705 from hehaijunandhenry/main

add MQTT channel support
This commit is contained in:
美電球
2026-05-08 18:50:08 +08:00
committed by GitHub
33 changed files with 1951 additions and 13 deletions
+1
View File
@@ -1310,6 +1310,7 @@ make test # Full test suite
| `pkg/channels/whatsapp/` | `"whatsapp"` | — (Bridge mode) |
| `pkg/channels/whatsapp_native/` | `"whatsapp_native"` | — (Native whatsmeow mode) |
| `pkg/channels/maixcam/` | `"maixcam"` | — |
| `pkg/channels/mqtt/` | `"mqtt"` | — |
| `pkg/channels/pico/` | `"pico"` | TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |
### A.3 Interface Quick Reference
+1
View File
@@ -1308,6 +1308,7 @@ make test # 全量测试
| `pkg/channels/whatsapp/` | `"whatsapp"` | — (Bridge 模式) |
| `pkg/channels/whatsapp_native/` | `"whatsapp_native"` | — (原生 whatsmeow 模式) |
| `pkg/channels/maixcam/` | `"maixcam"` | — |
| `pkg/channels/mqtt/` | `"mqtt"` | — |
| `pkg/channels/pico/` | `"pico"` | TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |
### A.3 接口速查表
+2
View File
@@ -672,6 +672,8 @@ func (m *Manager) getChannelConfigAndEnabled(channelName string) (*config.Channe
return bc, settings.Token.String() != ""
case *config.VKSettings:
return bc, settings.GroupID != 0 && settings.Token.String() != ""
case *config.MQTTSettings:
return bc, settings.Broker != "" && settings.AgentID != ""
}
return bc, bc.Enabled
+5
View File
@@ -102,6 +102,11 @@ func hiddenValues(key string, value map[string]any, ch *config.Channel) {
}
}
value["webhooks"] = webhooks
case "mqtt":
if settings, ok := v.(*config.MQTTSettings); ok {
value["username"] = settings.Username.String()
value["password"] = settings.Password.String()
}
}
}
+16
View File
@@ -0,0 +1,16 @@
package mqtt
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterSafeFactory(
config.ChannelMQTT,
func(bc *config.Channel, cfg *config.MQTTSettings, b *bus.MessageBus) (channels.Channel, error) {
return NewMQTTChannel(bc, cfg, b)
},
)
}
+255
View File
@@ -0,0 +1,255 @@
package mqtt
import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
)
// mqttPayload is the JSON payload for both inbound and outbound messages.
type mqttPayload struct {
Text string `json:"text"`
}
// MQTTChannel implements the Channel interface for MQTT-based communication.
type MQTTChannel struct {
*channels.BaseChannel
bc *config.Channel
cfg *config.MQTTSettings
client pahomqtt.Client
qos byte
clientID string
}
// NewMQTTChannel creates a new MQTT channel instance.
func NewMQTTChannel(bc *config.Channel, cfg *config.MQTTSettings, b *bus.MessageBus) (*MQTTChannel, error) {
if cfg.Broker == "" {
return nil, fmt.Errorf("mqtt broker is required")
}
if cfg.AgentID == "" {
return nil, fmt.Errorf("mqtt agent_id is required")
}
base := channels.NewBaseChannel("mqtt", cfg, b, bc.AllowFrom,
channels.WithGroupTrigger(bc.GroupTrigger),
channels.WithReasoningChannelID(bc.ReasoningChannelID),
)
mqttClientID := cfg.ClientID
if mqttClientID == "" {
var suffix [4]byte
_, _ = rand.Read(suffix[:])
mqttClientID = fmt.Sprintf("picoclaw-mqtt-%s-%s", cfg.AgentID, hex.EncodeToString(suffix[:]))
}
return &MQTTChannel{
BaseChannel: base,
bc: bc,
cfg: cfg,
qos: byte(cfg.QoS),
clientID: mqttClientID,
}, nil
}
// Start connects to the MQTT broker and begins listening for inbound messages.
func (c *MQTTChannel) Start(ctx context.Context) error {
logger.InfoC("mqtt", "Starting MQTT channel")
keepAlive := c.cfg.KeepAlive
if keepAlive <= 0 {
keepAlive = 60
}
opts := pahomqtt.NewClientOptions()
opts.AddBroker(c.cfg.Broker)
opts.SetClientID(c.clientID)
opts.SetKeepAlive(time.Duration(keepAlive) * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) //nolint:gosec
if c.cfg.Username.String() != "" {
opts.SetUsername(c.cfg.Username.String())
opts.SetPassword(c.cfg.Password.String())
}
firstSubscribe := make(chan error, 1)
var once sync.Once
opts.SetOnConnectHandler(func(client pahomqtt.Client) {
logger.InfoC("mqtt", "MQTT connected, subscribing to inbound topic")
err := c.subscribe(client)
once.Do(func() { firstSubscribe <- err })
})
opts.SetConnectionLostHandler(func(_ pahomqtt.Client, err error) {
logger.WarnCF("mqtt", "MQTT connection lost", map[string]any{"error": err.Error()})
})
client := pahomqtt.NewClient(opts)
token := client.Connect()
if !token.WaitTimeout(10 * time.Second) {
client.Disconnect(250)
return fmt.Errorf("mqtt connect timed out after 10s (broker: %s)", c.cfg.Broker)
}
if err := token.Error(); err != nil {
client.Disconnect(250)
return fmt.Errorf("mqtt connect failed: %w", err)
}
if err := <-firstSubscribe; err != nil {
client.Disconnect(250)
return fmt.Errorf("mqtt subscribe failed: %w", err)
}
c.client = client
c.SetRunning(true)
logger.InfoCF("mqtt", "MQTT channel started", map[string]any{
"broker": c.cfg.Broker,
"agent_id": c.cfg.AgentID,
})
return nil
}
// topicPrefix returns the configured topic prefix, normalizing slashes.
// Trailing slashes are stripped; the result may or may not have a leading slash
// depending on what the user configured.
func (c *MQTTChannel) topicPrefix() string {
p := strings.TrimRight(c.cfg.TopicPrefix, "/")
if p == "" {
return "/picoclaw"
}
return p
}
// clientIDFromTopic extracts the client_id segment from a received topic.
// Topic structure: {prefix}/{agent_id}/{client_id}/request
func (c *MQTTChannel) clientIDFromTopic(topic string) (string, bool) {
prefix := c.topicPrefix()
// Build the expected fixed portion: {prefix}/{agent_id}/
fixed := prefix + "/" + c.cfg.AgentID + "/"
after, ok := strings.CutPrefix(topic, fixed)
if !ok {
return "", false
}
// after = "{client_id}/request"
slash := strings.IndexByte(after, '/')
if slash < 0 {
return "", false
}
return after[:slash], true
}
// subscribe subscribes to the inbound topic for this agent.
func (c *MQTTChannel) subscribe(client pahomqtt.Client) error {
topic := fmt.Sprintf("%s/%s/+/request", c.topicPrefix(), c.cfg.AgentID)
token := client.Subscribe(topic, c.qos, func(_ pahomqtt.Client, msg pahomqtt.Message) {
c.handleInbound(msg)
})
token.Wait()
if err := token.Error(); err != nil {
logger.ErrorCF("mqtt", "Failed to subscribe", map[string]any{
"topic": topic,
"error": err.Error(),
})
return err
}
logger.InfoCF("mqtt", "Subscribed to inbound topic", map[string]any{"topic": topic})
return nil
}
// handleInbound processes an inbound MQTT message.
func (c *MQTTChannel) handleInbound(msg pahomqtt.Message) {
topic := msg.Topic()
clientID, ok := c.clientIDFromTopic(topic)
if !ok {
logger.WarnCF("mqtt", "Unexpected topic format", map[string]any{"topic": topic})
return
}
chatID := "mqtt:" + clientID
var payload mqttPayload
if err := json.Unmarshal(msg.Payload(), &payload); err != nil {
logger.WarnCF("mqtt", "Failed to parse inbound payload", map[string]any{
"topic": topic,
"error": err.Error(),
})
return
}
if payload.Text == "" {
logger.WarnCF("mqtt", "Inbound payload missing text", map[string]any{"topic": topic})
return
}
inboundCtx := bus.InboundContext{
Channel: "mqtt",
ChatID: chatID,
ChatType: "direct",
SenderID: clientID,
}
c.HandleInboundContext(context.Background(), chatID, payload.Text, nil, inboundCtx)
}
// Stop disconnects from the MQTT broker.
func (c *MQTTChannel) Stop(_ context.Context) error {
logger.InfoC("mqtt", "Stopping MQTT channel")
c.SetRunning(false)
if c.client != nil {
c.client.Disconnect(500)
}
logger.InfoC("mqtt", "MQTT channel stopped")
return nil
}
// Send publishes a response to the client via MQTT.
func (c *MQTTChannel) Send(_ context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
if strings.TrimSpace(msg.Content) == "" {
return nil, nil
}
clientID := strings.TrimPrefix(msg.ChatID, "mqtt:")
if clientID == msg.ChatID {
logger.WarnCF("mqtt", "Send called with unexpected chatID format", map[string]any{"chat_id": msg.ChatID})
return nil, nil
}
topic := fmt.Sprintf("%s/%s/%s/response", c.topicPrefix(), c.cfg.AgentID, clientID)
data, err := json.Marshal(mqttPayload{Text: msg.Content})
if err != nil {
return nil, fmt.Errorf("mqtt: failed to marshal outbound payload: %w", err)
}
token := c.client.Publish(topic, c.qos, false, data)
token.Wait()
if err := token.Error(); err != nil {
return nil, fmt.Errorf("mqtt: publish failed: %w", err)
}
logger.DebugCF("mqtt", "Published response", map[string]any{"topic": topic})
return nil, nil
}
+11
View File
@@ -516,6 +516,17 @@ type TeamsWebhookTarget struct {
Title string `json:"title,omitempty" yaml:"-"`
}
type MQTTSettings struct {
Broker string `json:"broker" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_BROKER"`
AgentID string `json:"agent_id" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_AGENT_ID"`
TopicPrefix string `json:"topic_prefix,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_TOPIC_PREFIX"`
Username SecureString `json:"username,omitzero" yaml:"username,omitempty" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"`
Password SecureString `json:"password,omitzero" yaml:"password,omitempty" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"`
ClientID string `json:"client_id,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"`
KeepAlive int `json:"keep_alive,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_KEEP_ALIVE"`
QoS int `json:"qos,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_QOS"`
}
type HeartbeatConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"`
Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5
+2
View File
@@ -33,6 +33,7 @@ const (
ChannelWhatsApp = "whatsapp"
ChannelWhatsAppNative = "whatsapp_native"
ChannelTeamsWebHook = "teams_webhook"
ChannelMQTT = "mqtt"
)
func initChannel() {
@@ -640,6 +641,7 @@ var channelSettingsFactory = map[string]any{
ChannelWhatsApp: (WhatsAppSettings{}),
ChannelWhatsAppNative: (WhatsAppSettings{}),
ChannelTeamsWebHook: (TeamsWebhookSettings{}),
ChannelMQTT: (MQTTSettings{}),
}
// newChannelSettings creates a fresh zero-value pointer for the given channel type.
+1
View File
@@ -25,6 +25,7 @@ import (
_ "github.com/sipeed/picoclaw/pkg/channels/irc"
_ "github.com/sipeed/picoclaw/pkg/channels/line"
_ "github.com/sipeed/picoclaw/pkg/channels/maixcam"
_ "github.com/sipeed/picoclaw/pkg/channels/mqtt"
_ "github.com/sipeed/picoclaw/pkg/channels/onebot"
_ "github.com/sipeed/picoclaw/pkg/channels/pico"
_ "github.com/sipeed/picoclaw/pkg/channels/qq"