switch router output to be a processor instead of specific output per route

This commit is contained in:
Joel Wetzell
2026-03-04 21:21:11 -06:00
parent 078e6ec68c
commit b7a8b04a72
28 changed files with 246 additions and 140 deletions

View File

@@ -13,6 +13,8 @@ routes:
processors:
- type: osc.message.create
params:
address: "{{.URL.Path}}"
address: "{{.Payload.URL.Path}}"
- type: osc.message.encode
output: udp
- type: router.output
params:
module: udp

View File

@@ -1,8 +1,22 @@
package common
import "context"
type contextKey string
const RouterContextKey contextKey = contextKey("router")
const SourceContextKey contextKey = contextKey("source")
const ModulesContextKey contextKey = contextKey("modules")
const SenderContextKey contextKey = contextKey("sender")
type RouteIO interface {
HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError)
HandleOutput(ctx context.Context, destinationId string, payload any) error
}
type RouteIOError struct {
Index int
OutputError error
ProcessError error
InputError error
}

View File

@@ -273,7 +273,6 @@ type ModuleConfig struct {
type RouteConfig struct {
Input string `json:"input"`
Processors []ProcessorConfig `json:"processors"`
Output string `json:"output"`
}
type ProcessorConfig struct {

View File

@@ -10,14 +10,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type HTTPClient struct {
config config.ModuleConfig
ctx context.Context
client *http.Client
router route.RouteIO
router common.RouteIO
logger *slog.Logger
cancel context.CancelFunc
}
@@ -42,7 +41,7 @@ func (hc *HTTPClient) Type() string {
func (hc *HTTPClient) Start(ctx context.Context) error {
hc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("http.client unable to get router from context")

View File

@@ -12,14 +12,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type HTTPServer struct {
config config.ModuleConfig
Port uint16
ctx context.Context
router route.RouteIO
router common.RouteIO
logger *slog.Logger
cancel context.CancelFunc
}
@@ -148,7 +147,7 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (hs *HTTPServer) Start(ctx context.Context) error {
hs.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("http.server unable to get router from context")

View File

@@ -10,7 +10,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
"gitlab.com/gomidi/midi/v2"
_ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv"
)
@@ -18,7 +17,7 @@ import (
type MIDIInput struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
Port string
SendFunc func(midi.Message) error
logger *slog.Logger
@@ -51,7 +50,7 @@ func (mi *MIDIInput) Type() string {
func (mi *MIDIInput) Start(ctx context.Context) error {
mi.logger.Debug("running")
defer midi.CloseDriver()
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("midi.input unable to get router from context")

View File

@@ -11,7 +11,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
"gitlab.com/gomidi/midi/v2"
_ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv"
)
@@ -19,7 +18,7 @@ import (
type MIDIOutput struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
Port string
SendFunc func(midi.Message) error
logger *slog.Logger
@@ -53,7 +52,7 @@ func (mo *MIDIOutput) Type() string {
func (mo *MIDIOutput) Start(ctx context.Context) error {
mo.logger.Debug("running")
defer midi.CloseDriver()
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("midi.output unable to get router from context")

View File

@@ -10,13 +10,12 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type MQTTClient struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
Broker string
ClientID string
Topic string
@@ -63,7 +62,7 @@ func (mc *MQTTClient) Type() string {
func (mc *MQTTClient) Start(ctx context.Context) error {
mc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("mqtt.client unable to get router from context")

View File

@@ -8,14 +8,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
"github.com/nats-io/nats.go"
)
type NATSClient struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
URL string
Subject string
client *nats.Conn
@@ -54,7 +53,7 @@ func (nc *NATSClient) Type() string {
func (nc *NATSClient) Start(ctx context.Context) error {
nc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("nats.client unable to get router from context")

View File

@@ -10,7 +10,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
"github.com/nats-io/nats-server/v2/server"
)
@@ -19,7 +18,7 @@ type NATSServer struct {
ctx context.Context
Ip string
Port int
router route.RouteIO
router common.RouteIO
server *server.Server
logger *slog.Logger
cancel context.CancelFunc
@@ -67,7 +66,7 @@ func (ns *NATSServer) Type() string {
func (ns *NATSServer) Start(ctx context.Context) error {
ns.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("nats.server unable to get router from context")

View File

@@ -11,14 +11,13 @@ import (
"github.com/jwetzell/psn-go"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
type PSNClient struct {
config config.ModuleConfig
conn *net.UDPConn
ctx context.Context
router route.RouteIO
router common.RouteIO
decoder *psn.Decoder
logger *slog.Logger
cancel context.CancelFunc
@@ -44,7 +43,7 @@ func (pc *PSNClient) Type() string {
func (pc *PSNClient) Start(ctx context.Context) error {
pc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("psn.client unable to get router from context")

View File

@@ -13,14 +13,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
"go.bug.st/serial"
)
type SerialClient struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
Port string
Framer framer.Framer
Mode *serial.Mode
@@ -86,7 +85,7 @@ func (sc *SerialClient) SetupPort() error {
func (sc *SerialClient) Start(ctx context.Context) error {
sc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("serial.client unable to get router from context")

View File

@@ -17,13 +17,12 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type SIPCallServer struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
IP string
Port int
Transport string
@@ -101,7 +100,7 @@ func (scs *SIPCallServer) Type() string {
func (scs *SIPCallServer) Start(ctx context.Context) error {
scs.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("sip.call.server unable to get router from context")

View File

@@ -18,13 +18,12 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type SIPDTMFServer struct {
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
IP string
Port int
Transport string
@@ -114,7 +113,7 @@ func (sds *SIPDTMFServer) Type() string {
func (sds *SIPDTMFServer) Start(ctx context.Context) error {
sds.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("sip.dtmf.server unable to get router from context")

View File

@@ -12,7 +12,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type TCPClient struct {
@@ -20,7 +19,7 @@ type TCPClient struct {
framer framer.Framer
conn *net.TCPConn
ctx context.Context
router route.RouteIO
router common.RouteIO
Addr *net.TCPAddr
logger *slog.Logger
cancel context.CancelFunc
@@ -71,7 +70,7 @@ func (tc *TCPClient) Type() string {
func (tc *TCPClient) Start(ctx context.Context) error {
tc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.tcp.client unable to get router from context")

View File

@@ -15,7 +15,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/framer"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type TCPServer struct {
@@ -23,7 +22,7 @@ type TCPServer struct {
Addr *net.TCPAddr
Framer framer.Framer
ctx context.Context
router route.RouteIO
router common.RouteIO
quit chan interface{}
wg sync.WaitGroup
connections []*net.TCPConn
@@ -161,7 +160,7 @@ ClientRead:
func (ts *TCPServer) Start(ctx context.Context) error {
ts.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.tcp.server unable to get router from context")

View File

@@ -9,14 +9,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
type TimeInterval struct {
config config.ModuleConfig
Duration uint32
ctx context.Context
router route.RouteIO
router common.RouteIO
ticker *time.Ticker
logger *slog.Logger
cancel context.CancelFunc
@@ -47,7 +46,7 @@ func (i *TimeInterval) Type() string {
func (i *TimeInterval) Start(ctx context.Context) error {
i.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("time.interval unable to get router from context")

View File

@@ -9,14 +9,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
type TimeTimer struct {
config config.ModuleConfig
Duration uint32
ctx context.Context
router route.RouteIO
router common.RouteIO
timer *time.Timer
logger *slog.Logger
cancel context.CancelFunc
@@ -48,7 +47,7 @@ func (t *TimeTimer) Type() string {
func (t *TimeTimer) Start(ctx context.Context) error {
t.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.tcp.client unable to get router from context")

View File

@@ -10,7 +10,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type UDPClient struct {
@@ -19,7 +18,7 @@ type UDPClient struct {
Port uint16
conn *net.UDPConn
ctx context.Context
router route.RouteIO
router common.RouteIO
logger *slog.Logger
cancel context.CancelFunc
}
@@ -64,7 +63,7 @@ func (uc *UDPClient) SetupConn() error {
func (uc *UDPClient) Start(ctx context.Context) error {
uc.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.udp.client unable to get router from context")

View File

@@ -11,14 +11,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/jwetzell/showbridge-go/internal/route"
)
type UDPMulticast struct {
config config.ModuleConfig
conn *net.UDPConn
ctx context.Context
router route.RouteIO
router common.RouteIO
Addr *net.UDPAddr
logger *slog.Logger
cancel context.CancelFunc
@@ -58,7 +57,7 @@ func (um *UDPMulticast) Type() string {
func (um *UDPMulticast) Start(ctx context.Context) error {
um.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.udp.multicast unable to get router from context")

View File

@@ -10,7 +10,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
type UDPServer struct {
@@ -18,7 +17,7 @@ type UDPServer struct {
BufferSize int
config config.ModuleConfig
ctx context.Context
router route.RouteIO
router common.RouteIO
logger *slog.Logger
cancel context.CancelFunc
}
@@ -70,7 +69,7 @@ func (us *UDPServer) Type() string {
func (us *UDPServer) Start(ctx context.Context) error {
us.logger.Debug("running")
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return errors.New("net.udp.server unable to get router from context")

View File

@@ -0,0 +1,55 @@
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
type RouterOutput struct {
config config.ProcessorConfig
ModuleId string
logger *slog.Logger
}
func (dl *RouterOutput) Process(ctx context.Context, payload any) (any, error) {
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return nil, errors.New("router.output no router found")
}
err := router.HandleOutput(ctx, dl.ModuleId, payload)
if err != nil {
return nil, fmt.Errorf("router.output failed to send output: %w", err)
}
return payload, nil
}
func (dl *RouterOutput) Type() string {
return dl.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "router.output",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleId, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("router.output module error: %w", err)
}
return &RouterOutput{config: config, ModuleId: moduleId, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}

View File

@@ -17,23 +17,9 @@ type RouteError struct {
Config config.RouteConfig
Error error
}
type RouteIOError struct {
Index int
OutputError error
ProcessError error
InputError error
}
type RouteIO interface {
HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError)
HandleOutput(ctx context.Context, destinationId string, payload any) error
}
type Route struct {
input string
processors []processor.Processor
output string
}
func NewRoute(config config.RouteConfig) (*Route, error) {
@@ -54,17 +40,13 @@ func NewRoute(config config.RouteConfig) (*Route, error) {
}
}
return &Route{input: config.Input, processors: processors, output: config.Output}, nil
return &Route{input: config.Input, processors: processors}, nil
}
func (r *Route) Input() string {
return r.input
}
func (r *Route) Output() string {
return r.output
}
func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) {
tracer := otel.Tracer("route")
processCtx, processSpan := tracer.Start(ctx, "ProcessPayload")

View File

@@ -13,8 +13,7 @@ import (
func TestRouteCreate(t *testing.T) {
routeConfig := config.RouteConfig{
Input: "input",
Output: "output",
Input: "input",
}
testRoute, err := route.NewRoute(routeConfig)
@@ -25,15 +24,12 @@ func TestRouteCreate(t *testing.T) {
if testRoute.Input() != routeConfig.Input {
t.Fatalf("route input does not match expected input")
}
if testRoute.Output() != routeConfig.Output {
t.Fatalf("route output does not match expected output")
}
}
type MockRouter struct{}
func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
return false, []route.RouteIOError{}
func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) {
return false, []common.RouteIOError{}
}
func (mr *MockRouter) HandleOutput(ctx context.Context, destinationId string, payload any) error {
@@ -45,8 +41,13 @@ func TestGoodRouteHandleInput(t *testing.T) {
Input: "input",
Processors: []config.ProcessorConfig{
{Type: "string.encode"},
{
Type: "router.output",
Params: config.Params{
"module": "output",
},
},
},
Output: "output",
}
testRoute, err := route.NewRoute(routeConfig)
@@ -75,8 +76,13 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) {
Input: "input",
Processors: []config.ProcessorConfig{
{Type: "string.create", Params: map[string]any{"template": "{{.invalid}}}"}},
{
Type: "router.output",
Params: config.Params{
"module": "output",
},
},
},
Output: "output",
}
testRoute, err := route.NewRoute(routeConfig)
@@ -93,9 +99,15 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) {
func TestRouteHandleNilPayload(t *testing.T) {
routeConfig := config.RouteConfig{
Input: "input",
Processors: []config.ProcessorConfig{},
Output: "output",
Input: "input",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "output",
},
},
},
}
testRoute, err := route.NewRoute(routeConfig)
@@ -118,8 +130,13 @@ func TestRouteHandleNilPayloadFromProcessor(t *testing.T) {
Input: "input",
Processors: []config.ProcessorConfig{
{Type: "script.js", Params: map[string]any{"program": "payload = undefined"}},
{
Type: "router.output",
Params: config.Params{
"module": "output",
},
},
},
Output: "output",
}
testRoute, err := route.NewRoute(routeConfig)
@@ -143,7 +160,6 @@ func TestRouteUnknownProcessor(t *testing.T) {
Processors: []config.ProcessorConfig{
{Type: "asdfasdflkjalkj"},
},
Output: "output",
}
_, err := route.NewRoute(routeConfig)
@@ -158,7 +174,6 @@ func TestRouteBadProcessorConfig(t *testing.T) {
Processors: []config.ProcessorConfig{
{Type: "string.create", Params: map[string]any{}},
},
Output: "output",
}
_, err := route.NewRoute(routeConfig)

View File

@@ -172,10 +172,10 @@ func (r *Router) Stop() {
r.contextCancel()
}
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) {
spanCtx, span := otel.Tracer("router").Start(ctx, "input", trace.WithAttributes(attribute.String("source.id", sourceId)), trace.WithNewRoot())
defer span.End()
var routeIOErrors []route.RouteIOError
var routeIOErrors []common.RouteIOError
routeFound := false
var routeWaitGroup sync.WaitGroup
@@ -190,37 +190,22 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
routeFound = true
routeContext := context.WithValue(spanCtx, common.SourceContextKey, sourceId)
routeContext = context.WithValue(routeContext, common.RouterContextKey, r)
routeContext = context.WithValue(routeContext, common.ModulesContextKey, r.ModuleInstances)
routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()), attribute.String("route.output", routeInstance.Output())))
payload, err := routeInstance.ProcessPayload(routeCtx, payload)
routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input())))
_, err := routeInstance.ProcessPayload(routeCtx, payload)
if err != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
routeIOErrors = []common.RouteIOError{}
}
r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err)
routeIOErrors = append(routeIOErrors, route.RouteIOError{
routeIOErrors = append(routeIOErrors, common.RouteIOError{
Index: routeIndex,
ProcessError: err,
})
return
}
if payload == nil {
r.logger.Debug("no payload after processing, route terminated", "route", routeIndex, "source", sourceId)
return
}
outputError := r.HandleOutput(routeCtx, routeInstance.Output(), payload)
if outputError != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
}
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
OutputError: outputError,
})
}
routeSpan.End()
})
}

View File

@@ -12,14 +12,13 @@ import (
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route"
)
type MockCounterModule struct {
config config.ModuleConfig
ctx context.Context
outputCount int
router route.RouteIO
router common.RouteIO
logger *slog.Logger
cancel context.CancelFunc
}
@@ -34,7 +33,7 @@ func (mcm *MockCounterModule) Output(context.Context, any) error {
}
func (mcm *MockCounterModule) Start(ctx context.Context) error {
router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO)
router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
if !ok {
return fmt.Errorf("mock.counter could not get router from context")
@@ -171,7 +170,6 @@ func TestNewRouterRouteWithUnknwonProcessor(t *testing.T) {
Type: "asdfasdf",
},
},
Output: "mock",
},
},
}
@@ -201,8 +199,15 @@ func TestRouterInputUnknownDestinationModule(t *testing.T) {
},
Routes: []config.RouteConfig{
{
Input: "mock",
Output: "test",
Input: "mock",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "test",
},
},
},
},
},
}
@@ -238,8 +243,8 @@ func TestRouterInputUnknownDestinationModule(t *testing.T) {
t.Fatalf("router should have returned exactly 1 routing error, got: %d", len(routingErrors))
}
if routingErrors[0].OutputError.Error() != "no module found for destination id" {
t.Fatalf("routing output error did not match expected, got: %s", routingErrors[0].OutputError.Error())
if routingErrors[0].ProcessError.Error() != "router.output failed to send output: no module found for destination id" {
t.Fatalf("routing output error did not match expected, got: %s", routingErrors[0].ProcessError.Error())
}
}
@@ -253,8 +258,15 @@ func TestRouterInputNoMatchingRoute(t *testing.T) {
},
Routes: []config.RouteConfig{
{
Input: "test",
Output: "mock",
Input: "test",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock",
},
},
},
},
},
}
@@ -297,8 +309,15 @@ func TestRouterInputSingleRoute(t *testing.T) {
},
Routes: []config.RouteConfig{
{
Input: "mock",
Output: "mock",
Input: "mock",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock",
},
},
},
},
},
}
@@ -361,16 +380,37 @@ func TestRouterInputMultipleRoutes(t *testing.T) {
},
Routes: []config.RouteConfig{
{
Input: "mock",
Output: "mock",
Input: "mock",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock",
},
},
},
},
{
Input: "mock",
Output: "mock",
Input: "mock",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock",
},
},
},
},
{
Input: "mock",
Output: "mock",
Input: "mock",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock",
},
},
},
},
},
}
@@ -436,12 +476,26 @@ func TestRouterInputMultipleModules(t *testing.T) {
},
Routes: []config.RouteConfig{
{
Input: "mock1",
Output: "mock1",
Input: "mock1",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock1",
},
},
},
},
{
Input: "mock2",
Output: "mock2",
Input: "mock2",
Processors: []config.ProcessorConfig{
{
Type: "router.output",
Params: config.Params{
"module": "mock2",
},
},
},
},
},
}

View File

@@ -614,6 +614,28 @@
"required": ["type", "params"],
"additionalProperties": false
},
{
"type": "object",
"title": "Router Output",
"properties": {
"type": {
"type": "string",
"const": "router.output"
},
"params": {
"type": "object",
"properties": {
"module": {
"type": "string",
"description": "ID of module to send output to"
}
},
"required": ["module"]
}
},
"required": ["type", "params"],
"additionalProperties": false
},
{
"type": "object",
"title": "Evaluate Expr expression",

View File

@@ -13,12 +13,8 @@
},
"processors": {
"$ref": "https://showbridge.io/processors.schema.json"
},
"output": {
"type": "string",
"minLength": 1
}
},
"required": ["input", "output"]
"required": ["input"]
}
}