mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-27 05:15:47 +00:00
implement mqtt.Message for internal MQTTMessage type
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
|||||||
|
|
||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/jwetzell/showbridge-go/internal/config"
|
"github.com/jwetzell/showbridge-go/internal/config"
|
||||||
"github.com/jwetzell/showbridge-go/internal/processor"
|
|
||||||
"github.com/jwetzell/showbridge-go/internal/route"
|
"github.com/jwetzell/showbridge-go/internal/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -105,7 +104,9 @@ func (mc *MQTTClient) Run() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MQTTClient) Output(payload any) error {
|
func (mc *MQTTClient) Output(payload any) error {
|
||||||
payloadMessage, ok := payload.(processor.MQTTMessage)
|
payloadMessage, ok := payload.(mqtt.Message)
|
||||||
|
|
||||||
|
fmt.Printf("payload type: %T\n", payload)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage")
|
return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage")
|
||||||
@@ -119,7 +120,7 @@ func (mc *MQTTClient) Output(payload any) error {
|
|||||||
return fmt.Errorf("net.mqtt.client is not connected")
|
return fmt.Errorf("net.mqtt.client is not connected")
|
||||||
}
|
}
|
||||||
|
|
||||||
token := mc.client.Publish(payloadMessage.Topic, payloadMessage.QoS, payloadMessage.Retained, payloadMessage.Payload)
|
token := mc.client.Publish(payloadMessage.Topic(), payloadMessage.Qos(), payloadMessage.Retained(), payloadMessage.Payload())
|
||||||
|
|
||||||
token.Wait()
|
token.Wait()
|
||||||
|
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MQTTMessage struct {
|
type MQTTMessage struct {
|
||||||
Topic string
|
topic string
|
||||||
QoS byte
|
qos byte
|
||||||
Payload any
|
payload []byte
|
||||||
Retained bool
|
retained bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type MQTTMessageCreate struct {
|
type MQTTMessageCreate struct {
|
||||||
@@ -19,16 +19,44 @@ type MQTTMessageCreate struct {
|
|||||||
Topic string
|
Topic string
|
||||||
QoS byte
|
QoS byte
|
||||||
Retained bool
|
Retained bool
|
||||||
Payload any
|
Payload []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Duplicate() bool {
|
||||||
|
// TODO(jwetzell): implement?
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Qos() byte {
|
||||||
|
return mm.qos
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Retained() bool {
|
||||||
|
return mm.retained
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Topic() string {
|
||||||
|
return mm.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) MessageID() uint16 {
|
||||||
|
// TODO(jwetzell): implement?
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Payload() []byte {
|
||||||
|
return mm.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mm MQTTMessage) Ack() {}
|
||||||
|
|
||||||
func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) {
|
func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) {
|
||||||
|
|
||||||
message := MQTTMessage{
|
message := MQTTMessage{
|
||||||
Topic: mmc.Topic,
|
topic: mmc.Topic,
|
||||||
QoS: mmc.QoS,
|
qos: mmc.QoS,
|
||||||
Retained: mmc.Retained,
|
retained: mmc.Retained,
|
||||||
Payload: mmc.Payload,
|
payload: mmc.Payload,
|
||||||
}
|
}
|
||||||
|
|
||||||
return message, nil
|
return message, nil
|
||||||
@@ -86,7 +114,19 @@ func init() {
|
|||||||
return nil, fmt.Errorf("mqtt.message.create requires an payload parameter")
|
return nil, fmt.Errorf("mqtt.message.create requires an payload parameter")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payload}, nil
|
if payloadBytes, ok := payload.([]byte); ok {
|
||||||
|
return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadString, ok := payload.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("mqtt.message.create payload must be a string or byte array")
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadBytes := []byte(payloadString)
|
||||||
|
|
||||||
|
return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user