package processor import ( "context" "errors" "fmt" "github.com/jwetzell/showbridge-go/internal/config" ) type MQTTMessage struct { topic string qos byte payload []byte retained bool } type MQTTMessageCreate struct { config config.ProcessorConfig Topic string QoS byte Retained bool Payload []byte } func NewMQTTMessage(topic string, qos byte, retained bool, payload []byte) MQTTMessage { return MQTTMessage{ topic: topic, qos: qos, retained: retained, payload: payload, } } func (mm MQTTMessage) Duplicate() bool { // TODO(jwetzell): implement? return false } func (mm MQTTMessage) Qos() byte { return mm.qos } func (mm MQTTMessage) Retained() bool { return mm.retained } func (mm MQTTMessage) Topic() string { return mm.topic } func (mm MQTTMessage) MessageID() uint16 { // TODO(jwetzell): implement? return 0 } func (mm MQTTMessage) Payload() []byte { return mm.payload } func (mm MQTTMessage) Ack() {} func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) { // TODO(jwetzell): support templating message := MQTTMessage{ topic: mmc.Topic, qos: mmc.QoS, retained: mmc.Retained, payload: mmc.Payload, } return message, nil } func (mmc *MQTTMessageCreate) Type() string { return mmc.config.Type } func init() { RegisterProcessor(ProcessorRegistration{ Type: "mqtt.message.create", New: func(config config.ProcessorConfig) (Processor, error) { params := config.Params topicString, err := params.GetString("topic") if err != nil { return nil, fmt.Errorf("mqtt.message.create topic error: %w", err) } qosByte, err := params.GetInt("qos") if err != nil { return nil, fmt.Errorf("mqtt.message.create qos error: %w", err) } retainedBool, err := params.GetBool("retained") if err != nil { return nil, fmt.Errorf("mqtt.message.create retained error: %w", err) } //TODO(jwetzell): convert payload into []byte or string for sending payload, ok := params["payload"] if !ok { return nil, errors.New("mqtt.message.create payload error: not found") } if payloadBytes, ok := GetAnyAs[[]byte](payload); ok { return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil } payloadString, ok := GetAnyAs[string](payload) if !ok { return nil, errors.New("mqtt.message.create payload error: not a string or byte array") } payloadBytes := []byte(payloadString) return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payloadBytes}, nil }, }) }