wrap payload for all processors

This commit is contained in:
Joel Wetzell
2026-03-16 17:05:49 -05:00
parent b6c1c5c600
commit f273aedbc6
76 changed files with 674 additions and 486 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"go.opentelemetry.io/otel"
@@ -48,12 +49,13 @@ func (r *Route) Input() string {
}
func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) {
wrappedPayload := common.GetWrappedPayload(ctx, payload)
tracer := otel.Tracer("route")
processCtx, processSpan := tracer.Start(ctx, "ProcessPayload", trace.WithAttributes(attribute.String("payload.type", fmt.Sprintf("%T", payload))))
defer processSpan.End()
for processorIndex, processor := range r.processors {
processorCtx, processorSpan := otel.Tracer("processor").Start(processCtx, "process", trace.WithAttributes(attribute.Int("processor.index", processorIndex), attribute.String("processor.type", processor.Type())))
processedPayload, err := processor.Process(processorCtx, payload)
processedPayload, err := processor.Process(processorCtx, wrappedPayload)
if err != nil {
processorSpan.SetStatus(codes.Error, "route processor error")
processorSpan.RecordError(err)
@@ -63,16 +65,16 @@ func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) {
return nil, err
}
processorSpan.SetStatus(codes.Ok, "processor successful")
//NOTE(jwetzell) nil payload will result in the route being "terminated"
if processedPayload == nil {
processSpan.SetStatus(codes.Ok, "route processing terminated early due to nil payload")
//NOTE(jwetzell) payload has been marked as an end
if processedPayload.End {
processSpan.SetStatus(codes.Ok, "route processing terminated early due to processor signal")
processorSpan.End()
return nil, nil
return processedPayload.Payload, nil
}
payload = processedPayload
wrappedPayload = processedPayload
processorSpan.End()
}
processSpan.SetStatus(codes.Ok, "route processing successful")
return payload, nil
return wrappedPayload.Payload, nil
}

View File

@@ -120,7 +120,7 @@ func TestRouteHandleNilPayload(t *testing.T) {
t.Fatalf("route ProcessPayload returned error: %v", err)
}
if payload != nil {
t.Fatalf("route returned the wrong payload")
t.Fatalf("route returned the wrong payload: expected nil got %+v (%T)", payload, payload)
}
}
@@ -143,14 +143,10 @@ func TestRouteHandleNilPayloadFromProcessor(t *testing.T) {
t.Fatalf("route failed to create: %v", err)
}
payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), "test")
_, err = testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), "test")
if err != nil {
t.Fatalf("route HandleOutput returned error for nil payload: %v", err)
}
if payload != nil {
t.Fatalf("route returned the wrong payload")
}
}
func TestRouteUnknownProcessor(t *testing.T) {