diff --git a/internal/module/http-server.go b/internal/module/http-server.go index b90a544..dc56137 100644 --- a/internal/module/http-server.go +++ b/internal/module/http-server.go @@ -22,10 +22,10 @@ type HTTPServer struct { } type ResponseIOError struct { - Index int `json:"index"` - OutputErrors []string `json:"outputErrors"` - ProcessError *string `json:"processError"` - InputError *string `json:"inputError"` + Index int `json:"index"` + OutputError *string `json:"outputError"` + ProcessError *string `json:"processError"` + InputError *string `json:"inputError"` } type IOResponseData struct { @@ -113,14 +113,9 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { errorToAdd.ProcessError = &errorMsg } - if responseIOError.OutputErrors != nil { - outputErrorMsgs := []string{} - - for _, outputError := range responseIOError.OutputErrors { - outputErrorMsgs = append(outputErrorMsgs, outputError.Error()) - } - - errorToAdd.OutputErrors = outputErrorMsgs + if responseIOError.OutputError != nil { + errorMsg := responseIOError.OutputError.Error() + errorToAdd.OutputError = &errorMsg } response.IOErrors = append(response.IOErrors, errorToAdd) diff --git a/internal/route/route.go b/internal/route/route.go index 1d0814a..554adc0 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -24,7 +24,7 @@ type RouteError struct { type RouteIOError struct { Index int - OutputErrors []error + OutputError error ProcessError error InputError error } diff --git a/router.go b/router.go index e27d1e5..bf08390 100644 --- a/router.go +++ b/router.go @@ -16,8 +16,9 @@ import ( ) type Router struct { - contextCancel context.CancelFunc - Context context.Context + contextCancel context.CancelFunc + Context context.Context + // TODO(jwetzell): change to map for faster lookup ModuleInstances []module.Module RouteInstances []route.Route moduleWait sync.WaitGroup @@ -25,6 +26,52 @@ type Router struct { tracer trace.Tracer } +func (r *Router) addModule(moduleDecl config.ModuleConfig) error { + if moduleDecl.Id == "" { + return errors.New("module id cannot be empty") + } + moduleInfo, ok := module.ModuleRegistry[moduleDecl.Type] + if !ok { + return errors.New("module type not defined") + } + + moduleInstanceExists := false + for _, moduleInstance := range r.ModuleInstances { + if moduleInstance.Id() == moduleDecl.Id { + moduleInstanceExists = true + return errors.New("duplicate module id") + } + } + + if !moduleInstanceExists { + moduleInstance, err := moduleInfo.New(moduleDecl) + if err != nil { + return err + } + + r.ModuleInstances = append(r.ModuleInstances, moduleInstance) + } + return nil +} + +func (r *Router) addRoute(routeDecl config.RouteConfig) error { + routeInstance, err := route.NewRoute(routeDecl) + if err != nil { + return err + } + r.RouteInstances = append(r.RouteInstances, routeInstance) + return nil +} + +func (r *Router) getModule(moduleId string) module.Module { + for _, moduleInstance := range r.ModuleInstances { + if moduleInstance.Id() == moduleId { + return moduleInstance + } + } + return nil +} + func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.ModuleError, []route.RouteError) { router := Router{ @@ -39,69 +86,24 @@ func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.Mod for moduleIndex, moduleDecl := range config.Modules { - if moduleDecl.Id == "" { + err := router.addModule(moduleDecl) + if err != nil { if moduleErrors == nil { moduleErrors = []module.ModuleError{} } moduleErrors = append(moduleErrors, module.ModuleError{ Index: moduleIndex, Config: moduleDecl, - Error: errors.New("module id cannot be empty"), + Error: err, }) continue } - moduleInfo, ok := module.ModuleRegistry[moduleDecl.Type] - if !ok { - if moduleErrors == nil { - moduleErrors = []module.ModuleError{} - } - moduleErrors = append(moduleErrors, module.ModuleError{ - Index: moduleIndex, - Config: moduleDecl, - Error: errors.New("module type not defined"), - }) - continue - } - - moduleInstanceExists := false - for _, moduleInstance := range router.ModuleInstances { - if moduleInstance.Id() == moduleDecl.Id { - moduleInstanceExists = true - if moduleErrors == nil { - moduleErrors = []module.ModuleError{} - } - moduleErrors = append(moduleErrors, module.ModuleError{ - Index: moduleIndex, - Config: moduleDecl, - Error: errors.New("duplicate module id"), - }) - break - } - } - - if !moduleInstanceExists { - moduleInstance, err := moduleInfo.New(moduleDecl) - if err != nil { - if moduleErrors == nil { - moduleErrors = []module.ModuleError{} - } - moduleErrors = append(moduleErrors, module.ModuleError{ - Index: moduleIndex, - Config: moduleDecl, - Error: err, - }) - continue - } - - router.ModuleInstances = append(router.ModuleInstances, moduleInstance) - } - } var routeErrors []route.RouteError for routeIndex, routeDecl := range config.Routes { - routeInstance, err := route.NewRoute(routeDecl) + err := router.addRoute(routeDecl) if err != nil { if routeErrors == nil { routeErrors = []route.RouteError{} @@ -113,7 +115,6 @@ func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.Mod }) continue } - router.RouteInstances = append(router.RouteInstances, routeInstance) } return &router, moduleErrors, routeErrors @@ -179,14 +180,14 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) return } - outputErrors := r.HandleOutput(routeCtx, routeInstance.Output(), payload) - if outputErrors != nil { + outputError := r.HandleOutput(routeCtx, routeInstance.Output(), payload) + if outputError != nil { if routeIOErrors == nil { routeIOErrors = []route.RouteIOError{} } routeIOErrors = append(routeIOErrors, route.RouteIOError{ - Index: routeIndex, - OutputErrors: outputErrors, + Index: routeIndex, + OutputError: outputError, }) } routeSpan.End() @@ -197,36 +198,31 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) return routeFound, routeIOErrors } -func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) []error { +func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { spanCtx, span := r.tracer.Start(ctx, "router.output", trace.WithAttributes(attribute.String("destination.id", destinationId))) defer span.End() - var outputErrors []error - for _, moduleInstance := range r.ModuleInstances { - if moduleInstance.Id() == destinationId { - moduleSpanCtx, moduleSpan := r.tracer.Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", moduleInstance.Id()), attribute.String("module.type", moduleInstance.Type()))) - err := moduleInstance.Output(moduleSpanCtx, payload) - if err != nil { - if outputErrors == nil { - outputErrors = []error{} - } - outputErrors = append(outputErrors, err) - moduleSpan.SetStatus(codes.Error, err.Error()) - moduleSpan.RecordError(err) - r.logger.Error("module output encountered error", "module", moduleInstance.Id(), "error", err) - } else { - moduleSpan.SetStatus(codes.Ok, "module output successful") - } - moduleSpan.End() - } + + destinationModule := r.getModule(destinationId) + + if destinationModule == nil { + err := errors.New("no module found for destination id") + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + r.logger.Error("no module found for destination id", "destinationId", destinationId) + return err } - if outputErrors != nil { - span.SetStatus(codes.Error, "router output error") - for _, outputError := range outputErrors { - span.RecordError(outputError) - } + moduleOutputCtx, moduleOutputSpan := r.tracer.Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", destinationModule.Id()), attribute.String("module.type", destinationModule.Type()))) + defer moduleOutputSpan.End() + err := destinationModule.Output(moduleOutputCtx, payload) + if err != nil { + moduleOutputSpan.SetStatus(codes.Error, err.Error()) + moduleOutputSpan.RecordError(err) + r.logger.Error("module output encountered error", "module", destinationModule.Id(), "error", err) + return err } else { - span.SetStatus(codes.Ok, "router output successful") + moduleOutputSpan.SetStatus(codes.Ok, "module output successful") } - return outputErrors + + return nil }