process routes concurrently

This commit is contained in:
Joel Wetzell
2025-12-28 20:42:11 -06:00
parent bd2a68ff6e
commit 59f00c1a32

View File

@@ -131,42 +131,49 @@ func (r *Router) Stop() {
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
var routeIOErrors []route.RouteIOError
routeFound := false
var routeWaitGroup sync.WaitGroup
for routeIndex, routeInstance := range r.RouteInstances {
if routeInstance.Input() == sourceId {
routeFound = true
routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId)
routeWaitGroup.Go(func() {
payload, err := routeInstance.ProcessPayload(routeContext, payload)
if err != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
routeFound = true
routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId)
payload, err := routeInstance.ProcessPayload(routeContext, payload)
if err != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
}
r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err)
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
ProcessError: err,
})
return
}
r.logger.Error("unable to process input", "route", routeIndex, "source", sourceId, "error", err)
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
ProcessError: err,
})
continue
}
if payload == nil {
r.logger.Error("no input after processing", "route", routeIndex, "source", sourceId)
continue
}
outputErrors := r.HandleOutput(routeContext, routeInstance.Output(), payload)
if outputErrors != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
if payload == nil {
r.logger.Error("no input after processing", "route", routeIndex, "source", sourceId)
return
}
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
OutputErrors: outputErrors,
})
}
outputErrors := r.HandleOutput(routeContext, routeInstance.Output(), payload)
if outputErrors != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
}
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
OutputErrors: outputErrors,
})
}
})
}
}
routeWaitGroup.Wait()
return routeFound, routeIOErrors
}