From b7a8b04a7298e4a8c863c5b506c7b4f7c1626732 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Wed, 4 Mar 2026 21:21:11 -0600 Subject: [PATCH] switch router output to be a processor instead of specific output per route --- config.yaml | 6 +- internal/common/common.go | 14 +++++ internal/config/config.go | 1 - internal/module/http-client.go | 5 +- internal/module/http-server.go | 5 +- internal/module/midi-input.go | 5 +- internal/module/midi-output.go | 5 +- internal/module/mqtt-client.go | 5 +- internal/module/nats-client.go | 5 +- internal/module/nats-server.go | 5 +- internal/module/psn-client.go | 5 +- internal/module/serial-client.go | 5 +- internal/module/sip-call-server.go | 5 +- internal/module/sip-dtmf-server.go | 5 +- internal/module/tcp-client.go | 5 +- internal/module/tcp-server.go | 5 +- internal/module/time-interval.go | 5 +- internal/module/time-timer.go | 5 +- internal/module/udp-client.go | 5 +- internal/module/udp-multicast.go | 5 +- internal/module/udp-server.go | 5 +- internal/processor/router-output.go | 55 ++++++++++++++++ internal/route/route.go | 20 +----- internal/route/route_test.go | 45 ++++++++----- router.go | 29 +++------ router_test.go | 98 ++++++++++++++++++++++------- schema/processors.schema.json | 22 +++++++ schema/routes.schema.json | 6 +- 28 files changed, 246 insertions(+), 140 deletions(-) create mode 100644 internal/processor/router-output.go diff --git a/config.yaml b/config.yaml index 0771bff..52856e0 100644 --- a/config.yaml +++ b/config.yaml @@ -13,6 +13,8 @@ routes: processors: - type: osc.message.create params: - address: "{{.URL.Path}}" + address: "{{.Payload.URL.Path}}" - type: osc.message.encode - output: udp + - type: router.output + params: + module: udp diff --git a/internal/common/common.go b/internal/common/common.go index c49cbc0..3725a05 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -1,8 +1,22 @@ package common +import "context" + type contextKey string const RouterContextKey contextKey = contextKey("router") const SourceContextKey contextKey = contextKey("source") const ModulesContextKey contextKey = contextKey("modules") const SenderContextKey contextKey = contextKey("sender") + +type RouteIO interface { + HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError) + HandleOutput(ctx context.Context, destinationId string, payload any) error +} + +type RouteIOError struct { + Index int + OutputError error + ProcessError error + InputError error +} diff --git a/internal/config/config.go b/internal/config/config.go index 9d9e4d6..1ae3e43 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -273,7 +273,6 @@ type ModuleConfig struct { type RouteConfig struct { Input string `json:"input"` Processors []ProcessorConfig `json:"processors"` - Output string `json:"output"` } type ProcessorConfig struct { diff --git a/internal/module/http-client.go b/internal/module/http-client.go index 7e44a80..3c6c056 100644 --- a/internal/module/http-client.go +++ b/internal/module/http-client.go @@ -10,14 +10,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type HTTPClient struct { config config.ModuleConfig ctx context.Context client *http.Client - router route.RouteIO + router common.RouteIO logger *slog.Logger cancel context.CancelFunc } @@ -42,7 +41,7 @@ func (hc *HTTPClient) Type() string { func (hc *HTTPClient) Start(ctx context.Context) error { hc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("http.client unable to get router from context") diff --git a/internal/module/http-server.go b/internal/module/http-server.go index 0df3b29..eaaef72 100644 --- a/internal/module/http-server.go +++ b/internal/module/http-server.go @@ -12,14 +12,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type HTTPServer struct { config config.ModuleConfig Port uint16 ctx context.Context - router route.RouteIO + router common.RouteIO logger *slog.Logger cancel context.CancelFunc } @@ -148,7 +147,7 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (hs *HTTPServer) Start(ctx context.Context) error { hs.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("http.server unable to get router from context") diff --git a/internal/module/midi-input.go b/internal/module/midi-input.go index 8ee314b..690faee 100644 --- a/internal/module/midi-input.go +++ b/internal/module/midi-input.go @@ -10,7 +10,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" "gitlab.com/gomidi/midi/v2" _ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv" ) @@ -18,7 +17,7 @@ import ( type MIDIInput struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO Port string SendFunc func(midi.Message) error logger *slog.Logger @@ -51,7 +50,7 @@ func (mi *MIDIInput) Type() string { func (mi *MIDIInput) Start(ctx context.Context) error { mi.logger.Debug("running") defer midi.CloseDriver() - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("midi.input unable to get router from context") diff --git a/internal/module/midi-output.go b/internal/module/midi-output.go index caf07d5..8503859 100644 --- a/internal/module/midi-output.go +++ b/internal/module/midi-output.go @@ -11,7 +11,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" "gitlab.com/gomidi/midi/v2" _ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv" ) @@ -19,7 +18,7 @@ import ( type MIDIOutput struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO Port string SendFunc func(midi.Message) error logger *slog.Logger @@ -53,7 +52,7 @@ func (mo *MIDIOutput) Type() string { func (mo *MIDIOutput) Start(ctx context.Context) error { mo.logger.Debug("running") defer midi.CloseDriver() - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("midi.output unable to get router from context") diff --git a/internal/module/mqtt-client.go b/internal/module/mqtt-client.go index ad0e9bf..692dc88 100644 --- a/internal/module/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -10,13 +10,12 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type MQTTClient struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO Broker string ClientID string Topic string @@ -63,7 +62,7 @@ func (mc *MQTTClient) Type() string { func (mc *MQTTClient) Start(ctx context.Context) error { mc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("mqtt.client unable to get router from context") diff --git a/internal/module/nats-client.go b/internal/module/nats-client.go index 5316caa..2e25d04 100644 --- a/internal/module/nats-client.go +++ b/internal/module/nats-client.go @@ -8,14 +8,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" "github.com/nats-io/nats.go" ) type NATSClient struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO URL string Subject string client *nats.Conn @@ -54,7 +53,7 @@ func (nc *NATSClient) Type() string { func (nc *NATSClient) Start(ctx context.Context) error { nc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("nats.client unable to get router from context") diff --git a/internal/module/nats-server.go b/internal/module/nats-server.go index 50e7fc8..eee10ad 100644 --- a/internal/module/nats-server.go +++ b/internal/module/nats-server.go @@ -10,7 +10,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" "github.com/nats-io/nats-server/v2/server" ) @@ -19,7 +18,7 @@ type NATSServer struct { ctx context.Context Ip string Port int - router route.RouteIO + router common.RouteIO server *server.Server logger *slog.Logger cancel context.CancelFunc @@ -67,7 +66,7 @@ func (ns *NATSServer) Type() string { func (ns *NATSServer) Start(ctx context.Context) error { ns.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("nats.server unable to get router from context") diff --git a/internal/module/psn-client.go b/internal/module/psn-client.go index b3fc9ca..822a1d2 100644 --- a/internal/module/psn-client.go +++ b/internal/module/psn-client.go @@ -11,14 +11,13 @@ import ( "github.com/jwetzell/psn-go" "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" ) type PSNClient struct { config config.ModuleConfig conn *net.UDPConn ctx context.Context - router route.RouteIO + router common.RouteIO decoder *psn.Decoder logger *slog.Logger cancel context.CancelFunc @@ -44,7 +43,7 @@ func (pc *PSNClient) Type() string { func (pc *PSNClient) Start(ctx context.Context) error { pc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("psn.client unable to get router from context") diff --git a/internal/module/serial-client.go b/internal/module/serial-client.go index fd139cb..6d73dac 100644 --- a/internal/module/serial-client.go +++ b/internal/module/serial-client.go @@ -13,14 +13,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framer" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" "go.bug.st/serial" ) type SerialClient struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO Port string Framer framer.Framer Mode *serial.Mode @@ -86,7 +85,7 @@ func (sc *SerialClient) SetupPort() error { func (sc *SerialClient) Start(ctx context.Context) error { sc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("serial.client unable to get router from context") diff --git a/internal/module/sip-call-server.go b/internal/module/sip-call-server.go index 249858d..24392b8 100644 --- a/internal/module/sip-call-server.go +++ b/internal/module/sip-call-server.go @@ -17,13 +17,12 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type SIPCallServer struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO IP string Port int Transport string @@ -101,7 +100,7 @@ func (scs *SIPCallServer) Type() string { func (scs *SIPCallServer) Start(ctx context.Context) error { scs.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("sip.call.server unable to get router from context") diff --git a/internal/module/sip-dtmf-server.go b/internal/module/sip-dtmf-server.go index b92cc9a..b609928 100644 --- a/internal/module/sip-dtmf-server.go +++ b/internal/module/sip-dtmf-server.go @@ -18,13 +18,12 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type SIPDTMFServer struct { config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO IP string Port int Transport string @@ -114,7 +113,7 @@ func (sds *SIPDTMFServer) Type() string { func (sds *SIPDTMFServer) Start(ctx context.Context) error { sds.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("sip.dtmf.server unable to get router from context") diff --git a/internal/module/tcp-client.go b/internal/module/tcp-client.go index 11c3622..a79325e 100644 --- a/internal/module/tcp-client.go +++ b/internal/module/tcp-client.go @@ -12,7 +12,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framer" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type TCPClient struct { @@ -20,7 +19,7 @@ type TCPClient struct { framer framer.Framer conn *net.TCPConn ctx context.Context - router route.RouteIO + router common.RouteIO Addr *net.TCPAddr logger *slog.Logger cancel context.CancelFunc @@ -71,7 +70,7 @@ func (tc *TCPClient) Type() string { func (tc *TCPClient) Start(ctx context.Context) error { tc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.tcp.client unable to get router from context") diff --git a/internal/module/tcp-server.go b/internal/module/tcp-server.go index 1fc2136..bad9b0a 100644 --- a/internal/module/tcp-server.go +++ b/internal/module/tcp-server.go @@ -15,7 +15,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framer" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type TCPServer struct { @@ -23,7 +22,7 @@ type TCPServer struct { Addr *net.TCPAddr Framer framer.Framer ctx context.Context - router route.RouteIO + router common.RouteIO quit chan interface{} wg sync.WaitGroup connections []*net.TCPConn @@ -161,7 +160,7 @@ ClientRead: func (ts *TCPServer) Start(ctx context.Context) error { ts.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.tcp.server unable to get router from context") diff --git a/internal/module/time-interval.go b/internal/module/time-interval.go index 40684a1..ac76e72 100644 --- a/internal/module/time-interval.go +++ b/internal/module/time-interval.go @@ -9,14 +9,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" ) type TimeInterval struct { config config.ModuleConfig Duration uint32 ctx context.Context - router route.RouteIO + router common.RouteIO ticker *time.Ticker logger *slog.Logger cancel context.CancelFunc @@ -47,7 +46,7 @@ func (i *TimeInterval) Type() string { func (i *TimeInterval) Start(ctx context.Context) error { i.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("time.interval unable to get router from context") diff --git a/internal/module/time-timer.go b/internal/module/time-timer.go index 51a839b..be0c556 100644 --- a/internal/module/time-timer.go +++ b/internal/module/time-timer.go @@ -9,14 +9,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" ) type TimeTimer struct { config config.ModuleConfig Duration uint32 ctx context.Context - router route.RouteIO + router common.RouteIO timer *time.Timer logger *slog.Logger cancel context.CancelFunc @@ -48,7 +47,7 @@ func (t *TimeTimer) Type() string { func (t *TimeTimer) Start(ctx context.Context) error { t.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.tcp.client unable to get router from context") diff --git a/internal/module/udp-client.go b/internal/module/udp-client.go index 06f718f..d07131f 100644 --- a/internal/module/udp-client.go +++ b/internal/module/udp-client.go @@ -10,7 +10,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type UDPClient struct { @@ -19,7 +18,7 @@ type UDPClient struct { Port uint16 conn *net.UDPConn ctx context.Context - router route.RouteIO + router common.RouteIO logger *slog.Logger cancel context.CancelFunc } @@ -64,7 +63,7 @@ func (uc *UDPClient) SetupConn() error { func (uc *UDPClient) Start(ctx context.Context) error { uc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.udp.client unable to get router from context") diff --git a/internal/module/udp-multicast.go b/internal/module/udp-multicast.go index 9684feb..54e4087 100644 --- a/internal/module/udp-multicast.go +++ b/internal/module/udp-multicast.go @@ -11,14 +11,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" - "github.com/jwetzell/showbridge-go/internal/route" ) type UDPMulticast struct { config config.ModuleConfig conn *net.UDPConn ctx context.Context - router route.RouteIO + router common.RouteIO Addr *net.UDPAddr logger *slog.Logger cancel context.CancelFunc @@ -58,7 +57,7 @@ func (um *UDPMulticast) Type() string { func (um *UDPMulticast) Start(ctx context.Context) error { um.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.udp.multicast unable to get router from context") diff --git a/internal/module/udp-server.go b/internal/module/udp-server.go index 6ff7bac..1bf257c 100644 --- a/internal/module/udp-server.go +++ b/internal/module/udp-server.go @@ -10,7 +10,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/route" ) type UDPServer struct { @@ -18,7 +17,7 @@ type UDPServer struct { BufferSize int config config.ModuleConfig ctx context.Context - router route.RouteIO + router common.RouteIO logger *slog.Logger cancel context.CancelFunc } @@ -70,7 +69,7 @@ func (us *UDPServer) Type() string { func (us *UDPServer) Start(ctx context.Context) error { us.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return errors.New("net.udp.server unable to get router from context") diff --git a/internal/processor/router-output.go b/internal/processor/router-output.go new file mode 100644 index 0000000..8524cf3 --- /dev/null +++ b/internal/processor/router-output.go @@ -0,0 +1,55 @@ +package processor + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internal/common" + "github.com/jwetzell/showbridge-go/internal/config" +) + +type RouterOutput struct { + config config.ProcessorConfig + ModuleId string + logger *slog.Logger +} + +func (dl *RouterOutput) Process(ctx context.Context, payload any) (any, error) { + + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) + if !ok { + return nil, errors.New("router.output no router found") + } + + err := router.HandleOutput(ctx, dl.ModuleId, payload) + + if err != nil { + return nil, fmt.Errorf("router.output failed to send output: %w", err) + } + + return payload, nil +} + +func (dl *RouterOutput) Type() string { + return dl.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "router.output", + New: func(config config.ProcessorConfig) (Processor, error) { + + params := config.Params + + moduleId, err := params.GetString("module") + + if err != nil { + return nil, fmt.Errorf("router.output module error: %w", err) + } + + return &RouterOutput{config: config, ModuleId: moduleId, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil + }, + }) +} diff --git a/internal/route/route.go b/internal/route/route.go index cc41a03..351448d 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -17,23 +17,9 @@ type RouteError struct { Config config.RouteConfig Error error } - -type RouteIOError struct { - Index int - OutputError error - ProcessError error - InputError error -} - -type RouteIO interface { - HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError) - HandleOutput(ctx context.Context, destinationId string, payload any) error -} - type Route struct { input string processors []processor.Processor - output string } func NewRoute(config config.RouteConfig) (*Route, error) { @@ -54,17 +40,13 @@ func NewRoute(config config.RouteConfig) (*Route, error) { } } - return &Route{input: config.Input, processors: processors, output: config.Output}, nil + return &Route{input: config.Input, processors: processors}, nil } func (r *Route) Input() string { return r.input } -func (r *Route) Output() string { - return r.output -} - func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) { tracer := otel.Tracer("route") processCtx, processSpan := tracer.Start(ctx, "ProcessPayload") diff --git a/internal/route/route_test.go b/internal/route/route_test.go index dd8a9ca..79a9472 100644 --- a/internal/route/route_test.go +++ b/internal/route/route_test.go @@ -13,8 +13,7 @@ import ( func TestRouteCreate(t *testing.T) { routeConfig := config.RouteConfig{ - Input: "input", - Output: "output", + Input: "input", } testRoute, err := route.NewRoute(routeConfig) @@ -25,15 +24,12 @@ func TestRouteCreate(t *testing.T) { if testRoute.Input() != routeConfig.Input { t.Fatalf("route input does not match expected input") } - if testRoute.Output() != routeConfig.Output { - t.Fatalf("route output does not match expected output") - } } type MockRouter struct{} -func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { - return false, []route.RouteIOError{} +func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) { + return false, []common.RouteIOError{} } func (mr *MockRouter) HandleOutput(ctx context.Context, destinationId string, payload any) error { @@ -45,8 +41,13 @@ func TestGoodRouteHandleInput(t *testing.T) { Input: "input", Processors: []config.ProcessorConfig{ {Type: "string.encode"}, + { + Type: "router.output", + Params: config.Params{ + "module": "output", + }, + }, }, - Output: "output", } testRoute, err := route.NewRoute(routeConfig) @@ -75,8 +76,13 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) { Input: "input", Processors: []config.ProcessorConfig{ {Type: "string.create", Params: map[string]any{"template": "{{.invalid}}}"}}, + { + Type: "router.output", + Params: config.Params{ + "module": "output", + }, + }, }, - Output: "output", } testRoute, err := route.NewRoute(routeConfig) @@ -93,9 +99,15 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) { func TestRouteHandleNilPayload(t *testing.T) { routeConfig := config.RouteConfig{ - Input: "input", - Processors: []config.ProcessorConfig{}, - Output: "output", + Input: "input", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "output", + }, + }, + }, } testRoute, err := route.NewRoute(routeConfig) @@ -118,8 +130,13 @@ func TestRouteHandleNilPayloadFromProcessor(t *testing.T) { Input: "input", Processors: []config.ProcessorConfig{ {Type: "script.js", Params: map[string]any{"program": "payload = undefined"}}, + { + Type: "router.output", + Params: config.Params{ + "module": "output", + }, + }, }, - Output: "output", } testRoute, err := route.NewRoute(routeConfig) @@ -143,7 +160,6 @@ func TestRouteUnknownProcessor(t *testing.T) { Processors: []config.ProcessorConfig{ {Type: "asdfasdflkjalkj"}, }, - Output: "output", } _, err := route.NewRoute(routeConfig) @@ -158,7 +174,6 @@ func TestRouteBadProcessorConfig(t *testing.T) { Processors: []config.ProcessorConfig{ {Type: "string.create", Params: map[string]any{}}, }, - Output: "output", } _, err := route.NewRoute(routeConfig) diff --git a/router.go b/router.go index 6ccd30c..0e42922 100644 --- a/router.go +++ b/router.go @@ -172,10 +172,10 @@ func (r *Router) Stop() { r.contextCancel() } -func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { +func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) { spanCtx, span := otel.Tracer("router").Start(ctx, "input", trace.WithAttributes(attribute.String("source.id", sourceId)), trace.WithNewRoot()) defer span.End() - var routeIOErrors []route.RouteIOError + var routeIOErrors []common.RouteIOError routeFound := false var routeWaitGroup sync.WaitGroup @@ -190,37 +190,22 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) routeFound = true routeContext := context.WithValue(spanCtx, common.SourceContextKey, sourceId) + routeContext = context.WithValue(routeContext, common.RouterContextKey, r) routeContext = context.WithValue(routeContext, common.ModulesContextKey, r.ModuleInstances) - routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()), attribute.String("route.output", routeInstance.Output()))) - payload, err := routeInstance.ProcessPayload(routeCtx, payload) + routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()))) + _, err := routeInstance.ProcessPayload(routeCtx, payload) if err != nil { if routeIOErrors == nil { - routeIOErrors = []route.RouteIOError{} + routeIOErrors = []common.RouteIOError{} } r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err) - routeIOErrors = append(routeIOErrors, route.RouteIOError{ + routeIOErrors = append(routeIOErrors, common.RouteIOError{ Index: routeIndex, ProcessError: err, }) return } - - if payload == nil { - r.logger.Debug("no payload after processing, route terminated", "route", routeIndex, "source", sourceId) - return - } - - outputError := r.HandleOutput(routeCtx, routeInstance.Output(), payload) - if outputError != nil { - if routeIOErrors == nil { - routeIOErrors = []route.RouteIOError{} - } - routeIOErrors = append(routeIOErrors, route.RouteIOError{ - Index: routeIndex, - OutputError: outputError, - }) - } routeSpan.End() }) } diff --git a/router_test.go b/router_test.go index 81f52fb..441e03c 100644 --- a/router_test.go +++ b/router_test.go @@ -12,14 +12,13 @@ import ( "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/module" - "github.com/jwetzell/showbridge-go/internal/route" ) type MockCounterModule struct { config config.ModuleConfig ctx context.Context outputCount int - router route.RouteIO + router common.RouteIO logger *slog.Logger cancel context.CancelFunc } @@ -34,7 +33,7 @@ func (mcm *MockCounterModule) Output(context.Context, any) error { } func (mcm *MockCounterModule) Start(ctx context.Context) error { - router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) if !ok { return fmt.Errorf("mock.counter could not get router from context") @@ -171,7 +170,6 @@ func TestNewRouterRouteWithUnknwonProcessor(t *testing.T) { Type: "asdfasdf", }, }, - Output: "mock", }, }, } @@ -201,8 +199,15 @@ func TestRouterInputUnknownDestinationModule(t *testing.T) { }, Routes: []config.RouteConfig{ { - Input: "mock", - Output: "test", + Input: "mock", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "test", + }, + }, + }, }, }, } @@ -238,8 +243,8 @@ func TestRouterInputUnknownDestinationModule(t *testing.T) { t.Fatalf("router should have returned exactly 1 routing error, got: %d", len(routingErrors)) } - if routingErrors[0].OutputError.Error() != "no module found for destination id" { - t.Fatalf("routing output error did not match expected, got: %s", routingErrors[0].OutputError.Error()) + if routingErrors[0].ProcessError.Error() != "router.output failed to send output: no module found for destination id" { + t.Fatalf("routing output error did not match expected, got: %s", routingErrors[0].ProcessError.Error()) } } @@ -253,8 +258,15 @@ func TestRouterInputNoMatchingRoute(t *testing.T) { }, Routes: []config.RouteConfig{ { - Input: "test", - Output: "mock", + Input: "test", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock", + }, + }, + }, }, }, } @@ -297,8 +309,15 @@ func TestRouterInputSingleRoute(t *testing.T) { }, Routes: []config.RouteConfig{ { - Input: "mock", - Output: "mock", + Input: "mock", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock", + }, + }, + }, }, }, } @@ -361,16 +380,37 @@ func TestRouterInputMultipleRoutes(t *testing.T) { }, Routes: []config.RouteConfig{ { - Input: "mock", - Output: "mock", + Input: "mock", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock", + }, + }, + }, }, { - Input: "mock", - Output: "mock", + Input: "mock", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock", + }, + }, + }, }, { - Input: "mock", - Output: "mock", + Input: "mock", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock", + }, + }, + }, }, }, } @@ -436,12 +476,26 @@ func TestRouterInputMultipleModules(t *testing.T) { }, Routes: []config.RouteConfig{ { - Input: "mock1", - Output: "mock1", + Input: "mock1", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock1", + }, + }, + }, }, { - Input: "mock2", - Output: "mock2", + Input: "mock2", + Processors: []config.ProcessorConfig{ + { + Type: "router.output", + Params: config.Params{ + "module": "mock2", + }, + }, + }, }, }, } diff --git a/schema/processors.schema.json b/schema/processors.schema.json index 00c218b..2246a34 100644 --- a/schema/processors.schema.json +++ b/schema/processors.schema.json @@ -614,6 +614,28 @@ "required": ["type", "params"], "additionalProperties": false }, + { + "type": "object", + "title": "Router Output", + "properties": { + "type": { + "type": "string", + "const": "router.output" + }, + "params": { + "type": "object", + "properties": { + "module": { + "type": "string", + "description": "ID of module to send output to" + } + }, + "required": ["module"] + } + }, + "required": ["type", "params"], + "additionalProperties": false + }, { "type": "object", "title": "Evaluate Expr expression", diff --git a/schema/routes.schema.json b/schema/routes.schema.json index 874dda8..d5a58aa 100644 --- a/schema/routes.schema.json +++ b/schema/routes.schema.json @@ -13,12 +13,8 @@ }, "processors": { "$ref": "https://showbridge.io/processors.schema.json" - }, - "output": { - "type": "string", - "minLength": 1 } }, - "required": ["input", "output"] + "required": ["input"] } }