mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-27 13:25:40 +00:00
Merge pull request #33 from jwetzell/fix/route-concurrency
process routes concurrently
This commit is contained in:
11
router.go
11
router.go
@@ -131,8 +131,13 @@ func (r *Router) Stop() {
|
||||
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
|
||||
var routeIOErrors []route.RouteIOError
|
||||
routeFound := false
|
||||
|
||||
var routeWaitGroup sync.WaitGroup
|
||||
|
||||
for routeIndex, routeInstance := range r.RouteInstances {
|
||||
if routeInstance.Input() == sourceId {
|
||||
routeWaitGroup.Go(func() {
|
||||
|
||||
routeFound = true
|
||||
routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId)
|
||||
|
||||
@@ -146,12 +151,12 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
|
||||
Index: routeIndex,
|
||||
ProcessError: err,
|
||||
})
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
if payload == nil {
|
||||
r.logger.Error("no input after processing", "route", routeIndex, "source", sourceId)
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
outputErrors := r.HandleOutput(routeContext, routeInstance.Output(), payload)
|
||||
@@ -164,9 +169,11 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
|
||||
OutputErrors: outputErrors,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
routeWaitGroup.Wait()
|
||||
return routeFound, routeIOErrors
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user