diff --git a/internal/processing/mqtt-message-create.go b/internal/processing/mqtt-message-create.go new file mode 100644 index 0000000..a7db56d --- /dev/null +++ b/internal/processing/mqtt-message-create.go @@ -0,0 +1,90 @@ +package processing + +import ( + "context" + "fmt" +) + +type MQTTMessage struct { + Topic string + QoS byte + Payload any + Retained bool +} + +type MQTTMessageCreate struct { + config ProcessorConfig + Topic string + QoS byte + Retained bool + Payload any +} + +func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) { + + message := MQTTMessage{ + Topic: mmc.Topic, + QoS: mmc.QoS, + Retained: mmc.Retained, + Payload: mmc.Payload, + } + + return message, nil +} + +func (mmc *MQTTMessageCreate) Type() string { + return mmc.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "mqtt.message.create", + New: func(config ProcessorConfig) (Processor, error) { + params := config.Params + topic, ok := params["topic"] + + if !ok { + return nil, fmt.Errorf("mqtt.message.create requires an topic parameter") + } + + topicString, ok := topic.(string) + + if !ok { + return nil, fmt.Errorf("mqtt.message.create topic must be a string") + } + + qos, ok := params["qos"] + + if !ok { + return nil, fmt.Errorf("mqtt.message.create requires an qos parameter") + } + + qosByte, ok := qos.(float64) + + if !ok { + return nil, fmt.Errorf("mqtt.message.create qos must be a number") + } + + retained, ok := params["retained"] + + if !ok { + return nil, fmt.Errorf("mqtt.message.create requires an retained parameter") + } + + retainedBool, ok := retained.(bool) + + if !ok { + return nil, fmt.Errorf("mqtt.message.create retained must be a boolean") + } + + //TODO(jwetzell): convert payload into []byte or string for sending + payload, ok := params["payload"] + + if !ok { + return nil, fmt.Errorf("mqtt.message.create requires an payload parameter") + } + + return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payload}, nil + }, + }) +} diff --git a/mqtt-client.go b/mqtt-client.go index bcd685a..122244d 100644 --- a/mqtt-client.go +++ b/mqtt-client.go @@ -5,6 +5,7 @@ import ( "log/slog" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/jwetzell/showbridge-go/internal/processing" ) type MQTTClient struct { @@ -104,5 +105,23 @@ func (mc *MQTTClient) Run() error { } func (mc *MQTTClient) Output(payload any) error { - return fmt.Errorf("net.mqtt.client output is not implemented") + payloadMessage, ok := payload.(processing.MQTTMessage) + + if !ok { + return fmt.Errorf("net.mqtt.client is only able to output MQTTMessage") + } + + if mc.client == nil { + return fmt.Errorf("net.mqtt.client client is not setup") + } + + if !mc.client.IsConnected() { + return fmt.Errorf("net.mqtt.client is not connected") + } + + token := mc.client.Publish(payloadMessage.Topic, payloadMessage.QoS, payloadMessage.Retained, payloadMessage.Payload) + + token.Wait() + + return token.Error() }