add output for mqtt client

This commit is contained in:
Joel Wetzell
2025-11-22 22:46:00 -06:00
parent 532a37816f
commit 156c6fbe5b
2 changed files with 110 additions and 1 deletions

View File

@@ -0,0 +1,90 @@
package processing
import (
"context"
"fmt"
)
type MQTTMessage struct {
Topic string
QoS byte
Payload any
Retained bool
}
type MQTTMessageCreate struct {
config ProcessorConfig
Topic string
QoS byte
Retained bool
Payload any
}
func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) {
message := MQTTMessage{
Topic: mmc.Topic,
QoS: mmc.QoS,
Retained: mmc.Retained,
Payload: mmc.Payload,
}
return message, nil
}
func (mmc *MQTTMessageCreate) Type() string {
return mmc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "mqtt.message.create",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
topic, ok := params["topic"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an topic parameter")
}
topicString, ok := topic.(string)
if !ok {
return nil, fmt.Errorf("mqtt.message.create topic must be a string")
}
qos, ok := params["qos"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an qos parameter")
}
qosByte, ok := qos.(float64)
if !ok {
return nil, fmt.Errorf("mqtt.message.create qos must be a number")
}
retained, ok := params["retained"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an retained parameter")
}
retainedBool, ok := retained.(bool)
if !ok {
return nil, fmt.Errorf("mqtt.message.create retained must be a boolean")
}
//TODO(jwetzell): convert payload into []byte or string for sending
payload, ok := params["payload"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an payload parameter")
}
return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payload}, nil
},
})
}

View File

@@ -5,6 +5,7 @@ import (
"log/slog" "log/slog"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jwetzell/showbridge-go/internal/processing"
) )
type MQTTClient struct { type MQTTClient struct {
@@ -104,5 +105,23 @@ func (mc *MQTTClient) Run() error {
} }
func (mc *MQTTClient) Output(payload any) error { func (mc *MQTTClient) Output(payload any) error {
return fmt.Errorf("net.mqtt.client output is not implemented") payloadMessage, ok := payload.(processing.MQTTMessage)
if !ok {
return fmt.Errorf("net.mqtt.client is only able to output MQTTMessage")
}
if mc.client == nil {
return fmt.Errorf("net.mqtt.client client is not setup")
}
if !mc.client.IsConnected() {
return fmt.Errorf("net.mqtt.client is not connected")
}
token := mc.client.Publish(payloadMessage.Topic, payloadMessage.QoS, payloadMessage.Retained, payloadMessage.Payload)
token.Wait()
return token.Error()
} }