diff --git a/router.go b/router.go index 55e93de..574cd7a 100644 --- a/router.go +++ b/router.go @@ -131,42 +131,49 @@ func (r *Router) Stop() { func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { var routeIOErrors []route.RouteIOError routeFound := false + + var routeWaitGroup sync.WaitGroup + for routeIndex, routeInstance := range r.RouteInstances { if routeInstance.Input() == sourceId { - routeFound = true - routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId) + routeWaitGroup.Go(func() { - payload, err := routeInstance.ProcessPayload(routeContext, payload) - if err != nil { - if routeIOErrors == nil { - routeIOErrors = []route.RouteIOError{} + routeFound = true + routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId) + + payload, err := routeInstance.ProcessPayload(routeContext, payload) + if err != nil { + if routeIOErrors == nil { + routeIOErrors = []route.RouteIOError{} + } + r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err) + routeIOErrors = append(routeIOErrors, route.RouteIOError{ + Index: routeIndex, + ProcessError: err, + }) + return } - r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err) - routeIOErrors = append(routeIOErrors, route.RouteIOError{ - Index: routeIndex, - ProcessError: err, - }) - continue - } - if payload == nil { - r.logger.Error("no input after processing", "route", routeIndex, "source", sourceId) - continue - } - - outputErrors := r.HandleOutput(routeContext, routeInstance.Output(), payload) - if outputErrors != nil { - if routeIOErrors == nil { - routeIOErrors = []route.RouteIOError{} + if payload == nil { + r.logger.Error("no input after processing", "route", routeIndex, "source", sourceId) + return } - routeIOErrors = append(routeIOErrors, route.RouteIOError{ - Index: routeIndex, - OutputErrors: outputErrors, - }) - } + + outputErrors := r.HandleOutput(routeContext, routeInstance.Output(), payload) + if outputErrors != nil { + if routeIOErrors == nil { + routeIOErrors = []route.RouteIOError{} + } + routeIOErrors = append(routeIOErrors, route.RouteIOError{ + Index: routeIndex, + OutputErrors: outputErrors, + }) + } + }) } } + routeWaitGroup.Wait() return routeFound, routeIOErrors }