From f062cb41d70a5bfb05b06a5bbcc819b1bf129ca5 Mon Sep 17 00:00:00 2001 From: hehaijunandhenry Date: Fri, 8 May 2026 14:48:43 +0800 Subject: [PATCH] 1 --- pkg/channels/manager_channel.go | 5 +++++ pkg/channels/mqtt/mqtt.go | 21 +++++++++++++++++---- pkg/config/config.go | 16 ++++++++-------- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pkg/channels/manager_channel.go b/pkg/channels/manager_channel.go index 1f5978e7d..a5e9a49be 100644 --- a/pkg/channels/manager_channel.go +++ b/pkg/channels/manager_channel.go @@ -102,6 +102,11 @@ func hiddenValues(key string, value map[string]any, ch *config.Channel) { } } value["webhooks"] = webhooks + case "mqtt": + if settings, ok := v.(*config.MQTTSettings); ok { + value["username"] = settings.Username.String() + value["password"] = settings.Password.String() + } } } diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index d183fcc3e..39956ed9c 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" pahomqtt "github.com/eclipse/paho.mqtt.golang" @@ -86,9 +87,13 @@ func (c *MQTTChannel) Start(ctx context.Context) error { opts.SetPassword(c.cfg.Password.String()) } + firstSubscribe := make(chan error, 1) + var once sync.Once + opts.SetOnConnectHandler(func(client pahomqtt.Client) { logger.InfoC("mqtt", "MQTT connected, subscribing to inbound topic") - c.subscribe(client) + err := c.subscribe(client) + once.Do(func() { firstSubscribe <- err }) }) opts.SetConnectionLostHandler(func(_ pahomqtt.Client, err error) { @@ -98,12 +103,19 @@ func (c *MQTTChannel) Start(ctx context.Context) error { client := pahomqtt.NewClient(opts) token := client.Connect() if !token.WaitTimeout(10 * time.Second) { + client.Disconnect(250) return fmt.Errorf("mqtt connect timed out after 10s (broker: %s)", c.cfg.Broker) } if err := token.Error(); err != nil { + client.Disconnect(250) return fmt.Errorf("mqtt connect failed: %w", err) } + if err := <-firstSubscribe; err != nil { + client.Disconnect(250) + return fmt.Errorf("mqtt subscribe failed: %w", err) + } + c.client = client c.SetRunning(true) @@ -144,7 +156,7 @@ func (c *MQTTChannel) clientIDFromTopic(topic string) (string, bool) { } // subscribe subscribes to the inbound topic for this agent. -func (c *MQTTChannel) subscribe(client pahomqtt.Client) { +func (c *MQTTChannel) subscribe(client pahomqtt.Client) error { topic := fmt.Sprintf("%s/%s/+/request", c.topicPrefix(), c.cfg.AgentID) token := client.Subscribe(topic, c.qos, func(_ pahomqtt.Client, msg pahomqtt.Message) { c.handleInbound(msg) @@ -155,9 +167,10 @@ func (c *MQTTChannel) subscribe(client pahomqtt.Client) { "topic": topic, "error": err.Error(), }) - } else { - logger.InfoCF("mqtt", "Subscribed to inbound topic", map[string]any{"topic": topic}) + return err } + logger.InfoCF("mqtt", "Subscribed to inbound topic", map[string]any{"topic": topic}) + return nil } // handleInbound processes an inbound MQTT message. diff --git a/pkg/config/config.go b/pkg/config/config.go index c2f400e23..5757f53cf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -517,14 +517,14 @@ type TeamsWebhookTarget struct { } type MQTTSettings struct { - Broker string `json:"broker" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` - AgentID string `json:"agent_id" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_AGENT_ID"` - TopicPrefix string `json:"topic_prefix,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_TOPIC_PREFIX"` - Username SecureString `json:"username,omitzero" yaml:"username,omitempty" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` - Password SecureString `json:"password,omitzero" yaml:"password,omitempty" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` - ClientID string `json:"client_id,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` - KeepAlive int `json:"keep_alive,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_KEEP_ALIVE"` - QoS int `json:"qos,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_QOS"` + Broker string `json:"broker" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` + AgentID string `json:"agent_id" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_AGENT_ID"` + TopicPrefix string `json:"topic_prefix,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_TOPIC_PREFIX"` + Username SecureString `json:"username,omitzero" yaml:"username,omitempty" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` + Password SecureString `json:"password,omitzero" yaml:"password,omitempty" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` + ClientID string `json:"client_id,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` + KeepAlive int `json:"keep_alive,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_KEEP_ALIVE"` + QoS int `json:"qos,omitempty" yaml:"-" env:"PICOCLAW_CHANNELS_MQTT_QOS"` } type HeartbeatConfig struct {