rename processing to processor

This commit is contained in:
Joel Wetzell
2025-12-07 10:41:14 -06:00
parent 2e7feede28
commit 0bd43ca0c3
36 changed files with 159 additions and 115 deletions

View File

@@ -0,0 +1,31 @@
package processor
import (
"context"
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/config"
)
type DebugLog struct {
config config.ProcessorConfig
}
func (dl *DebugLog) Process(ctx context.Context, payload any) (any, error) {
slog.Debug("debug.log", "payload", payload, "payloadType", fmt.Sprintf("%T", payload))
return payload, nil
}
func (dl *DebugLog) Type() string {
return dl.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "debug.log",
New: func(config config.ProcessorConfig) (Processor, error) {
return &DebugLog{config: config}, nil
},
})
}

View File

@@ -0,0 +1,41 @@
package processor
import (
"context"
"fmt"
"strconv"
"github.com/jwetzell/showbridge-go/internal/config"
)
type FloatParse struct {
config config.ProcessorConfig
}
func (fp *FloatParse) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("float.parse processor only accepts a string")
}
// TODO(jwetzell): make bitSize configurable
payloadFloat, err := strconv.ParseFloat(payloadString, 64)
if err != nil {
return nil, err
}
return payloadFloat, nil
}
func (fp *FloatParse) Type() string {
return fp.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "float.parse",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FloatParse{config: config}, nil
},
})
}

View File

@@ -0,0 +1,346 @@
package processor
import (
"bytes"
"context"
"fmt"
"strconv"
"text/template"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type FreeDCreate struct {
config config.ProcessorConfig
Id *template.Template
Pan *template.Template
Tilt *template.Template
Roll *template.Template
PosX *template.Template
PosY *template.Template
PosZ *template.Template
Zoom *template.Template
Focus *template.Template
}
func (fc *FreeDCreate) Process(ctx context.Context, payload any) (any, error) {
var idBuffer bytes.Buffer
err := fc.Id.Execute(&idBuffer, payload)
if err != nil {
return nil, err
}
idString := idBuffer.String()
idNum, err := strconv.ParseUint(idString, 10, 8)
if err != nil {
return nil, err
}
var panBuffer bytes.Buffer
err = fc.Pan.Execute(&panBuffer, payload)
if err != nil {
return nil, err
}
panString := panBuffer.String()
panNum, err := strconv.ParseFloat(panString, 32)
var tiltBuffer bytes.Buffer
err = fc.Tilt.Execute(&tiltBuffer, payload)
if err != nil {
return nil, err
}
tiltString := tiltBuffer.String()
tiltNum, err := strconv.ParseFloat(tiltString, 32)
var rollBuffer bytes.Buffer
err = fc.Tilt.Execute(&rollBuffer, payload)
if err != nil {
return nil, err
}
rollString := rollBuffer.String()
rollNum, err := strconv.ParseFloat(rollString, 32)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
var posXBuffer bytes.Buffer
err = fc.PosX.Execute(&posXBuffer, payload)
if err != nil {
return nil, err
}
posXString := posXBuffer.String()
posXNum, err := strconv.ParseFloat(posXString, 32)
if err != nil {
return nil, err
}
var posYBuffer bytes.Buffer
err = fc.PosY.Execute(&posYBuffer, payload)
if err != nil {
return nil, err
}
posYString := posYBuffer.String()
posYNum, err := strconv.ParseFloat(posYString, 32)
if err != nil {
return nil, err
}
var posZBuffer bytes.Buffer
err = fc.PosZ.Execute(&posZBuffer, payload)
if err != nil {
return nil, err
}
posZString := posZBuffer.String()
posZNum, err := strconv.ParseFloat(posZString, 32)
if err != nil {
return nil, err
}
var zoomBuffer bytes.Buffer
err = fc.Zoom.Execute(&zoomBuffer, payload)
if err != nil {
return nil, err
}
zoomString := zoomBuffer.String()
zoomNum, err := strconv.ParseInt(zoomString, 10, 32)
if err != nil {
return nil, err
}
var focusBuffer bytes.Buffer
err = fc.Zoom.Execute(&focusBuffer, payload)
if err != nil {
return nil, err
}
focusString := focusBuffer.String()
focusNum, err := strconv.ParseInt(focusString, 10, 32)
if err != nil {
return nil, err
}
payloadMessage := freeD.FreeDPosition{
ID: uint8(idNum),
Pan: float32(panNum),
Tilt: float32(tiltNum),
Roll: float32(rollNum),
PosX: float32(posXNum),
PosY: float32(posYNum),
PosZ: float32(posZNum),
Zoom: int32(zoomNum),
Focus: int32(focusNum),
}
return payloadMessage, nil
}
func (fc *FreeDCreate) Type() string {
return fc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "freed.create",
New: func(config config.ProcessorConfig) (Processor, error) {
// TODO(jwetzell): make some params optional
params := config.Params
id, ok := params["id"]
if !ok {
return nil, fmt.Errorf("freed.create requires an id parameter")
}
idString, ok := id.(string)
if !ok {
return nil, fmt.Errorf("freed.create id must be a string")
}
idTemplate, err := template.New("id").Parse(idString)
if err != nil {
return nil, err
}
pan, ok := params["pan"]
if !ok {
return nil, fmt.Errorf("freed.create requires an pan parameter")
}
panString, ok := pan.(string)
if !ok {
return nil, fmt.Errorf("freed.create pan must be a string")
}
panTemplate, err := template.New("pan").Parse(panString)
tilt, ok := params["tilt"]
if !ok {
return nil, fmt.Errorf("freed.create requires an tilt parameter")
}
tiltString, ok := tilt.(string)
if !ok {
return nil, fmt.Errorf("freed.create tilt must be a string")
}
tiltTemplate, err := template.New("tilt").Parse(tiltString)
roll, ok := params["roll"]
if !ok {
return nil, fmt.Errorf("freed.create requires an roll parameter")
}
rollString, ok := roll.(string)
if !ok {
return nil, fmt.Errorf("freed.create roll must be a string")
}
rollTemplate, err := template.New("roll").Parse(rollString)
if err != nil {
return nil, err
}
posX, ok := params["posX"]
if !ok {
return nil, fmt.Errorf("freed.create requires a posX parameter")
}
posXString, ok := posX.(string)
if !ok {
return nil, fmt.Errorf("freed.create posX must be a string")
}
posXTemplate, err := template.New("posX").Parse(posXString)
if err != nil {
return nil, err
}
posY, ok := params["posY"]
if !ok {
return nil, fmt.Errorf("freed.create requires a posY parameter")
}
posYString, ok := posY.(string)
if !ok {
return nil, fmt.Errorf("freed.create posY must be a string")
}
posYTemplate, err := template.New("posY").Parse(posYString)
if err != nil {
return nil, err
}
posZ, ok := params["posZ"]
if !ok {
return nil, fmt.Errorf("freed.create requires a posZ parameter")
}
posZString, ok := posZ.(string)
if !ok {
return nil, fmt.Errorf("freed.create posZ must be a string")
}
posZTemplate, err := template.New("posZ").Parse(posZString)
if err != nil {
return nil, err
}
zoom, ok := params["zoom"]
if !ok {
return nil, fmt.Errorf("freed.create requires an zoom parameter")
}
zoomString, ok := zoom.(string)
if !ok {
return nil, fmt.Errorf("freed.create zoom must be a string")
}
zoomTemplate, err := template.New("zoom").Parse(zoomString)
focus, ok := params["focus"]
if !ok {
return nil, fmt.Errorf("freed.create requires an focus parameter")
}
focusString, ok := focus.(string)
if !ok {
return nil, fmt.Errorf("freed.create focus must be a string")
}
focusTemplate, err := template.New("focus").Parse(focusString)
return &FreeDCreate{
config: config,
Id: idTemplate,
Pan: panTemplate,
Tilt: tiltTemplate,
Roll: rollTemplate,
PosX: posXTemplate,
PosY: posYTemplate,
PosZ: posZTemplate,
Zoom: zoomTemplate,
Focus: focusTemplate,
}, nil
},
})
}

View File

@@ -0,0 +1,41 @@
package processor
import (
"context"
"fmt"
"log/slog"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type FreeDDecode struct {
config config.ProcessorConfig
}
func (fdd *FreeDDecode) Process(ctx context.Context, payload any) (any, error) {
payloadBytes, ok := payload.([]byte)
if !ok {
return nil, fmt.Errorf("freed.decode processor only accepts a []byte")
}
payloadMessage, err := freeD.Decode(payloadBytes)
if err != nil {
slog.Error("error decoding", "err", err)
}
return payloadMessage, nil
}
func (fdd *FreeDDecode) Type() string {
return fdd.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "freed.decode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FreeDDecode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,37 @@
package processor
import (
"context"
"fmt"
freeD "github.com/jwetzell/free-d-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type FreeDEncode struct {
config config.ProcessorConfig
}
func (fde *FreeDEncode) Process(ctx context.Context, payload any) (any, error) {
payloadPosition, ok := payload.(freeD.FreeDPosition)
if !ok {
return nil, fmt.Errorf("freed.decode processor only accepts a FreeDEncode")
}
payloadBytes := freeD.Encode(payloadPosition)
return payloadBytes, nil
}
func (fde *FreeDEncode) Type() string {
return fde.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "freed.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &FreeDEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,82 @@
package processor
import (
"bytes"
"context"
"fmt"
"net/http"
"text/template"
"github.com/jwetzell/showbridge-go/internal/config"
)
type HTTPRequestCreate struct {
config config.ProcessorConfig
Method string
URL *template.Template
}
func (hre *HTTPRequestCreate) Process(ctx context.Context, payload any) (any, error) {
var urlBuffer bytes.Buffer
err := hre.URL.Execute(&urlBuffer, payload)
if err != nil {
return nil, err
}
urlString := urlBuffer.String()
//TODO(jwetzell): support body
request, err := http.NewRequest(hre.Method, urlString, bytes.NewBuffer([]byte{}))
if err != nil {
return nil, err
}
return request, nil
}
func (hre *HTTPRequestCreate) Type() string {
return hre.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.request.create",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
method, ok := params["method"]
if !ok {
return nil, fmt.Errorf("http.request.create requires an method parameter")
}
methodString, ok := method.(string)
if !ok {
return nil, fmt.Errorf("http.request.create url must be a string")
}
url, ok := params["url"]
if !ok {
return nil, fmt.Errorf("http.request.create requires an url parameter")
}
urlString, ok := url.(string)
if !ok {
return nil, fmt.Errorf("http.request.create url must be a string")
}
urlTemplate, err := template.New("url").Parse(urlString)
if err != nil {
return nil, err
}
return &HTTPRequestCreate{config: config, URL: urlTemplate, Method: methodString}, nil
},
})
}

View File

@@ -0,0 +1,42 @@
package processor
import (
"context"
"fmt"
"io"
"net/http"
"github.com/jwetzell/showbridge-go/internal/config"
)
type HTTPRequestEncode struct {
config config.ProcessorConfig
}
func (hre *HTTPRequestEncode) Process(ctx context.Context, payload any) (any, error) {
payloadRequest, ok := payload.(*http.Request)
if !ok {
return nil, fmt.Errorf("http.request.encode processor only accepts an http.Request")
}
bytes, err := io.ReadAll(payloadRequest.Body)
if err != nil {
return nil, err
}
return bytes, nil
}
func (hre *HTTPRequestEncode) Type() string {
return hre.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.request.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &HTTPRequestEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,80 @@
package processor
import (
"context"
"fmt"
"net/http"
"regexp"
"github.com/jwetzell/showbridge-go/internal/config"
)
type HTTPRequestFilter struct {
config config.ProcessorConfig
Path *regexp.Regexp
Method string
}
func (hrf *HTTPRequestFilter) Process(ctx context.Context, payload any) (any, error) {
payloadRequest, ok := payload.(*http.Request)
if !ok {
return nil, fmt.Errorf("http.request.filter can only operate on http.Request payloads")
}
if hrf.Method != "" {
if payloadRequest.Method != hrf.Method {
return nil, nil
}
}
if !hrf.Path.MatchString(payloadRequest.URL.Path) {
return nil, nil
}
return payloadRequest, nil
}
func (hrf *HTTPRequestFilter) Type() string {
return hrf.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.request.filter",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
path, ok := params["path"]
if !ok {
return nil, fmt.Errorf("http.request.filter requires an path parameter")
}
pathString, ok := path.(string)
if !ok {
return nil, fmt.Errorf("http.request.filter path must be a string")
}
pathRegexp, err := regexp.Compile(fmt.Sprintf("^%s$", pathString))
if err != nil {
return nil, err
}
method, ok := params["method"]
if ok {
methodString, ok := method.(string)
if !ok {
return nil, fmt.Errorf("http.request.filter method must be a string")
}
return &HTTPRequestFilter{config: config, Path: pathRegexp, Method: methodString}, nil
}
return &HTTPRequestFilter{config: config, Path: pathRegexp}, nil
},
})
}

View File

@@ -0,0 +1,43 @@
package processor
import (
"context"
"fmt"
"io"
"net/http"
"github.com/jwetzell/showbridge-go/internal/config"
)
type HTTPResponseEncode struct {
config config.ProcessorConfig
}
func (hre *HTTPResponseEncode) Process(ctx context.Context, payload any) (any, error) {
payloadResponse, ok := payload.(*http.Response)
if !ok {
return nil, fmt.Errorf("http.response.encode processor only accepts an http.Response")
}
defer payloadResponse.Body.Close()
bytes, err := io.ReadAll(payloadResponse.Body)
if err != nil {
return nil, err
}
return bytes, nil
}
func (hre *HTTPResponseEncode) Type() string {
return hre.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "http.response.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &HTTPResponseEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,41 @@
package processor
import (
"context"
"fmt"
"strconv"
"github.com/jwetzell/showbridge-go/internal/config"
)
type IntParse struct {
config config.ProcessorConfig
}
func (ip *IntParse) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("int.parse processor only accepts a string")
}
// TODO(jwetzell): make base and bitSize configurable
payloadInt, err := strconv.ParseInt(payloadString, 10, 64)
if err != nil {
return nil, err
}
return payloadInt, nil
}
func (ip *IntParse) Type() string {
return ip.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "int.parse",
New: func(config config.ProcessorConfig) (Processor, error) {
return &IntParse{config: config}, nil
},
})
}

View File

@@ -0,0 +1,40 @@
//go:build cgo
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
type MIDIMessageDecode struct {
config config.ProcessorConfig
}
func (mmd *MIDIMessageDecode) Process(ctx context.Context, payload any) (any, error) {
payloadBytes, ok := payload.([]byte)
if !ok {
return nil, fmt.Errorf("midi.message.decode processor only accepts a []byte")
}
payloadMessage := midi.Message(payloadBytes)
return payloadMessage, nil
}
func (mmd *MIDIMessageDecode) Type() string {
return mmd.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.message.decode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MIDIMessageDecode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,38 @@
//go:build cgo
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
"gitlab.com/gomidi/midi/v2"
)
type MIDIMessageEncode struct {
config config.ProcessorConfig
}
func (mme *MIDIMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(midi.Message)
if !ok {
return nil, fmt.Errorf("midi.message.encode processor only accepts an midi.Message")
}
return payloadMessage.Bytes(), nil
}
func (mme *MIDIMessageEncode) Type() string {
return mme.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "midi.message.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MIDIMessageEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,92 @@
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
)
type MQTTMessage struct {
Topic string
QoS byte
Payload any
Retained bool
}
type MQTTMessageCreate struct {
config config.ProcessorConfig
Topic string
QoS byte
Retained bool
Payload any
}
func (mmc *MQTTMessageCreate) Process(ctx context.Context, payload any) (any, error) {
message := MQTTMessage{
Topic: mmc.Topic,
QoS: mmc.QoS,
Retained: mmc.Retained,
Payload: mmc.Payload,
}
return message, nil
}
func (mmc *MQTTMessageCreate) Type() string {
return mmc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "mqtt.message.create",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
topic, ok := params["topic"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an topic parameter")
}
topicString, ok := topic.(string)
if !ok {
return nil, fmt.Errorf("mqtt.message.create topic must be a string")
}
qos, ok := params["qos"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an qos parameter")
}
qosByte, ok := qos.(float64)
if !ok {
return nil, fmt.Errorf("mqtt.message.create qos must be a number")
}
retained, ok := params["retained"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an retained parameter")
}
retainedBool, ok := retained.(bool)
if !ok {
return nil, fmt.Errorf("mqtt.message.create retained must be a boolean")
}
//TODO(jwetzell): convert payload into []byte or string for sending
payload, ok := params["payload"]
if !ok {
return nil, fmt.Errorf("mqtt.message.create requires an payload parameter")
}
return &MQTTMessageCreate{config: config, Topic: topicString, QoS: byte(qosByte), Retained: retainedBool, Payload: payload}, nil
},
})
}

View File

@@ -0,0 +1,36 @@
package processor
import (
"context"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jwetzell/showbridge-go/internal/config"
)
type MQTTMessageEncode struct {
config config.ProcessorConfig
}
func (mme *MQTTMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(mqtt.Message)
if !ok {
return nil, fmt.Errorf("mqtt.message.encode processor only accepts an mqtt.Message")
}
return payloadMessage.Payload(), nil
}
func (mme *MQTTMessageEncode) Type() string {
return mme.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "mqtt.message.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &MQTTMessageEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,85 @@
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/jwetzell/showbridge-go/internal/config"
)
type NATSMessage struct {
Subject string
Payload []byte
}
type NATSMessageCreate struct {
config config.ProcessorConfig
Subject string
Payload *template.Template
}
func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) {
var payloadBuffer bytes.Buffer
err := nmc.Payload.Execute(&payloadBuffer, payload)
if err != nil {
return nil, err
}
payloadString := payloadBuffer.String()
message := NATSMessage{
Subject: nmc.Subject,
Payload: []byte(payloadString),
}
return message, nil
}
func (nmc *NATSMessageCreate) Type() string {
return nmc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.create",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
// TODO(jwetzell): support template for subject
subject, ok := params["subject"]
if !ok {
return nil, fmt.Errorf("nats.message.create requires a subject parameter")
}
subjectString, ok := subject.(string)
if !ok {
return nil, fmt.Errorf("nats.message.create subject must be a string")
}
payload, ok := params["payload"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires a payload parameter")
}
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create payload must be a string")
}
payloadTemplate, err := template.New("payload").Parse(payloadString)
if err != nil {
return nil, err
}
return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil
},
})
}

View File

@@ -0,0 +1,36 @@
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/nats-io/nats.go"
)
type NATSMessageEncode struct {
config config.ProcessorConfig
}
func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(*nats.Msg)
if !ok {
return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg")
}
return payloadMessage.Data, nil
}
func (nme *NATSMessageEncode) Type() string {
return nme.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &NATSMessageEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,219 @@
package processor
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"strconv"
"text/template"
"github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type OSCMessageCreate struct {
config config.ProcessorConfig
Address *template.Template
Args []*template.Template
Types string
}
func (o *OSCMessageCreate) Process(ctx context.Context, payload any) (any, error) {
var addressBuffer bytes.Buffer
err := o.Address.Execute(&addressBuffer, payload)
if err != nil {
return nil, err
}
addressString := addressBuffer.String()
if len(addressString) == 0 {
return nil, fmt.Errorf("osc.message.create address must not be empty")
}
if addressString[0] != '/' {
return nil, fmt.Errorf("osc.message.create address must start with '/'")
}
payloadMessage := osc.OSCMessage{
Address: addressString,
}
args := []osc.OSCArg{}
for argIndex, argTemplate := range o.Args {
var argBuffer bytes.Buffer
err := argTemplate.Execute(&argBuffer, payload)
if err != nil {
return nil, err
}
argString := argBuffer.String()
typedArg, err := argToTypedArg(argString, o.Types[argIndex])
if err != nil {
return nil, err
}
args = append(args, typedArg)
}
if len(args) > 0 {
payloadMessage.Args = args
}
return payloadMessage, nil
}
func (o *OSCMessageCreate) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.create",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
address, ok := params["address"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires an address parameter")
}
addressString, ok := address.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create address must be a string")
}
addressTemplate, err := template.New("address").Parse(addressString)
if err != nil {
return nil, err
}
args, ok := params["args"]
if ok {
rawArgs, ok := args.([]interface{})
if !ok {
return nil, fmt.Errorf("osc.message.create address must be an array found %T", args)
}
types, ok := params["types"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires a types parameter with args")
}
typesString, ok := types.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create types must be a string")
}
if len(rawArgs) != len(typesString) {
return nil, fmt.Errorf("osc.message.create args and types must be the same length")
}
argTemplates := []*template.Template{}
for _, rawArg := range rawArgs {
argString, ok := rawArg.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create arg must be a string")
}
argTemplate, err := template.New("arg").Parse(argString)
if err != nil {
return nil, err
}
argTemplates = append(argTemplates, argTemplate)
}
return &OSCMessageCreate{config: config, Address: addressTemplate, Args: argTemplates, Types: typesString}, nil
}
return &OSCMessageCreate{config: config, Address: addressTemplate}, nil
},
})
}
func argToTypedArg(rawArg string, oscType byte) (osc.OSCArg, error) {
switch oscType {
case 's':
return osc.OSCArg{
Value: rawArg,
Type: "s",
}, nil
case 'i':
number, err := strconv.ParseInt(rawArg, 10, 32)
if err != nil {
return osc.OSCArg{}, err
}
return osc.OSCArg{
Value: int32(number),
Type: "i",
}, nil
case 'f':
number, err := strconv.ParseFloat(rawArg, 32)
if err != nil {
return osc.OSCArg{}, err
}
return osc.OSCArg{
Value: float32(number),
Type: "f",
}, nil
case 'b':
data, err := hex.DecodeString(rawArg)
if err != nil {
return osc.OSCArg{}, err
}
return osc.OSCArg{
Value: data,
Type: "b",
}, nil
case 'h':
number, err := strconv.ParseInt(rawArg, 10, 64)
if err != nil {
return osc.OSCArg{}, err
}
return osc.OSCArg{
Value: int64(number),
Type: "h",
}, nil
case 'd':
number, err := strconv.ParseFloat(rawArg, 64)
if err != nil {
return osc.OSCArg{}, err
}
return osc.OSCArg{
Value: float64(number),
Type: "d",
}, nil
case 'T':
return osc.OSCArg{
Value: true,
Type: "T",
}, nil
case 'F':
return osc.OSCArg{
Value: false,
Type: "F",
}, nil
case 'N':
return osc.OSCArg{
Value: nil,
Type: "N",
}, nil
default:
return osc.OSCArg{}, fmt.Errorf("osc.message.create unhandled osc type: %c", oscType)
}
}

View File

@@ -0,0 +1,48 @@
package processor
import (
"context"
"fmt"
osc "github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type OSCMessageDecode struct {
config config.ProcessorConfig
}
func (o *OSCMessageDecode) Process(ctx context.Context, payload any) (any, error) {
payloadBytes, ok := payload.([]byte)
if !ok {
return nil, fmt.Errorf("osc.message.decode processor only accepts a []byte payload")
}
if len(payloadBytes) == 0 {
return nil, fmt.Errorf("osc.message.decode processor can't work on empty []byte")
}
if payloadBytes[0] != '/' {
return nil, fmt.Errorf("osc.message.decode processor needs an OSC looking []byte")
}
message, err := osc.MessageFromBytes(payloadBytes)
if err != nil {
return nil, err
}
return message, nil
}
func (o *OSCMessageDecode) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.decode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &OSCMessageDecode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,37 @@
package processor
import (
"context"
"fmt"
osc "github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type OSCMessageEncode struct {
config config.ProcessorConfig
}
func (o *OSCMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(osc.OSCMessage)
if !ok {
return nil, fmt.Errorf("osc.message.encode processor only accepts an OSCMessage")
}
bytes := payloadMessage.ToBytes()
return bytes, nil
}
func (o *OSCMessageEncode) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &OSCMessageEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,70 @@
package processor
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type OSCMessageFilter struct {
config config.ProcessorConfig
Address *regexp.Regexp
}
func (o *OSCMessageFilter) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(osc.OSCMessage)
if !ok {
return nil, fmt.Errorf("osc.message.filter can only operate on OSCMessage payloads")
}
if !o.Address.MatchString(payloadMessage.Address) {
return nil, nil
}
return payloadMessage, nil
}
func (o *OSCMessageFilter) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.filter",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
address, ok := params["address"]
if !ok {
return nil, fmt.Errorf("osc.message.filter requires an address parameter")
}
addressString, ok := address.(string)
if !ok {
return nil, fmt.Errorf("osc.message.filter address must be a string")
}
addressPattern := strings.ReplaceAll(addressString, "?", ".")
addressPattern = strings.ReplaceAll(addressPattern, "*", "[^/]*")
addressPattern = strings.ReplaceAll(addressPattern, "[!", "[^")
addressPattern = strings.ReplaceAll(addressPattern, "{", "(")
addressPattern = strings.ReplaceAll(addressPattern, "}", ")")
addressPattern = strings.ReplaceAll(addressPattern, ",", "|")
addressPatternRegexp, err := regexp.Compile(fmt.Sprintf("^%s$", addressPattern))
if err != nil {
return nil, err
}
return &OSCMessageFilter{config: config, Address: addressPatternRegexp}, nil
},
})
}

View File

@@ -0,0 +1,78 @@
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
osc "github.com/jwetzell/osc-go"
"github.com/jwetzell/showbridge-go/internal/config"
)
type OSCMessageTransform struct {
config config.ProcessorConfig
Address *template.Template
}
func (o *OSCMessageTransform) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(osc.OSCMessage)
if !ok {
return nil, fmt.Errorf("osc.message.transform processor only accepts an OSCMessage")
}
var addressBuffer bytes.Buffer
//TODO(jwetzell): actually inject data into template
err := o.Address.Execute(&addressBuffer, payloadMessage)
if err != nil {
return nil, err
}
addressString := addressBuffer.String()
if len(addressString) == 0 {
return nil, fmt.Errorf("osc.message.transform address must not be empty")
}
if addressString[0] != '/' {
return nil, fmt.Errorf("osc.message.transform address must start with '/'")
}
payloadMessage.Address = addressString
return payloadMessage, nil
}
func (o *OSCMessageTransform) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.transform",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
address, ok := params["address"]
if !ok {
return nil, fmt.Errorf("osc.message.transform requires an address parameter")
}
addressString, ok := address.(string)
if !ok {
return nil, fmt.Errorf("osc.message.transform address must be a string")
}
addressTemplate, err := template.New("address").Parse(addressString)
if err != nil {
return nil, err
}
return &OSCMessageTransform{config: config, Address: addressTemplate}, nil
},
})
}

View File

@@ -0,0 +1,42 @@
package processor
import (
"context"
"fmt"
"sync"
"github.com/jwetzell/showbridge-go/internal/config"
)
type Processor interface {
Type() string
Process(context.Context, any) (any, error)
}
type ProcessorRegistration struct {
Type string `json:"type"`
New func(config.ProcessorConfig) (Processor, error)
}
func RegisterProcessor(processor ProcessorRegistration) {
if processor.Type == "" {
panic("processor type is missing")
}
if processor.New == nil {
panic("missing ProcessorRegistration.New")
}
processorRegistryMu.Lock()
defer processorRegistryMu.Unlock()
if _, ok := ProcessorRegistry[string(processor.Type)]; ok {
panic(fmt.Sprintf("processor already registered: %s", processor.Type))
}
ProcessorRegistry[string(processor.Type)] = processor
}
var (
processorRegistryMu sync.RWMutex
ProcessorRegistry = make(map[string]ProcessorRegistration)
)

View File

@@ -0,0 +1,58 @@
package processor
import (
"context"
"fmt"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/jwetzell/showbridge-go/internal/config"
)
// NOTE(jwetzell): see language definition https://expr-lang.org/docs/language-definition
type ScriptExpr struct {
config config.ProcessorConfig
Program *vm.Program
}
func (se *ScriptExpr) Process(ctx context.Context, payload any) (any, error) {
output, err := expr.Run(se.Program, payload)
if err != nil {
return nil, err
}
return output, nil
}
func (se *ScriptExpr) Type() string {
return se.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "script.expr",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
expression, ok := params["expression"]
if !ok {
return nil, fmt.Errorf("script.expr requires an expression parameter")
}
expressionString, ok := expression.(string)
if !ok {
return nil, fmt.Errorf("script.expr expression must be a string")
}
program, err := expr.Compile(expressionString)
if err != nil {
return nil, err
}
return &ScriptExpr{config: config, Program: program}, nil
},
})
}

View File

@@ -0,0 +1,89 @@
package processor
import (
"context"
"encoding/json"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
"modernc.org/quickjs"
)
type ScriptJS struct {
config config.ProcessorConfig
Program string
}
func (sj *ScriptJS) Process(ctx context.Context, payload any) (any, error) {
vm, err := quickjs.NewVM()
if err != nil {
return nil, err
}
defer vm.Close()
payloadAtom, err := vm.NewAtom("payload")
if err != nil {
return nil, err
}
vm.SetProperty(vm.GlobalObject(), payloadAtom, payload)
_, err = vm.Eval(sj.Program, quickjs.EvalGlobal)
if err != nil {
return nil, err
}
output, err := vm.GetProperty(vm.GlobalObject(), payloadAtom)
if err != nil {
return nil, err
}
// NOTE(jwetzell): turn undefined into nil
_, ok := output.(quickjs.Undefined)
if ok {
return nil, nil
}
// NOTE(jwetzell): turn object into map[string]interface{}
outputObject, ok := output.(*quickjs.Object)
if ok {
var outputMap map[string]interface{}
err := json.Unmarshal([]byte(outputObject.String()), &outputMap)
return outputMap, err
}
return output, nil
}
func (sj *ScriptJS) Type() string {
return sj.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "script.js",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
program, ok := params["program"]
if !ok {
return nil, fmt.Errorf("script.js requires a program parameter")
}
programString, ok := program.(string)
if !ok {
return nil, fmt.Errorf("script.js program must be a string")
}
return &ScriptJS{config: config, Program: programString}, nil
},
})
}

View File

@@ -0,0 +1,59 @@
package processor
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/jwetzell/showbridge-go/internal/config"
)
type StringCreate struct {
config config.ProcessorConfig
Template *template.Template
}
func (sc *StringCreate) Process(ctx context.Context, payload any) (any, error) {
var templateBuffer bytes.Buffer
err := sc.Template.Execute(&templateBuffer, payload)
if err != nil {
return nil, err
}
payloadString := templateBuffer.String()
return payloadString, nil
}
func (sc *StringCreate) Type() string {
return sc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.create",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
tmpl, ok := params["template"]
if !ok {
return nil, fmt.Errorf("string.create requires a template parameter")
}
templateString, ok := tmpl.(string)
if !ok {
return nil, fmt.Errorf("string.create template must be a string")
}
templateTemplate, err := template.New("template").Parse(templateString)
if err != nil {
return nil, err
}
return &StringCreate{config: config, Template: templateTemplate}, nil
},
})
}

View File

@@ -0,0 +1,37 @@
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
)
type StringDecode struct {
config config.ProcessorConfig
}
func (sd *StringDecode) Process(ctx context.Context, payload any) (any, error) {
payloadBytes, ok := payload.([]byte)
if !ok {
return nil, fmt.Errorf("string.decode processor only accepts a []byte")
}
payloadMessage := string(payloadBytes)
return payloadMessage, nil
}
func (sd *StringDecode) Type() string {
return sd.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.decode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &StringDecode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,41 @@
package processor_test
import (
"testing"
"github.com/jwetzell/showbridge-go/internal/processor"
)
func TestGoodStringDecode(t *testing.T) {
stringDecoder := processor.StringDecode{}
tests := []struct {
processor processor.Processor
name string
payload any
expected string
}{
{
processor: &stringDecoder,
name: "hello",
payload: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f},
expected: "hello",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.processor.Process(t.Context(), test.payload)
gotString, ok := got.(string)
if !ok {
t.Errorf("string.decode returned a %T payload: %s", got, got)
}
if err != nil {
t.Errorf("string.decode failed: %s", err)
}
if gotString != test.expected {
t.Errorf("string.decode got %s, expected %s", got, test.expected)
}
})
}
}

View File

@@ -0,0 +1,37 @@
package processor
import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/config"
)
type StringEncode struct {
config config.ProcessorConfig
}
func (se *StringEncode) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("string.encode processor only accepts a string")
}
payloadBytes := []byte(payloadString)
return payloadBytes, nil
}
func (se *StringEncode) Type() string {
return se.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.encode",
New: func(config config.ProcessorConfig) (Processor, error) {
return &StringEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,42 @@
package processor_test
import (
"slices"
"testing"
"github.com/jwetzell/showbridge-go/internal/processor"
)
func TestGoodStringEncode(t *testing.T) {
stringEncoder := processor.StringEncode{}
tests := []struct {
processor processor.Processor
name string
payload any
expected []byte
}{
{
processor: &stringEncoder,
name: "hello",
payload: "hello",
expected: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.processor.Process(t.Context(), test.payload)
gotBytes, ok := got.([]byte)
if !ok {
t.Errorf("string.encode returned a %T payload: %s", got, got)
}
if err != nil {
t.Errorf("string.encode failed: %s", err)
}
if !slices.Equal(gotBytes, test.expected) {
t.Errorf("string.encode got %s, expected %s", got, test.expected)
}
})
}
}

View File

@@ -0,0 +1,61 @@
package processor
import (
"context"
"fmt"
"regexp"
"github.com/jwetzell/showbridge-go/internal/config"
)
type StringFilter struct {
config config.ProcessorConfig
Pattern *regexp.Regexp
}
func (se *StringFilter) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("string.filter processor only accepts a string")
}
if !se.Pattern.MatchString(payloadString) {
return nil, nil
}
return payloadString, nil
}
func (se *StringFilter) Type() string {
return se.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.filter",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
pattern, ok := params["pattern"]
if !ok {
return nil, fmt.Errorf("http.request.filter requires an pattern parameter")
}
patternString, ok := pattern.(string)
if !ok {
return nil, fmt.Errorf("http.request.filter pattern must be a string")
}
patternRegexp, err := regexp.Compile(fmt.Sprintf("^%s$", patternString))
if err != nil {
return nil, err
}
return &StringFilter{config: config, Pattern: patternRegexp}, nil
},
})
}

View File

@@ -0,0 +1,53 @@
package processor
import (
"context"
"fmt"
"strings"
"github.com/jwetzell/showbridge-go/internal/config"
)
type StringSplit struct {
config config.ProcessorConfig
Separator string
}
func (se *StringSplit) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("string.split only accepts a string")
}
payloadParts := strings.Split(payloadString, se.Separator)
return payloadParts, nil
}
func (se *StringSplit) Type() string {
return se.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.split",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
separator, ok := params["separator"]
if !ok {
return nil, fmt.Errorf("string.split requires a separator")
}
separatorString, ok := separator.(string)
if !ok {
return nil, fmt.Errorf("string.split separator must be a string")
}
return &StringSplit{config: config, Separator: separatorString}, nil
},
})
}

View File

@@ -0,0 +1,41 @@
package processor
import (
"context"
"fmt"
"strconv"
"github.com/jwetzell/showbridge-go/internal/config"
)
type UintParse struct {
config config.ProcessorConfig
}
func (up *UintParse) Process(ctx context.Context, payload any) (any, error) {
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("uint.parse processor only accepts a string")
}
// TODO(jwetzell): make base and bitSize configurable
payloadUint, err := strconv.ParseUint(payloadString, 10, 64)
if err != nil {
return nil, err
}
return payloadUint, nil
}
func (up *UintParse) Type() string {
return up.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "uint.parse",
New: func(config config.ProcessorConfig) (Processor, error) {
return &UintParse{config: config}, nil
},
})
}