From a920f3492603767ea9e2f43ceda74e5106c4857b Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Wed, 4 Feb 2026 12:20:36 -0600 Subject: [PATCH] rework route and processing spans --- internal/route/route.go | 22 ++++++++++++++++++++-- router.go | 23 ++++------------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/internal/route/route.go b/internal/route/route.go index c096b74..1d0814a 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -6,6 +6,9 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type routeContextKey string @@ -73,17 +76,32 @@ func (r *ProcessorRoute) Output() string { } func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) { - for _, processor := range r.processors { - processedPayload, err := processor.Process(ctx, payload) + parentSpan := trace.SpanFromContext(ctx) + tracer := parentSpan.TracerProvider().Tracer("route.ProcessPayload") + processCtx, processSpan := tracer.Start(ctx, "route.process") + defer processSpan.End() + for processorIndex, processor := range r.processors { + processorCtx, processorSpan := tracer.Start(processCtx, "route.processor", trace.WithAttributes(attribute.Int("processor.index", processorIndex), attribute.String("processor.type", processor.Type()))) + processedPayload, err := processor.Process(processorCtx, payload) if err != nil { + processorSpan.SetStatus(codes.Error, "route processor error") + processorSpan.RecordError(err) + processorSpan.End() + processSpan.SetStatus(codes.Error, "route processing error") + processSpan.RecordError(err) return nil, err } + processorSpan.SetStatus(codes.Ok, "processor successful") //NOTE(jwetzell) nil payload will result in the route being "terminated" if processedPayload == nil { + processSpan.SetStatus(codes.Ok, "route processing terminated early due to nil payload") + processorSpan.End() return nil, nil } payload = processedPayload + processorSpan.End() } + processSpan.SetStatus(codes.Ok, "route processing successful") return payload, nil } diff --git a/router.go b/router.go index 2da50c6..e27d1e5 100644 --- a/router.go +++ b/router.go @@ -160,10 +160,8 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) routeFound = true routeContext := context.WithValue(spanCtx, route.SourceContextKey, sourceId) - routeSpanCtx, routeSpan := r.tracer.Start(routeContext, "route.input", trace.WithAttributes(attribute.Int("route.index", routeIndex))) - defer routeSpan.End() - routeProcessCtx, routeSpan := r.tracer.Start(routeSpanCtx, "route.process") - payload, err := routeInstance.ProcessPayload(routeProcessCtx, payload) + routeCtx, routeSpan := r.tracer.Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()), attribute.String("route.output", routeInstance.Output()))) + payload, err := routeInstance.ProcessPayload(routeCtx, payload) if err != nil { if routeIOErrors == nil { routeIOErrors = []route.RouteIOError{} @@ -173,13 +171,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) Index: routeIndex, ProcessError: err, }) - routeSpan.SetStatus(codes.Error, err.Error()) - routeSpan.RecordError(err) - routeSpan.End() return - } else { - routeSpan.SetStatus(codes.Ok, "route processing successful") - routeSpan.End() } if payload == nil { @@ -187,8 +179,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) return } - routeOutputCtx, routeOutputSpan := r.tracer.Start(routeSpanCtx, "route.output", trace.WithAttributes(attribute.String("destination.id", routeInstance.Output()))) - outputErrors := r.HandleOutput(routeOutputCtx, routeInstance.Output(), payload) + outputErrors := r.HandleOutput(routeCtx, routeInstance.Output(), payload) if outputErrors != nil { if routeIOErrors == nil { routeIOErrors = []route.RouteIOError{} @@ -197,14 +188,8 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) Index: routeIndex, OutputErrors: outputErrors, }) - routeOutputSpan.SetStatus(codes.Error, "route output error") - for _, outputError := range outputErrors { - routeOutputSpan.RecordError(outputError) - } - } else { - routeOutputSpan.SetStatus(codes.Ok, "route output successful") } - routeOutputSpan.End() + routeSpan.End() }) } }