From abd37439c5251ad362e97b526723eb93a43bf284 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Mon, 16 Feb 2026 19:13:04 -0600 Subject: [PATCH] rework Route struct --- internal/route/route.go | 18 ++++++------------ router.go | 8 ++++++-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/route/route.go b/internal/route/route.go index 5881b9d..7b5e8a5 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -35,19 +35,13 @@ type RouteIO interface { HandleOutput(ctx context.Context, destinationId string, payload any) error } -type Route interface { - Input() string - Output() string - ProcessPayload(ctx context.Context, payload any) (any, error) -} - -type ProcessorRoute struct { +type Route struct { input string processors []processor.Processor output string } -func NewRoute(config config.RouteConfig) (Route, error) { +func NewRoute(config config.RouteConfig) (*Route, error) { processors := []processor.Processor{} if len(config.Processors) > 0 { @@ -65,18 +59,18 @@ func NewRoute(config config.RouteConfig) (Route, error) { } } - return &ProcessorRoute{input: config.Input, processors: processors, output: config.Output}, nil + return &Route{input: config.Input, processors: processors, output: config.Output}, nil } -func (r *ProcessorRoute) Input() string { +func (r *Route) Input() string { return r.input } -func (r *ProcessorRoute) Output() string { +func (r *Route) Output() string { return r.output } -func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) { +func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) { tracer := otel.Tracer("route") processCtx, processSpan := tracer.Start(ctx, "ProcessPayload") defer processSpan.End() diff --git a/router.go b/router.go index 009fc3f..6c36ca1 100644 --- a/router.go +++ b/router.go @@ -21,7 +21,7 @@ type Router struct { Context context.Context ModuleInstances map[string]module.Module // TODO(jwetzell): change to something easier to lookup - RouteInstances []route.Route + RouteInstances []*route.Route moduleWait sync.WaitGroup logger *slog.Logger runningConfig config.Config @@ -105,7 +105,7 @@ func NewRouter(config config.Config) (*Router, []module.ModuleError, []route.Rou router := Router{ ModuleInstances: make(map[string]module.Module), - RouteInstances: []route.Route{}, + RouteInstances: []*route.Route{}, logger: slog.Default().With("component", "router"), runningConfig: config, } @@ -180,6 +180,10 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) var routeWaitGroup sync.WaitGroup for routeIndex, routeInstance := range r.RouteInstances { + if routeInstance == nil { + r.logger.Error("nil route instance found", "routeIndex", routeIndex) + continue + } if routeInstance.Input() == sourceId { routeWaitGroup.Go(func() {