// Package route defines Route, which pairs an input identifier and an output
// identifier with an ordered list of processors that are applied to payloads.
package route

import (
	"context"
	"fmt"

	"github.com/jwetzell/showbridge-go/internal/config"
	"github.com/jwetzell/showbridge-go/internal/processor"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// RouteError carries an error together with the route configuration it
// relates to.
//
// NOTE(review): Index is presumably the position of the route in the loaded
// configuration; confirm against the code that builds these values.
type RouteError struct {
	Index  int
	Config config.RouteConfig
	Error  error
}

// RouteIOError groups the errors that can be reported for one route while a
// payload is handled. Each of the error fields may be set independently.
//
// NOTE(review): Index presumably identifies the route the errors belong to;
// confirm against the RouteIO implementation.
type RouteIOError struct {
	Index        int
	OutputError  error
	ProcessError error
	InputError   error
}

// RouteIO is implemented by whatever moves payloads into and out of routes.
type RouteIO interface {
	// HandleInput is given a payload together with the id of the source it
	// came from. It returns a bool and any per-route errors.
	// NOTE(review): the meaning of the bool is not visible in this file.
	HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError)
	// HandleOutput is given a payload together with the id of the
	// destination it is intended for.
	HandleOutput(ctx context.Context, destinationId string, payload any) error
}

// Route holds the input id, the output id, and the processors that are run,
// in order, by ProcessPayload.
type Route struct {
	input      string
	processors []processor.Processor
	output     string
}

// NewRoute builds a Route from config.
//
// Every entry in config.Processors is looked up by its Type in
// processor.ProcessorRegistry and instantiated with the registered
// constructor. An error is returned if a type has no registration or if a
// constructor fails; in the latter case the constructor's error is returned
// unchanged.
func NewRoute(config config.RouteConfig) (*Route, error) {
	// The final length is known up front, so size the slice once. Ranging
	// over an empty list is a no-op, so no separate emptiness check is needed.
	processors := make([]processor.Processor, 0, len(config.Processors))

	for _, processorDecl := range config.Processors {
		processorInfo, ok := processor.ProcessorRegistry[processorDecl.Type]
		if !ok {
			return nil, fmt.Errorf("problem loading processor registration for processor type: %s", processorDecl.Type)
		}

		// Named p rather than processor so the imported package is not
		// shadowed inside the loop body.
		p, err := processorInfo.New(processorDecl)
		if err != nil {
			return nil, err
		}
		processors = append(processors, p)
	}

	return &Route{input: config.Input, processors: processors, output: config.Output}, nil
}

// Input returns the route's input id.
func (r *Route) Input() string {
	return r.input
}

// Output returns the route's output id.
func (r *Route) Output() string {
	return r.output
}

// ProcessPayload runs payload through the route's processors in order, feeding
// each processor the result of the previous one, and returns the final result.
//
// If a processor returns an error, processing stops and (nil, err) is
// returned. If a processor returns a nil payload, processing stops and
// (nil, nil) is returned. A route with no processors returns payload
// unchanged.
//
// The whole call is recorded as a "ProcessPayload" span on the "route" tracer,
// with one child "process" span per processor that is run.
func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) {
	processCtx, processSpan := otel.Tracer("route").Start(ctx, "ProcessPayload")
	defer processSpan.End()

	for index, p := range r.processors {
		processed, err := tracedProcess(processCtx, index, p, payload)
		if err != nil {
			processSpan.SetStatus(codes.Error, "route processing error")
			processSpan.RecordError(err)
			return nil, err
		}

		//NOTE(jwetzell) nil payload will result in the route being "terminated"
		if processed == nil {
			processSpan.SetStatus(codes.Ok, "route processing terminated early due to nil payload")
			return nil, nil
		}

		payload = processed
	}

	processSpan.SetStatus(codes.Ok, "route processing successful")
	return payload, nil
}

// tracedProcess runs a single processor inside its own "process" span and
// returns the processor's result.
//
// The span is ended with defer so that it is closed on every exit path,
// including a panic inside the processor.
func tracedProcess(ctx context.Context, index int, p processor.Processor, payload any) (any, error) {
	spanCtx, span := otel.Tracer("processor").Start(ctx, "process", trace.WithAttributes(
		attribute.Int("processor.index", index),
		attribute.String("processor.type", p.Type()),
	))
	defer span.End()

	processed, err := p.Process(spanCtx, payload)
	if err != nil {
		span.SetStatus(codes.Error, "route processor error")
		span.RecordError(err)
		return nil, err
	}

	span.SetStatus(codes.Ok, "processor successful")
	return processed, nil
}