split out module and route creation in router and router output only does one module

This commit is contained in:
Joel Wetzell
2026-02-05 19:31:31 -06:00
parent 0dfbf30fa7
commit b959b88527
3 changed files with 86 additions and 95 deletions

View File

@@ -22,10 +22,10 @@ type HTTPServer struct {
}
type ResponseIOError struct {
Index int `json:"index"`
OutputErrors []string `json:"outputErrors"`
ProcessError *string `json:"processError"`
InputError *string `json:"inputError"`
Index int `json:"index"`
OutputError *string `json:"outputError"`
ProcessError *string `json:"processError"`
InputError *string `json:"inputError"`
}
type IOResponseData struct {
@@ -113,14 +113,9 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
errorToAdd.ProcessError = &errorMsg
}
if responseIOError.OutputErrors != nil {
outputErrorMsgs := []string{}
for _, outputError := range responseIOError.OutputErrors {
outputErrorMsgs = append(outputErrorMsgs, outputError.Error())
}
errorToAdd.OutputErrors = outputErrorMsgs
if responseIOError.OutputError != nil {
errorMsg := responseIOError.OutputError.Error()
errorToAdd.OutputError = &errorMsg
}
response.IOErrors = append(response.IOErrors, errorToAdd)

View File

@@ -24,7 +24,7 @@ type RouteError struct {
type RouteIOError struct {
Index int
OutputErrors []error
OutputError error
ProcessError error
InputError error
}

160
router.go
View File

@@ -16,8 +16,9 @@ import (
)
type Router struct {
contextCancel context.CancelFunc
Context context.Context
contextCancel context.CancelFunc
Context context.Context
// TODO(jwetzell): change to map for faster lookup
ModuleInstances []module.Module
RouteInstances []route.Route
moduleWait sync.WaitGroup
@@ -25,6 +26,52 @@ type Router struct {
tracer trace.Tracer
}
func (r *Router) addModule(moduleDecl config.ModuleConfig) error {
if moduleDecl.Id == "" {
return errors.New("module id cannot be empty")
}
moduleInfo, ok := module.ModuleRegistry[moduleDecl.Type]
if !ok {
return errors.New("module type not defined")
}
moduleInstanceExists := false
for _, moduleInstance := range r.ModuleInstances {
if moduleInstance.Id() == moduleDecl.Id {
moduleInstanceExists = true
return errors.New("duplicate module id")
}
}
if !moduleInstanceExists {
moduleInstance, err := moduleInfo.New(moduleDecl)
if err != nil {
return err
}
r.ModuleInstances = append(r.ModuleInstances, moduleInstance)
}
return nil
}
func (r *Router) addRoute(routeDecl config.RouteConfig) error {
routeInstance, err := route.NewRoute(routeDecl)
if err != nil {
return err
}
r.RouteInstances = append(r.RouteInstances, routeInstance)
return nil
}
func (r *Router) getModule(moduleId string) module.Module {
for _, moduleInstance := range r.ModuleInstances {
if moduleInstance.Id() == moduleId {
return moduleInstance
}
}
return nil
}
func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.ModuleError, []route.RouteError) {
router := Router{
@@ -39,69 +86,24 @@ func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.Mod
for moduleIndex, moduleDecl := range config.Modules {
if moduleDecl.Id == "" {
err := router.addModule(moduleDecl)
if err != nil {
if moduleErrors == nil {
moduleErrors = []module.ModuleError{}
}
moduleErrors = append(moduleErrors, module.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: errors.New("module id cannot be empty"),
Error: err,
})
continue
}
moduleInfo, ok := module.ModuleRegistry[moduleDecl.Type]
if !ok {
if moduleErrors == nil {
moduleErrors = []module.ModuleError{}
}
moduleErrors = append(moduleErrors, module.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: errors.New("module type not defined"),
})
continue
}
moduleInstanceExists := false
for _, moduleInstance := range router.ModuleInstances {
if moduleInstance.Id() == moduleDecl.Id {
moduleInstanceExists = true
if moduleErrors == nil {
moduleErrors = []module.ModuleError{}
}
moduleErrors = append(moduleErrors, module.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: errors.New("duplicate module id"),
})
break
}
}
if !moduleInstanceExists {
moduleInstance, err := moduleInfo.New(moduleDecl)
if err != nil {
if moduleErrors == nil {
moduleErrors = []module.ModuleError{}
}
moduleErrors = append(moduleErrors, module.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: err,
})
continue
}
router.ModuleInstances = append(router.ModuleInstances, moduleInstance)
}
}
var routeErrors []route.RouteError
for routeIndex, routeDecl := range config.Routes {
routeInstance, err := route.NewRoute(routeDecl)
err := router.addRoute(routeDecl)
if err != nil {
if routeErrors == nil {
routeErrors = []route.RouteError{}
@@ -113,7 +115,6 @@ func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.Mod
})
continue
}
router.RouteInstances = append(router.RouteInstances, routeInstance)
}
return &router, moduleErrors, routeErrors
@@ -179,14 +180,14 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
return
}
outputErrors := r.HandleOutput(routeCtx, routeInstance.Output(), payload)
if outputErrors != nil {
outputError := r.HandleOutput(routeCtx, routeInstance.Output(), payload)
if outputError != nil {
if routeIOErrors == nil {
routeIOErrors = []route.RouteIOError{}
}
routeIOErrors = append(routeIOErrors, route.RouteIOError{
Index: routeIndex,
OutputErrors: outputErrors,
Index: routeIndex,
OutputError: outputError,
})
}
routeSpan.End()
@@ -197,36 +198,31 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
return routeFound, routeIOErrors
}
func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) []error {
func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error {
spanCtx, span := r.tracer.Start(ctx, "router.output", trace.WithAttributes(attribute.String("destination.id", destinationId)))
defer span.End()
var outputErrors []error
for _, moduleInstance := range r.ModuleInstances {
if moduleInstance.Id() == destinationId {
moduleSpanCtx, moduleSpan := r.tracer.Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", moduleInstance.Id()), attribute.String("module.type", moduleInstance.Type())))
err := moduleInstance.Output(moduleSpanCtx, payload)
if err != nil {
if outputErrors == nil {
outputErrors = []error{}
}
outputErrors = append(outputErrors, err)
moduleSpan.SetStatus(codes.Error, err.Error())
moduleSpan.RecordError(err)
r.logger.Error("module output encountered error", "module", moduleInstance.Id(), "error", err)
} else {
moduleSpan.SetStatus(codes.Ok, "module output successful")
}
moduleSpan.End()
}
destinationModule := r.getModule(destinationId)
if destinationModule == nil {
err := errors.New("no module found for destination id")
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
r.logger.Error("no module found for destination id", "destinationId", destinationId)
return err
}
if outputErrors != nil {
span.SetStatus(codes.Error, "router output error")
for _, outputError := range outputErrors {
span.RecordError(outputError)
}
moduleOutputCtx, moduleOutputSpan := r.tracer.Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", destinationModule.Id()), attribute.String("module.type", destinationModule.Type())))
defer moduleOutputSpan.End()
err := destinationModule.Output(moduleOutputCtx, payload)
if err != nil {
moduleOutputSpan.SetStatus(codes.Error, err.Error())
moduleOutputSpan.RecordError(err)
r.logger.Error("module output encountered error", "module", destinationModule.Id(), "error", err)
return err
} else {
span.SetStatus(codes.Ok, "router output successful")
moduleOutputSpan.SetStatus(codes.Ok, "module output successful")
}
return outputErrors
return nil
}