add the concept of processors

This commit is contained in:
Joel Wetzell
2025-11-20 22:15:46 -06:00
parent 454dc58523
commit f28a9fac99
9 changed files with 348 additions and 27 deletions

5
go.mod
View File

@@ -2,4 +2,7 @@ module github.com/jwetzell/showbridge-go
go 1.25.1 go 1.25.1
require github.com/urfave/cli/v3 v3.4.1 require (
github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7
github.com/urfave/cli/v3 v3.6.0
)

10
go.sum
View File

@@ -1,10 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7 h1:vR4ooQd95vO8pdCugY0Kg7/MSKvuJc0pkHUZlLf6AtM=
github.com/jwetzell/osc-go v0.0.0-20251114203632-24077a77d6c7/go.mod h1:mkPoLU72wmg9Wq6mh5P5RjsWFXqaUqq4n64EWqg121A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM= github.com/urfave/cli/v3 v3.6.0 h1:oIdArVjkdIXHWg3iqxgmqwQGC8NM0JtdgwQAj2sRwFo=
github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= github.com/urfave/cli/v3 v3.6.0/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,73 @@
package processing
import (
"bytes"
"context"
"fmt"
"text/template"
"github.com/jwetzell/osc-go"
)
type OSCMessageCreate struct {
config ProcessorConfig
Address *template.Template
}
func (o *OSCMessageCreate) Process(ctx context.Context, payload any) (any, error) {
var addressBuffer bytes.Buffer
err := o.Address.Execute(&addressBuffer, payload)
if err != nil {
return nil, err
}
addressString := addressBuffer.String()
if len(addressString) == 0 {
return nil, fmt.Errorf("osc.message.create address must not be empty")
}
if addressString[0] != '/' {
return nil, fmt.Errorf("osc.message.create address must start with '/'")
}
payloadMessage := osc.OSCMessage{
Address: addressString,
}
return payloadMessage, nil
}
func (o *OSCMessageCreate) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.create",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
address, ok := params["address"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires an address parameter")
}
addressString, ok := address.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create address must be a string")
}
addressTemplate, err := template.New("address").Parse(addressString)
if err != nil {
return nil, err
}
return &OSCMessageCreate{config: config, Address: addressTemplate}, nil
},
})
}

View File

@@ -0,0 +1,47 @@
package processing
import (
"context"
"fmt"
osc "github.com/jwetzell/osc-go"
)
type OSCMessageDecode struct {
config ProcessorConfig
}
func (o *OSCMessageDecode) Process(ctx context.Context, payload any) (any, error) {
payloadBytes, ok := payload.([]byte)
if !ok {
return nil, fmt.Errorf("osc.message.decode processor only accepts a []byte payload")
}
if len(payloadBytes) == 0 {
return nil, fmt.Errorf("osc.message.decode processor can't work on empty []byte")
}
if payloadBytes[0] != '/' {
return nil, fmt.Errorf("osc.message.decode processor needs an OSC looking []byte")
}
message, err := osc.MessageFromBytes(payloadBytes)
if err != nil {
return nil, err
}
return message, nil
}
func (o *OSCMessageDecode) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.decode",
New: func(config ProcessorConfig) (Processor, error) {
return &OSCMessageDecode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,36 @@
package processing
import (
"context"
"fmt"
osc "github.com/jwetzell/osc-go"
)
type OSCMessageEncode struct {
config ProcessorConfig
}
func (o *OSCMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(osc.OSCMessage)
if !ok {
return nil, fmt.Errorf("osc.message.encode processor only accepts an OSCMessage")
}
bytes := payloadMessage.ToBytes()
return bytes, nil
}
func (o *OSCMessageEncode) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.encode",
New: func(config ProcessorConfig) (Processor, error) {
return &OSCMessageEncode{config: config}, nil
},
})
}

View File

@@ -0,0 +1,77 @@
package processing
import (
"bytes"
"context"
"fmt"
"text/template"
osc "github.com/jwetzell/osc-go"
)
type OSCMessageTransform struct {
config ProcessorConfig
Address *template.Template
}
func (o *OSCMessageTransform) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(osc.OSCMessage)
if !ok {
return nil, fmt.Errorf("osc.message.transform processor only accepts an OSCMessage")
}
var addressBuffer bytes.Buffer
//TODO(jwetzell): actually inject data into template
err := o.Address.Execute(&addressBuffer, payloadMessage)
if err != nil {
return nil, err
}
addressString := addressBuffer.String()
if len(addressString) == 0 {
return nil, fmt.Errorf("osc.message.transform address must not be empty")
}
if addressString[0] != '/' {
return nil, fmt.Errorf("osc.message.transform address must start with '/'")
}
payloadMessage.Address = addressString
return payloadMessage, nil
}
func (o *OSCMessageTransform) Type() string {
return o.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "osc.message.transform",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
address, ok := params["address"]
if !ok {
return nil, fmt.Errorf("osc.message.transform requires an address parameter")
}
addressString, ok := address.(string)
if !ok {
return nil, fmt.Errorf("osc.message.transform address must be a string")
}
addressTemplate, err := template.New("address").Parse(addressString)
if err != nil {
return nil, err
}
return &OSCMessageTransform{config: config, Address: addressTemplate}, nil
},
})
}

View File

@@ -0,0 +1,45 @@
package processing
import (
"context"
"fmt"
"sync"
)
type Processor interface {
Type() string
Process(context.Context, any) (any, error)
}
type ProcessorConfig struct {
Type string `json:"type"`
Params map[string]any `json:"params"`
}
type ProcessorRegistration struct {
Type string `json:"type"`
New func(ProcessorConfig) (Processor, error)
}
func RegisterProcessor(processor ProcessorRegistration) {
if processor.Type == "" {
panic("processor type is missing")
}
if processor.New == nil {
panic("missing ProcessorRegistration.New")
}
processorRegistryMu.Lock()
defer processorRegistryMu.Unlock()
if _, ok := ProcessorRegistry[string(processor.Type)]; ok {
panic(fmt.Sprintf("processor already registered: %s", processor.Type))
}
ProcessorRegistry[string(processor.Type)] = processor
}
var (
processorRegistryMu sync.RWMutex
ProcessorRegistry = make(map[string]ProcessorRegistration)
)

View File

@@ -1,32 +1,62 @@
package showbridge package showbridge
import "log/slog" import (
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internals/processing"
)
type Route struct { type Route struct {
index int index int
Input string Input string
Output string Processors []processing.Processor
router *Router Output string
router *Router
} }
type RouteConfig struct { type RouteConfig struct {
Input string `json:"input"` Input string `json:"input"`
Output string `json:"output"` Processors []processing.ProcessorConfig `json:"processors"`
Output string `json:"output"`
} }
func NewRoute(index int, config RouteConfig, router *Router) *Route { func NewRoute(index int, config RouteConfig, router *Router) (*Route, error) {
return &Route{Input: config.Input, Output: config.Output, router: router, index: index} processors := []processing.Processor{}
}
func (r *Route) HandleInput(sourceId string, payload any) { if len(config.Processors) > 0 {
slog.Debug("route input", "index", r.index, "source", sourceId, "payload", payload) for _, processorDecl := range config.Processors {
r.HandleOutput(payload) processorInfo, ok := processing.ProcessorRegistry[processorDecl.Type]
} if !ok {
return nil, fmt.Errorf("problem loading processor registration for processor type: %s", processorDecl.Type)
}
func (r *Route) HandleOutput(payload any) { processor, err := processorInfo.New(processorDecl)
slog.Debug("route output", "index", r.index, "destination", r.Output, "payload", payload) if err != nil {
err := r.router.HandleOutput(r.Output, payload) return nil, err
if err != nil { }
slog.Error("problem with route output", "error", err.Error()) processors = append(processors, processor)
}
} }
return &Route{Input: config.Input, Processors: processors, Output: config.Output, router: router, index: index}, nil
}
func (r *Route) HandleInput(sourceId string, payload any) error {
slog.Debug("route input", "index", r.index, "source", sourceId, "payload", payload)
slog.Debug("route processing", "processorCount", len(r.Processors))
var err error
for _, processor := range r.Processors {
payload, err = processor.Process(r.router.Context, payload)
if err != nil {
return err
}
}
return r.HandleOutput(payload)
}
func (r *Route) HandleOutput(payload any) error {
slog.Debug("route output", "index", r.index, "destination", r.Output, "payload", payload)
return r.router.HandleOutput(r.Output, payload)
} }

View File

@@ -57,7 +57,12 @@ func NewRouter(ctx context.Context, config Config) (*Router, error) {
} }
for routeIndex, routeDecl := range config.Routes { for routeIndex, routeDecl := range config.Routes {
router.RouteInstances = append(router.RouteInstances, NewRoute(routeIndex, routeDecl, &router)) route, err := NewRoute(routeIndex, routeDecl, &router)
if err != nil {
slog.Error("problem creating route", "index", routeIndex, "error", err.Error())
continue
}
router.RouteInstances = append(router.RouteInstances, route)
} }
for _, moduleInstance := range router.ModuleInstances { for _, moduleInstance := range router.ModuleInstances {
@@ -75,9 +80,12 @@ func (r *Router) Run() {
} }
func (r *Router) HandleInput(sourceId string, payload any) { func (r *Router) HandleInput(sourceId string, payload any) {
for _, route := range r.RouteInstances { for routeIndex, route := range r.RouteInstances {
if route.Input == sourceId { if route.Input == sourceId {
route.HandleInput(sourceId, payload) err := route.HandleInput(sourceId, payload)
if err != nil {
slog.Error("router unable to route input", "route", routeIndex, "source", sourceId, "error", err)
}
} }
} }
} }