Files
showbridge-go/internal/module/mqtt-client.go
2026-03-18 12:47:28 -05:00

132 lines
2.9 KiB
Go

package module
import (
"context"
"errors"
"fmt"
"log/slog"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
type MQTTClient struct {
config config.ModuleConfig
ctx context.Context
router common.RouteIO
Broker string
ClientID string
Topic string
client mqtt.Client
logger *slog.Logger
cancel context.CancelFunc
}
func init() {
RegisterModule(ModuleRegistration{
Type: "mqtt.client",
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
brokerString, err := params.GetString("broker")
if err != nil {
return nil, fmt.Errorf("mqtt.client broker error: %w", err)
}
topicString, err := params.GetString("topic")
if err != nil {
return nil, fmt.Errorf("mqtt.client topic error: %w", err)
}
clientIdString, err := params.GetString("clientId")
if err != nil {
return nil, fmt.Errorf("mqtt.client clientId error: %w", err)
}
return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString, logger: CreateLogger(config)}, nil
},
})
}
func (mc *MQTTClient) Id() string {
return mc.config.Id
}
func (mc *MQTTClient) Type() string {
return mc.config.Type
}
func (mc *MQTTClient) Start(ctx context.Context) error {
mc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("mqtt.client unable to get router from context")
}
mc.router = router
moduleContext, cancel := context.WithCancel(ctx)
mc.ctx = moduleContext
mc.cancel = cancel
opts := mqtt.NewClientOptions()
opts.AddBroker(mc.Broker)
opts.SetClientID(mc.ClientID)
opts.SetAutoReconnect(true)
opts.SetCleanSession(false)
opts.OnConnect = func(c mqtt.Client) {
token := mc.client.Subscribe(mc.Topic, 1, func(c mqtt.Client, m mqtt.Message) {
mc.router.HandleInput(mc.ctx, mc.Id(), m)
})
token.Wait()
}
mc.client = mqtt.NewClient(opts)
defer mc.client.Disconnect(250)
token := mc.client.Connect()
token.Wait()
err := token.Error()
if err != nil {
return err
}
<-mc.ctx.Done()
mc.logger.Debug("done")
return nil
}
func (mc *MQTTClient) Output(ctx context.Context, payload any) error {
payloadMessage, ok := common.GetAnyAs[mqtt.Message](payload)
if !ok {
return errors.New("mqtt.client is only able to output a MQTTMessage")
}
if mc.client == nil {
return errors.New("mqtt.client client is not setup")
}
if !mc.client.IsConnected() {
return errors.New("mqtt.client is not connected")
}
token := mc.client.Publish(payloadMessage.Topic(), payloadMessage.Qos(), payloadMessage.Retained(), payloadMessage.Payload())
token.Wait()
return token.Error()
}
func (mc *MQTTClient) Stop() {
mc.cancel()
}
func (mc *MQTTClient) Get(key string) (any, error) {
return nil, errors.New("mqtt.client does not support Get")
}