rework Route struct

This commit is contained in:
Joel Wetzell
2026-02-16 19:13:04 -06:00
parent 8c3f93b601
commit abd37439c5
2 changed files with 12 additions and 14 deletions

View File

@@ -35,19 +35,13 @@ type RouteIO interface {
HandleOutput(ctx context.Context, destinationId string, payload any) error
}
type Route interface {
Input() string
Output() string
ProcessPayload(ctx context.Context, payload any) (any, error)
}
type ProcessorRoute struct {
type Route struct {
input string
processors []processor.Processor
output string
}
func NewRoute(config config.RouteConfig) (Route, error) {
func NewRoute(config config.RouteConfig) (*Route, error) {
processors := []processor.Processor{}
if len(config.Processors) > 0 {
@@ -65,18 +59,18 @@ func NewRoute(config config.RouteConfig) (Route, error) {
}
}
return &ProcessorRoute{input: config.Input, processors: processors, output: config.Output}, nil
return &Route{input: config.Input, processors: processors, output: config.Output}, nil
}
func (r *ProcessorRoute) Input() string {
func (r *Route) Input() string {
return r.input
}
func (r *ProcessorRoute) Output() string {
func (r *Route) Output() string {
return r.output
}
func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) {
func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) {
tracer := otel.Tracer("route")
processCtx, processSpan := tracer.Start(ctx, "ProcessPayload")
defer processSpan.End()

View File

@@ -21,7 +21,7 @@ type Router struct {
Context context.Context
ModuleInstances map[string]module.Module
// TODO(jwetzell): change to something easier to lookup
RouteInstances []route.Route
RouteInstances []*route.Route
moduleWait sync.WaitGroup
logger *slog.Logger
runningConfig config.Config
@@ -105,7 +105,7 @@ func NewRouter(config config.Config) (*Router, []module.ModuleError, []route.Rou
router := Router{
ModuleInstances: make(map[string]module.Module),
RouteInstances: []route.Route{},
RouteInstances: []*route.Route{},
logger: slog.Default().With("component", "router"),
runningConfig: config,
}
@@ -180,6 +180,10 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
var routeWaitGroup sync.WaitGroup
for routeIndex, routeInstance := range r.RouteInstances {
if routeInstance == nil {
r.logger.Error("nil route instance found", "routeIndex", routeIndex)
continue
}
if routeInstance.Input() == sourceId {
routeWaitGroup.Go(func() {