diff --git a/go.mod b/go.mod index 402afa7..ebb57b0 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/jwetzell/showbridge-go go 1.25.1 -require github.com/urfave/cli/v3 v3.4.1 +require ( + github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7 + github.com/urfave/cli/v3 v3.6.0 +) diff --git a/go.sum b/go.sum index e5cca28..313b4b1 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7 h1:vR4ooQd95vO8pdCugY0Kg7/MSKvuJc0pkHUZlLf6AtM= +github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7/go.mod h1:mkPoLU72wmg9Wq6mh5P5RjsWFXqaUqq4n64EWqg121A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM= -github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/urfave/cli/v3 v3.6.0 h1:oIdArVjkdIXHWg3iqxgmqwQGC8NM0JtdgwQAj2sRwFo= +github.com/urfave/cli/v3 v3.6.0/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internals/processing/osc-message-create.go b/internals/processing/osc-message-create.go new file mode 100644 index 0000000..492b248 --- /dev/null +++ b/internals/processing/osc-message-create.go @@ -0,0 +1,73 @@ +package processing + +import ( + "bytes" + "context" + "fmt" + "text/template" + + "github.com/jwetzell/osc-go" +) + +type OSCMessageCreate struct { + config ProcessorConfig + Address *template.Template +} + +func (o *OSCMessageCreate) Process(ctx context.Context, payload any) (any, error) { + + var addressBuffer bytes.Buffer + err := o.Address.Execute(&addressBuffer, payload) + + if err != nil { + return nil, err + } + + addressString := addressBuffer.String() + + if len(addressString) == 0 { + return nil, fmt.Errorf("osc.message.create address must not be empty") + } + + if addressString[0] != '/' { + return nil, fmt.Errorf("osc.message.create address must start with '/'") + } + + payloadMessage := osc.OSCMessage{ + Address: addressString, + } + + return payloadMessage, nil +} + +func (o *OSCMessageCreate) Type() string { + return o.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "osc.message.create", + New: func(config ProcessorConfig) (Processor, error) { + params := config.Params + address, ok := params["address"] + + if !ok { + return nil, fmt.Errorf("osc.message.create requires an address parameter") + } + + addressString, ok := address.(string) + + if !ok { + return nil, fmt.Errorf("osc.message.create address must be a string") + } + + addressTemplate, err := template.New("address").Parse(addressString) + + if err != nil { + return nil, err + } + + return &OSCMessageCreate{config: config, Address: addressTemplate}, nil + }, + }) +} diff --git a/internals/processing/osc-message-decode.go b/internals/processing/osc-message-decode.go new file mode 100644 index 0000000..7bdaf40 --- /dev/null +++ b/internals/processing/osc-message-decode.go @@ -0,0 +1,47 @@ +package processing + +import ( + "context" + "fmt" + + osc "github.com/jwetzell/osc-go" +) + +type OSCMessageDecode struct { + config ProcessorConfig +} + +func (o *OSCMessageDecode) Process(ctx context.Context, payload any) (any, error) { + payloadBytes, ok := payload.([]byte) + + if !ok { + return nil, fmt.Errorf("osc.message.decode processor only accepts a []byte payload") + } + + if len(payloadBytes) == 0 { + return nil, fmt.Errorf("osc.message.decode processor can't work on empty []byte") + } + + if payloadBytes[0] != '/' { + return nil, fmt.Errorf("osc.message.decode processor needs an OSC looking []byte") + } + + message, err := osc.MessageFromBytes(payloadBytes) + if err != nil { + return nil, err + } + return message, nil +} + +func (o *OSCMessageDecode) Type() string { + return o.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "osc.message.decode", + New: func(config ProcessorConfig) (Processor, error) { + return &OSCMessageDecode{config: config}, nil + }, + }) +} diff --git a/internals/processing/osc-message-encode.go b/internals/processing/osc-message-encode.go new file mode 100644 index 0000000..76390a5 --- /dev/null +++ b/internals/processing/osc-message-encode.go @@ -0,0 +1,36 @@ +package processing + +import ( + "context" + "fmt" + + osc "github.com/jwetzell/osc-go" +) + +type OSCMessageEncode struct { + config ProcessorConfig +} + +func (o *OSCMessageEncode) Process(ctx context.Context, payload any) (any, error) { + payloadMessage, ok := payload.(osc.OSCMessage) + + if !ok { + return nil, fmt.Errorf("osc.message.encode processor only accepts an OSCMessage") + } + + bytes := payloadMessage.ToBytes() + return bytes, nil +} + +func (o *OSCMessageEncode) Type() string { + return o.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "osc.message.encode", + New: func(config ProcessorConfig) (Processor, error) { + return &OSCMessageEncode{config: config}, nil + }, + }) +} diff --git a/internals/processing/osc-message-transform.go b/internals/processing/osc-message-transform.go new file mode 100644 index 0000000..db8f746 --- /dev/null +++ b/internals/processing/osc-message-transform.go @@ -0,0 +1,77 @@ +package processing + +import ( + "bytes" + "context" + "fmt" + "text/template" + + osc "github.com/jwetzell/osc-go" +) + +type OSCMessageTransform struct { + config ProcessorConfig + Address *template.Template +} + +func (o *OSCMessageTransform) Process(ctx context.Context, payload any) (any, error) { + payloadMessage, ok := payload.(osc.OSCMessage) + + if !ok { + return nil, fmt.Errorf("osc.message.transform processor only accepts an OSCMessage") + } + + var addressBuffer bytes.Buffer + //TODO(jwetzell): actually inject data into template + err := o.Address.Execute(&addressBuffer, payloadMessage) + + if err != nil { + return nil, err + } + + addressString := addressBuffer.String() + + if len(addressString) == 0 { + return nil, fmt.Errorf("osc.message.transform address must not be empty") + } + + if addressString[0] != '/' { + return nil, fmt.Errorf("osc.message.transform address must start with '/'") + } + + payloadMessage.Address = addressString + + return payloadMessage, nil +} + +func (o *OSCMessageTransform) Type() string { + return o.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "osc.message.transform", + New: func(config ProcessorConfig) (Processor, error) { + params := config.Params + address, ok := params["address"] + + if !ok { + return nil, fmt.Errorf("osc.message.transform requires an address parameter") + } + + addressString, ok := address.(string) + + if !ok { + return nil, fmt.Errorf("osc.message.transform address must be a string") + } + + addressTemplate, err := template.New("address").Parse(addressString) + + if err != nil { + return nil, err + } + + return &OSCMessageTransform{config: config, Address: addressTemplate}, nil + }, + }) +} diff --git a/internals/processing/processor.go b/internals/processing/processor.go new file mode 100644 index 0000000..16e362f --- /dev/null +++ b/internals/processing/processor.go @@ -0,0 +1,45 @@ +package processing + +import ( + "context" + "fmt" + "sync" +) + +type Processor interface { + Type() string + Process(context.Context, any) (any, error) +} + +type ProcessorConfig struct { + Type string `json:"type"` + Params map[string]any `json:"params"` +} + +type ProcessorRegistration struct { + Type string `json:"type"` + New func(ProcessorConfig) (Processor, error) +} + +func RegisterProcessor(processor ProcessorRegistration) { + + if processor.Type == "" { + panic("processor type is missing") + } + if processor.New == nil { + panic("missing ProcessorRegistration.New") + } + + processorRegistryMu.Lock() + defer processorRegistryMu.Unlock() + + if _, ok := ProcessorRegistry[string(processor.Type)]; ok { + panic(fmt.Sprintf("processor already registered: %s", processor.Type)) + } + ProcessorRegistry[string(processor.Type)] = processor +} + +var ( + processorRegistryMu sync.RWMutex + ProcessorRegistry = make(map[string]ProcessorRegistration) +) diff --git a/route.go b/route.go index b0427d5..7caea6e 100644 --- a/route.go +++ b/route.go @@ -1,32 +1,62 @@ package showbridge -import "log/slog" +import ( + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internals/processing" +) type Route struct { - index int - Input string - Output string - router *Router + index int + Input string + Processors []processing.Processor + Output string + router *Router } type RouteConfig struct { - Input string `json:"input"` - Output string `json:"output"` + Input string `json:"input"` + Processors []processing.ProcessorConfig `json:"processors"` + Output string `json:"output"` } -func NewRoute(index int, config RouteConfig, router *Router) *Route { - return &Route{Input: config.Input, Output: config.Output, router: router, index: index} -} +func NewRoute(index int, config RouteConfig, router *Router) (*Route, error) { + processors := []processing.Processor{} -func (r *Route) HandleInput(sourceId string, payload any) { - slog.Debug("route input", "index", r.index, "source", sourceId, "payload", payload) - r.HandleOutput(payload) -} + if len(config.Processors) > 0 { + for _, processorDecl := range config.Processors { + processorInfo, ok := processing.ProcessorRegistry[processorDecl.Type] + if !ok { + return nil, fmt.Errorf("problem loading processor registration for processor type: %s", processorDecl.Type) + } -func (r *Route) HandleOutput(payload any) { - slog.Debug("route output", "index", r.index, "destination", r.Output, "payload", payload) - err := r.router.HandleOutput(r.Output, payload) - if err != nil { - slog.Error("problem with route output", "error", err.Error()) + processor, err := processorInfo.New(processorDecl) + if err != nil { + return nil, err + } + processors = append(processors, processor) + } } + + return &Route{Input: config.Input, Processors: processors, Output: config.Output, router: router, index: index}, nil +} + +func (r *Route) HandleInput(sourceId string, payload any) error { + slog.Debug("route input", "index", r.index, "source", sourceId, "payload", payload) + slog.Debug("route processing", "processorCount", len(r.Processors)) + + var err error + for _, processor := range r.Processors { + payload, err = processor.Process(r.router.Context, payload) + if err != nil { + return err + } + } + return r.HandleOutput(payload) +} + +func (r *Route) HandleOutput(payload any) error { + slog.Debug("route output", "index", r.index, "destination", r.Output, "payload", payload) + return r.router.HandleOutput(r.Output, payload) } diff --git a/router.go b/router.go index c03f71c..067782c 100644 --- a/router.go +++ b/router.go @@ -57,7 +57,12 @@ func NewRouter(ctx context.Context, config Config) (*Router, error) { } for routeIndex, routeDecl := range config.Routes { - router.RouteInstances = append(router.RouteInstances, NewRoute(routeIndex, routeDecl, &router)) + route, err := NewRoute(routeIndex, routeDecl, &router) + if err != nil { + slog.Error("problem creating route", "index", routeIndex, "error", err.Error()) + continue + } + router.RouteInstances = append(router.RouteInstances, route) } for _, moduleInstance := range router.ModuleInstances { @@ -75,9 +80,12 @@ func (r *Router) Run() { } func (r *Router) HandleInput(sourceId string, payload any) { - for _, route := range r.RouteInstances { + for routeIndex, route := range r.RouteInstances { if route.Input == sourceId { - route.HandleInput(sourceId, payload) + err := route.HandleInput(sourceId, payload) + if err != nil { + slog.Error("router unable to route input", "route", routeIndex, "source", sourceId, "error", err) + } } } }