package processor import ( "context" "errors" "fmt" "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" ) type MQTTMessage struct { topic string qos byte payload []byte retained bool } type MQTTMessageCreate struct { config config.ProcessorConfig Topic string QoS byte Retained bool Payload []byte } func NewMQTTMessage(topic string, qos byte, retained bool, payload []byte) MQTTMessage { return MQTTMessage{ topic: topic, qos: qos, retained: retained, payload: payload, } } func (mm MQTTMessage) Duplicate() bool { // TODO(jwetzell): implement? return false } func (mm MQTTMessage) Qos() byte { return mm.qos } func (mm MQTTMessage) Retained() bool { return mm.retained } func (mm MQTTMessage) Topic() string { return mm.topic } func (mm MQTTMessage) MessageID() uint16 { // TODO(jwetzell): implement? return 0 } func (mm MQTTMessage) Payload() []byte { return mm.payload } func (mm MQTTMessage) Ack() {} func (mmc *MQTTMessageCreate) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) { // TODO(jwetzell): support templating wrappedPayload.Payload = MQTTMessage{ topic: mmc.Topic, qos: mmc.QoS, retained: mmc.Retained, payload: mmc.Payload, } return wrappedPayload, nil } func (mmc *MQTTMessageCreate) Type() string { return mmc.config.Type } func init() { RegisterProcessor(ProcessorRegistration{ Type: "mqtt.message.create", New: func(processorConfig config.ProcessorConfig) (Processor, error) { params := processorConfig.Params topicString, err := params.GetString("topic") if err != nil { return nil, fmt.Errorf("mqtt.message.create topic error: %w", err) } qosByte, err := params.GetInt("qos") if err != nil { return nil, fmt.Errorf("mqtt.message.create qos error: %w", err) } retainedBool, err := params.GetBool("retained") if err != nil { return nil, fmt.Errorf("mqtt.message.create retained error: %w", err) } //TODO(jwetzell): convert payload into []byte or string for sending payloadString, err := params.GetString("payload") if err != nil { if errors.Is(err, config.ErrParamNotString) { payloadBytes, err := params.GetByteSlice("payload") if err != nil { return nil, fmt.Errorf("mqtt.message.create payload error: %w", err) } return &MQTTMessageCreate{config: processorConfig, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil } else { return nil, fmt.Errorf("mqtt.message.create payload error: %w", err) } } payloadBytes := []byte(payloadString) return &MQTTMessageCreate{config: processorConfig, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil }, }) }