rework otel tracer providing

This commit is contained in:
Joel Wetzell
2026-02-16 17:11:43 -06:00
parent 11e25ab8f7
commit 09b030efa6
4 changed files with 23 additions and 35 deletions

View File

@@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.39.0" semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
) )
@@ -90,7 +89,6 @@ type showbridgeApp struct {
logger *slog.Logger logger *slog.Logger
router *showbridge.Router router *showbridge.Router
routerRunner *sync.WaitGroup routerRunner *sync.WaitGroup
tracer trace.Tracer
routerMutex sync.Mutex routerMutex sync.Mutex
} }
@@ -155,7 +153,6 @@ func run(ctx context.Context, c *cli.Command) error {
slog.SetDefault(slog.New(logHandler)) slog.SetDefault(slog.New(logHandler))
var tracer trace.Tracer
if c.Bool("trace") { if c.Bool("trace") {
exporter, err := otlptracehttp.New(ctx) exporter, err := otlptracehttp.New(ctx)
if err != nil { if err != nil {
@@ -166,9 +163,7 @@ func run(ctx context.Context, c *cli.Command) error {
otel.SetTracerProvider(tracerProvider) otel.SetTracerProvider(tracerProvider)
defer tracerProvider.Shutdown(ctx) defer tracerProvider.Shutdown(ctx)
tracer = tracerProvider.Tracer("showbridge") otel.SetTracerProvider(tracerProvider)
} else {
tracer = otel.Tracer("showbridge")
} }
showbridgeApp := &showbridgeApp{ showbridgeApp := &showbridgeApp{
@@ -176,7 +171,6 @@ func run(ctx context.Context, c *cli.Command) error {
configPath: configPath, configPath: configPath,
logger: slog.Default().With("component", "cmd"), logger: slog.Default().With("component", "cmd"),
routerRunner: &sync.WaitGroup{}, routerRunner: &sync.WaitGroup{},
tracer: tracer,
} }
router, err := showbridgeApp.getNewRouter() router, err := showbridgeApp.getNewRouter()
@@ -233,7 +227,7 @@ func (app *showbridgeApp) getNewRouter() (*showbridge.Router, error) {
return nil, err return nil, err
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(config, app.tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(config)
for _, moduleError := range moduleErrors { for _, moduleError := range moduleErrors {
app.logger.Error("problem initializing module", "index", moduleError.Index, "error", moduleError.Error) app.logger.Error("problem initializing module", "index", moduleError.Index, "error", moduleError.Error)

View File

@@ -6,6 +6,7 @@ import (
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor" "github.com/jwetzell/showbridge-go/internal/processor"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@@ -76,12 +77,11 @@ func (r *ProcessorRoute) Output() string {
} }
func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) { func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) {
parentSpan := trace.SpanFromContext(ctx) tracer := otel.Tracer("route")
tracer := parentSpan.TracerProvider().Tracer("route.ProcessPayload") processCtx, processSpan := tracer.Start(ctx, "ProcessPayload")
processCtx, processSpan := tracer.Start(ctx, "route.process")
defer processSpan.End() defer processSpan.End()
for processorIndex, processor := range r.processors { for processorIndex, processor := range r.processors {
processorCtx, processorSpan := tracer.Start(processCtx, "route.processor", trace.WithAttributes(attribute.Int("processor.index", processorIndex), attribute.String("processor.type", processor.Type()))) processorCtx, processorSpan := tracer.Start(processCtx, "processor.process", trace.WithAttributes(attribute.Int("processor.index", processorIndex), attribute.String("processor.type", processor.Type())))
processedPayload, err := processor.Process(processorCtx, payload) processedPayload, err := processor.Process(processorCtx, payload)
if err != nil { if err != nil {
processorSpan.SetStatus(codes.Error, "route processor error") processorSpan.SetStatus(codes.Error, "route processor error")

View File

@@ -10,6 +10,7 @@ import (
"github.com/jwetzell/showbridge-go/internal/module" "github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route" "github.com/jwetzell/showbridge-go/internal/route"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@@ -23,7 +24,6 @@ type Router struct {
RouteInstances []route.Route RouteInstances []route.Route
moduleWait sync.WaitGroup moduleWait sync.WaitGroup
logger *slog.Logger logger *slog.Logger
tracer trace.Tracer
runningConfig config.Config runningConfig config.Config
} }
@@ -101,13 +101,12 @@ func (r *Router) getModule(moduleId string) module.Module {
return moduleInstance return moduleInstance
} }
func NewRouter(config config.Config, tracer trace.Tracer) (*Router, []module.ModuleError, []route.RouteError) { func NewRouter(config config.Config) (*Router, []module.ModuleError, []route.RouteError) {
router := Router{ router := Router{
ModuleInstances: make(map[string]module.Module), ModuleInstances: make(map[string]module.Module),
RouteInstances: []route.Route{}, RouteInstances: []route.Route{},
logger: slog.Default().With("component", "router"), logger: slog.Default().With("component", "router"),
tracer: tracer,
runningConfig: config, runningConfig: config,
} }
router.logger.Debug("creating") router.logger.Debug("creating")
@@ -173,7 +172,7 @@ func (r *Router) Stop() {
} }
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
spanCtx, span := r.tracer.Start(ctx, "router.input", trace.WithAttributes(attribute.String("source.id", sourceId)), trace.WithNewRoot()) spanCtx, span := otel.Tracer("router").Start(ctx, "input", trace.WithAttributes(attribute.String("source.id", sourceId)), trace.WithNewRoot())
defer span.End() defer span.End()
var routeIOErrors []route.RouteIOError var routeIOErrors []route.RouteIOError
routeFound := false routeFound := false
@@ -187,7 +186,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
routeFound = true routeFound = true
routeContext := context.WithValue(spanCtx, route.SourceContextKey, sourceId) routeContext := context.WithValue(spanCtx, route.SourceContextKey, sourceId)
routeCtx, routeSpan := r.tracer.Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()), attribute.String("route.output", routeInstance.Output()))) routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()), attribute.String("route.output", routeInstance.Output())))
payload, err := routeInstance.ProcessPayload(routeCtx, payload) payload, err := routeInstance.ProcessPayload(routeCtx, payload)
if err != nil { if err != nil {
if routeIOErrors == nil { if routeIOErrors == nil {
@@ -225,7 +224,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
} }
func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error {
spanCtx, span := r.tracer.Start(ctx, "router.output", trace.WithAttributes(attribute.String("destination.id", destinationId))) spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId)))
defer span.End() defer span.End()
destinationModule := r.getModule(destinationId) destinationModule := r.getModule(destinationId)
@@ -238,13 +237,13 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload
return err return err
} }
moduleOutputCtx, moduleOutputSpan := r.tracer.Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", destinationModule.Id()), attribute.String("module.type", destinationModule.Type()))) moduleOutputCtx, moduleOutputSpan := otel.Tracer("router").Start(spanCtx, "module.output", trace.WithAttributes(attribute.String("module.id", destinationModule.Id()), attribute.String("module.type", destinationModule.Type())))
defer moduleOutputSpan.End() defer moduleOutputSpan.End()
err := destinationModule.Output(moduleOutputCtx, payload) err := destinationModule.Output(moduleOutputCtx, payload)
if err != nil { if err != nil {
moduleOutputSpan.SetStatus(codes.Error, err.Error()) moduleOutputSpan.SetStatus(codes.Error, err.Error())
moduleOutputSpan.RecordError(err) moduleOutputSpan.RecordError(err)
r.logger.Error("module output encountered error", "module", destinationModule.Id(), "error", err) r.logger.ErrorContext(moduleOutputCtx, "module output encountered error", "module", destinationModule.Id(), "error", err)
return err return err
} else { } else {
moduleOutputSpan.SetStatus(codes.Ok, "module output successful") moduleOutputSpan.SetStatus(codes.Ok, "module output successful")

View File

@@ -12,11 +12,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module" "github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route" "github.com/jwetzell/showbridge-go/internal/route"
"go.opentelemetry.io/otel"
)
var (
tracer = otel.Tracer("showbridge.test")
) )
type MockCounterModule struct { type MockCounterModule struct {
@@ -78,7 +73,7 @@ func TestNewRouter(t *testing.T) {
}, },
} }
_, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) _, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)
@@ -99,7 +94,7 @@ func TestNewRouterNoModuleId(t *testing.T) {
}, },
} }
_, moduleErrors, _ := showbridge.NewRouter(routerConfig, tracer) _, moduleErrors, _ := showbridge.NewRouter(routerConfig)
if moduleErrors == nil { if moduleErrors == nil {
t.Fatalf("router should have returned 'unknown module' module errors") t.Fatalf("router should have returned 'unknown module' module errors")
@@ -116,7 +111,7 @@ func TestNewRouterUnknownModuleType(t *testing.T) {
}, },
} }
_, moduleErrors, _ := showbridge.NewRouter(routerConfig, tracer) _, moduleErrors, _ := showbridge.NewRouter(routerConfig)
if moduleErrors == nil { if moduleErrors == nil {
t.Fatalf("router should have returned 'unknown module' module errors") t.Fatalf("router should have returned 'unknown module' module errors")
@@ -137,7 +132,7 @@ func TestNewRouterDuplicateModuleId(t *testing.T) {
}, },
} }
_, moduleErrors, _ := showbridge.NewRouter(routerConfig, tracer) _, moduleErrors, _ := showbridge.NewRouter(routerConfig)
if moduleErrors == nil { if moduleErrors == nil {
t.Fatalf("router should have returned module error") t.Fatalf("router should have returned module error")
@@ -173,7 +168,7 @@ func TestNewRouterRouteWithUnknwonProcessor(t *testing.T) {
}, },
} }
_, _, routeErrors := showbridge.NewRouter(routerConfig, tracer) _, _, routeErrors := showbridge.NewRouter(routerConfig)
if routeErrors == nil { if routeErrors == nil {
t.Fatalf("router should have returned a route error") t.Fatalf("router should have returned a route error")
@@ -204,7 +199,7 @@ func TestRouterInputUnknownDestinationModule(t *testing.T) {
}, },
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)
@@ -256,7 +251,7 @@ func TestRouterInputNoMatchingRoute(t *testing.T) {
}, },
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)
@@ -300,7 +295,7 @@ func TestRouterInputSingleRoute(t *testing.T) {
}, },
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)
@@ -372,7 +367,7 @@ func TestRouterInputMultipleRoutes(t *testing.T) {
}, },
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)
@@ -443,7 +438,7 @@ func TestRouterInputMultipleModules(t *testing.T) {
}, },
} }
router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig, tracer) router, moduleErrors, routeErrors := showbridge.NewRouter(routerConfig)
if moduleErrors != nil { if moduleErrors != nil {
t.Fatalf("router should not have returned any module errors: %v", moduleErrors) t.Fatalf("router should not have returned any module errors: %v", moduleErrors)