From 8ca105a0b6ff519e94f489705568b551feabd156 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Wed, 10 Dec 2025 07:31:18 -0600 Subject: [PATCH] implement mqtt.Message for internal MQTTMessage type --- internal/module/mqtt-client.go | 7 +-- internal/processor/mqtt-message-create.go | 60 +++++++++++++++++++---- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/internal/module/mqtt-client.go b/internal/module/mqtt-client.go index 26d4980..f7d1447 100644 --- a/internal/module/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -7,7 +7,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/processor" "github.com/jwetzell/showbridge-go/internal/route" ) @@ -105,7 +104,9 @@ func (mc *MQTTClient) Run() error { } func (mc *MQTTClient) Output(payload any) error { - payloadMessage, ok := payload.(processor.MQTTMessage) + payloadMessage, ok := payload.(mqtt.Message) + + fmt.Printf("payload type: %T\n", payload) if !ok { return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage") @@ -119,7 +120,7 @@ func (mc *MQTTClient) Output(payload any) error { return fmt.Errorf("net.mqtt.client is not connected") } - token := mc.client.Publish(payloadMessage.Topic, payloadMessage.QoS, payloadMessage.Retained, payloadMessage.Payload) + token := mc.client.Publish(payloadMessage.Topic(), payloadMessage.Qos(), payloadMessage.Retained(), payloadMessage.Payload()) token.Wait() diff --git a/internal/processor/mqtt-message-create.go b/internal/processor/mqtt-message-create.go index ef9996e..7c8b6cd 100644 --- a/internal/processor/mqtt-message-create.go +++ b/internal/processor/mqtt-message-create.go @@ -8,10 +8,10 @@ import ( ) type MQTTMessage struct { - Topic string - QoS byte - Payload any - Retained bool + topic string + qos byte + payload []byte + retained bool } type MQTTMessageCreate struct { @@ -19,16 +19,44 @@ type MQTTMessageCreate struct { Topic string QoS byte Retained bool - Payload any + Payload []byte } +func (mm MQTTMessage) Duplicate() bool { + // TODO(jwetzell): implement? + return false +} + +func (mm MQTTMessage) Qos() byte { + return mm.qos +} + +func (mm MQTTMessage) Retained() bool { + return mm.retained +} + +func (mm MQTTMessage) Topic() string { + return mm.topic +} + +func (mm MQTTMessage) MessageID() uint16 { + // TODO(jwetzell): implement? + return 0 +} + +func (mm MQTTMessage) Payload() []byte { + return mm.payload +} + +func (mm MQTTMessage) Ack() {} + func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) { message := MQTTMessage{ - Topic: mmc.Topic, - QoS: mmc.QoS, - Retained: mmc.Retained, - Payload: mmc.Payload, + topic: mmc.Topic, + qos: mmc.QoS, + retained: mmc.Retained, + payload: mmc.Payload, } return message, nil @@ -86,7 +114,19 @@ func init() { return nil, fmt.Errorf("mqtt.message.create requires an payload parameter") } - return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payload}, nil + if payloadBytes, ok := payload.([]byte); ok { + return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil + } + + payloadString, ok := payload.(string) + + if !ok { + return nil, fmt.Errorf("mqtt.message.create payload must be a string or byte array") + } + + payloadBytes := []byte(payloadString) + + return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil }, }) }