mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 21:05:30 +00:00
rework route to just process payload
This commit is contained in:
@@ -2,7 +2,6 @@ package route
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/jwetzell/showbridge-go/internal/config"
|
"github.com/jwetzell/showbridge-go/internal/config"
|
||||||
@@ -33,7 +32,7 @@ type RouteIO interface {
|
|||||||
type Route interface {
|
type Route interface {
|
||||||
Input() string
|
Input() string
|
||||||
Output() string
|
Output() string
|
||||||
HandleInput(ctx context.Context, payload any) error
|
ProcessPayload(ctx context.Context, payload any) (any, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProcessorRoute struct {
|
type ProcessorRoute struct {
|
||||||
@@ -71,24 +70,18 @@ func (r *ProcessorRoute) Output() string {
|
|||||||
return r.output
|
return r.output
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ProcessorRoute) HandleInput(ctx context.Context, payload any) error {
|
func (r *ProcessorRoute) ProcessPayload(ctx context.Context, payload any) (any, error) {
|
||||||
router, ok := ctx.Value(RouterContextKey).(RouteIO)
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return errors.New("unable to get router from context")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, processor := range r.processors {
|
for _, processor := range r.processors {
|
||||||
processedPayload, err := processor.Process(ctx, payload)
|
processedPayload, err := processor.Process(ctx, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
//NOTE(jwetzell) nil payload will result in the route being "terminated"
|
//NOTE(jwetzell) nil payload will result in the route being "terminated"
|
||||||
if processedPayload == nil {
|
if processedPayload == nil {
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
payload = processedPayload
|
payload = processedPayload
|
||||||
}
|
}
|
||||||
|
|
||||||
return router.HandleOutput(ctx, r.output, payload)
|
return payload, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package route_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/jwetzell/showbridge-go/internal/config"
|
"github.com/jwetzell/showbridge-go/internal/config"
|
||||||
@@ -52,9 +53,18 @@ func TestGoodRouteHandleInput(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
inputData := "test input data"
|
inputData := "test input data"
|
||||||
err = testRoute.HandleInput(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), inputData)
|
payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), inputData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("route HandleOutput returned error: %v", err)
|
t.Fatalf("route ProcessPayload returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadBytes, ok := payload.([]byte)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("payload should be []byte got %T", payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !slices.Equal([]byte(inputData), payloadBytes) {
|
||||||
|
t.Fatalf("route returned the wrong payload. expected: %+v got %+v", inputData, payloadBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,7 +83,7 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
inputData := "test input data"
|
inputData := "test input data"
|
||||||
err = testRoute.HandleInput(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), inputData)
|
_, err = testRoute.ProcessPayload(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), inputData)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("route HandleOutput did not return error for bad processor")
|
t.Fatalf("route HandleOutput did not return error for bad processor")
|
||||||
}
|
}
|
||||||
@@ -92,9 +102,12 @@ func TestRouteHandleNilPayload(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = testRoute.HandleInput(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), nil)
|
payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("route HandleOutput returned error for nil payload: %v", err)
|
t.Fatalf("route ProcessPayload returned error: %v", err)
|
||||||
|
}
|
||||||
|
if payload != nil {
|
||||||
|
t.Fatalf("route returned the wrong payload")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,10 +125,14 @@ func TestRouteHandleNilPayloadFromProcessor(t *testing.T) {
|
|||||||
t.Fatalf("route failed to create: %v", err)
|
t.Fatalf("route failed to create: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = testRoute.HandleInput(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), nil)
|
payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), route.RouterContextKey, &MockRouter{}), "test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("route HandleOutput returned error for nil payload: %v", err)
|
t.Fatalf("route HandleOutput returned error for nil payload: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if payload != nil {
|
||||||
|
t.Fatalf("route returned the wrong payload")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRouteUnknownProcessor(t *testing.T) {
|
func TestRouteUnknownProcessor(t *testing.T) {
|
||||||
|
|||||||
@@ -133,7 +133,9 @@ func (r *Router) HandleInput(sourceId string, payload any) []route.RouteIOError
|
|||||||
var routingErrors []route.RouteIOError
|
var routingErrors []route.RouteIOError
|
||||||
for routeIndex, routeInstance := range r.RouteInstances {
|
for routeIndex, routeInstance := range r.RouteInstances {
|
||||||
if routeInstance.Input() == sourceId {
|
if routeInstance.Input() == sourceId {
|
||||||
err := routeInstance.HandleInput(context.WithValue(r.Context, route.SourceContextKey, sourceId), payload)
|
routeContext := context.WithValue(r.Context, route.SourceContextKey, sourceId)
|
||||||
|
|
||||||
|
payload, err := routeInstance.ProcessPayload(routeContext, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if routingErrors == nil {
|
if routingErrors == nil {
|
||||||
routingErrors = []route.RouteIOError{}
|
routingErrors = []route.RouteIOError{}
|
||||||
@@ -144,6 +146,7 @@ func (r *Router) HandleInput(sourceId string, payload any) []route.RouteIOError
|
|||||||
})
|
})
|
||||||
r.logger.Error("unable to route input", "route", routeIndex, "source", sourceId, "error", err)
|
r.logger.Error("unable to route input", "route", routeIndex, "source", sourceId, "error", err)
|
||||||
}
|
}
|
||||||
|
r.HandleOutput(routeContext, routeInstance.Output(), payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return routingErrors
|
return routingErrors
|
||||||
|
|||||||
Reference in New Issue
Block a user