From 833bd529d62553250a7a57e943fc436b6bfbcb0b Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Wed, 6 May 2026 21:33:16 -0500 Subject: [PATCH] pull all non-request scoped values out of context --- internal/common/context.go | 8 -- internal/common/module.go | 2 +- internal/common/payload.go | 33 +------- internal/common/payload_test.go | 82 ------------------- internal/module/db-sqlite.go | 8 +- internal/module/http-server.go | 12 +-- internal/module/midi-input.go | 8 +- internal/module/midi-output.go | 7 +- internal/module/mqtt-client.go | 7 +- internal/module/nats-client.go | 8 +- internal/module/nats-server.go | 8 +- internal/module/psn-client.go | 8 +- internal/module/redis-client.go | 8 +- internal/module/serial-client.go | 8 +- internal/module/sip-call-server.go | 7 +- internal/module/sip-dtmf-server.go | 7 +- internal/module/tcp-client.go | 7 +- internal/module/tcp-server.go | 12 +-- internal/module/test/db-sqlite_test.go | 2 +- internal/module/test/http-server_test.go | 2 +- internal/module/test/midi-input_test.go | 2 +- internal/module/test/midi-output_test.go | 2 +- internal/module/test/mqtt-client_test.go | 2 +- internal/module/test/nats-client_test.go | 2 +- internal/module/test/nats-server_test.go | 2 +- internal/module/test/psn-client_test.go | 2 +- internal/module/test/redis-client_test.go | 2 +- internal/module/test/serial-client_test.go | 2 +- internal/module/test/sip-call-server_test.go | 2 +- internal/module/test/sip-dtmf-server_test.go | 2 +- internal/module/test/tcp-client_test.go | 2 +- internal/module/test/tcp-server_test.go | 2 +- internal/module/test/time-interval_test.go | 2 +- internal/module/test/time-timer_test.go | 2 +- internal/module/test/udp-client_test.go | 2 +- internal/module/test/udp-multicast_test.go | 2 +- internal/module/test/udp-server_test.go | 2 +- internal/module/time-interval.go | 8 +- internal/module/time-timer.go | 8 +- internal/module/udp-client.go | 7 +- internal/module/udp-multicast.go | 7 +- internal/module/udp-server.go | 12 +-- internal/processor/router-input.go | 6 +- internal/processor/router-output.go | 5 +- internal/processor/script-js.go | 2 - .../test/artnet-packet-decode_test.go | 4 +- .../test/artnet-packet-encode_test.go | 4 +- internal/processor/test/db-query_test.go | 76 ++++++++--------- internal/processor/test/debug-log_test.go | 6 +- internal/processor/test/filter-change_test.go | 8 +- internal/processor/test/filter-expr_test.go | 4 +- internal/processor/test/filter-regex_test.go | 6 +- internal/processor/test/float-parse_test.go | 4 +- internal/processor/test/float-random_test.go | 4 +- internal/processor/test/free-d-create_test.go | 4 +- internal/processor/test/free-d-decode_test.go | 4 +- internal/processor/test/free-d-encode_test.go | 4 +- .../processor/test/http-request-do_test.go | 4 +- .../test/http-response-create_test.go | 4 +- internal/processor/test/int-parse_test.go | 4 +- internal/processor/test/int-random_test.go | 6 +- internal/processor/test/int-scale_test.go | 4 +- internal/processor/test/json-decode_test.go | 6 +- internal/processor/test/json-encode_test.go | 6 +- internal/processor/test/kv-get_test.go | 57 +++++++------ internal/processor/test/kv-set_test.go | 73 ++++++++--------- .../test/midi-control_change-create_test.go | 4 +- .../test/midi-message-decode_test.go | 4 +- .../test/midi-message-encode_test.go | 4 +- .../test/midi-message-unpack_test.go | 4 +- .../test/midi-note_off-create_test.go | 4 +- .../test/midi-note_on-create_test.go | 4 +- .../test/midi-program_change-create_test.go | 4 +- .../test/mqtt-message-create_test.go | 4 +- .../test/nats-message-create_test.go | 4 +- .../processor/test/osc-message-create_test.go | 4 +- .../processor/test/osc-message-decode_test.go | 4 +- .../processor/test/osc-message-encode_test.go | 4 +- internal/processor/test/router-input_test.go | 47 ++++++----- internal/processor/test/router-output_test.go | 46 +++++------ internal/processor/test/script-expr_test.go | 4 +- internal/processor/test/script-js_test.go | 6 +- internal/processor/test/script-wasm_test.go | 4 +- .../test/sip-response-audio-create_test.go | 4 +- .../test/sip-response-dtmf-create_test.go | 4 +- internal/processor/test/string-create_test.go | 6 +- internal/processor/test/string-decode_test.go | 6 +- internal/processor/test/string-encode_test.go | 6 +- internal/processor/test/string-split_test.go | 6 +- .../processor/test/struct-field-get_test.go | 6 +- .../processor/test/struct-method-get_test.go | 6 +- internal/processor/test/time-sleep_test.go | 4 +- internal/route/route.go | 5 +- internal/route/route_test.go | 20 ++++- internal/test/context.go | 27 ------ internal/test/module.go | 7 +- router.go | 18 ++-- router_test.go | 7 +- 98 files changed, 328 insertions(+), 584 deletions(-) delete mode 100644 internal/common/context.go delete mode 100644 internal/common/payload_test.go delete mode 100644 internal/test/context.go diff --git a/internal/common/context.go b/internal/common/context.go deleted file mode 100644 index c49cbc0..0000000 --- a/internal/common/context.go +++ /dev/null @@ -1,8 +0,0 @@ -package common - -type contextKey string - -const RouterContextKey contextKey = contextKey("router") -const SourceContextKey contextKey = contextKey("source") -const ModulesContextKey contextKey = contextKey("modules") -const SenderContextKey contextKey = contextKey("sender") diff --git a/internal/common/module.go b/internal/common/module.go index f81bffa..cbf89f9 100644 --- a/internal/common/module.go +++ b/internal/common/module.go @@ -8,7 +8,7 @@ import ( type Module interface { Id() string Type() string - Start(context.Context) error + Start(context.Context, RouteIO) error Stop() } diff --git a/internal/common/payload.go b/internal/common/payload.go index 8f13544..ac28bf8 100644 --- a/internal/common/payload.go +++ b/internal/common/payload.go @@ -1,40 +1,9 @@ package common -import ( - "context" -) - type WrappedPayload struct { Payload any + Router RouteIO Modules map[string]Module - Sender any Source string End bool } - -func GetWrappedPayload(ctx context.Context, payload any) WrappedPayload { - wrappedPayload := WrappedPayload{ - Payload: payload, - End: false, - } - modules := ctx.Value(ModulesContextKey) - if modules != nil { - moduleMap, ok := modules.(map[string]Module) - if ok { - wrappedPayload.Modules = moduleMap - } else { - wrappedPayload.Modules = make(map[string]Module) - } - } - - sender := ctx.Value(SenderContextKey) - if sender != nil { - wrappedPayload.Sender = sender - } - - source := ctx.Value(SourceContextKey) - if source != nil { - wrappedPayload.Source = source.(string) - } - return wrappedPayload -} diff --git a/internal/common/payload_test.go b/internal/common/payload_test.go deleted file mode 100644 index e2d9ffb..0000000 --- a/internal/common/payload_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package common_test - -import ( - "context" - "reflect" - "testing" - - "github.com/jwetzell/showbridge-go/internal/common" - "github.com/jwetzell/showbridge-go/internal/test" -) - -func TestGoodGetWrappedPayload(t *testing.T) { - testCases := []struct { - name string - ctx context.Context - payload any - expected common.WrappedPayload - }{ - { - name: "basic", - ctx: t.Context(), - payload: "test", - expected: common.WrappedPayload{ - Payload: "test", - }, - }, - { - name: "with modules in context", - ctx: test.GetContextWithModules(t.Context(), map[string]common.Module{}), - payload: "test", - expected: common.WrappedPayload{ - Payload: "test", - Modules: map[string]common.Module{}, - }, - }, - { - name: "with sender in context", - ctx: test.GetContextWithSender(t.Context(), "sender"), - payload: "test", - expected: common.WrappedPayload{ - Payload: "test", - Sender: "sender", - }, - }, - { - name: "with source in context", - ctx: test.GetContextWithSource(t.Context(), "source"), - payload: "test", - expected: common.WrappedPayload{ - Payload: "test", - Source: "source", - }, - }, - { - name: "with all fields in context", - ctx: test.GetContextWithSource( - test.GetContextWithSender( - test.GetContextWithModules(t.Context(), map[string]common.Module{}), - "sender", - ), - "source", - ), - payload: "test", - expected: common.WrappedPayload{ - Payload: "test", - Modules: map[string]common.Module{}, - Sender: "sender", - Source: "source", - }, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - wrappedPayload := common.GetWrappedPayload(testCase.ctx, testCase.payload) - - if !reflect.DeepEqual(wrappedPayload, testCase.expected) { - t.Fatalf("GetWrappedPayload expected got %+v, expected %+v", wrappedPayload, testCase.expected) - } - }) - } - -} diff --git a/internal/module/db-sqlite.go b/internal/module/db-sqlite.go index c0941e0..a3dd227 100644 --- a/internal/module/db-sqlite.go +++ b/internal/module/db-sqlite.go @@ -3,7 +3,6 @@ package module import ( "context" "database/sql" - "errors" "fmt" "log/slog" @@ -59,13 +58,8 @@ func (t *DbSqlite) Type() string { return t.config.Type } -func (t *DbSqlite) Start(ctx context.Context) error { +func (t *DbSqlite) Start(ctx context.Context, router common.RouteIO) error { t.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("db.sqlite unable to get router from context") - } t.router = router t.ctx = ctx diff --git a/internal/module/http-server.go b/internal/module/http-server.go index 4a6068e..ae8d95f 100644 --- a/internal/module/http-server.go +++ b/internal/module/http-server.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "net" "net/http" "github.com/google/jsonschema-go/jsonschema" @@ -99,10 +98,6 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if hs.router != nil { inputContext := context.WithValue(hs.ctx, httpServerContextKey("responseWriter"), &responseWriter) - senderAddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr) - if err == nil { - inputContext = context.WithValue(inputContext, common.SenderContextKey, senderAddr) - } aRouteFound, routingErrors := hs.router.HandleInput(inputContext, hs.Id(), r) if !responseWriter.done { if aRouteFound { @@ -160,13 +155,8 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (hs *HTTPServer) Start(ctx context.Context) error { +func (hs *HTTPServer) Start(ctx context.Context, router common.RouteIO) error { hs.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("http.server unable to get router from context") - } hs.router = router moduleContext, cancel := context.WithCancel(ctx) hs.ctx = moduleContext diff --git a/internal/module/midi-input.go b/internal/module/midi-input.go index 40eb5eb..d028eb3 100644 --- a/internal/module/midi-input.go +++ b/internal/module/midi-input.go @@ -4,7 +4,6 @@ package module import ( "context" - "errors" "fmt" "log/slog" @@ -60,14 +59,9 @@ func (mi *MIDIInput) Type() string { return mi.config.Type } -func (mi *MIDIInput) Start(ctx context.Context) error { +func (mi *MIDIInput) Start(ctx context.Context, router common.RouteIO) error { mi.logger.Debug("running") defer midi.CloseDriver() - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("midi.input unable to get router from context") - } mi.router = router moduleContext, cancel := context.WithCancel(ctx) mi.ctx = moduleContext diff --git a/internal/module/midi-output.go b/internal/module/midi-output.go index d4660be..b8a323f 100644 --- a/internal/module/midi-output.go +++ b/internal/module/midi-output.go @@ -61,14 +61,9 @@ func (mo *MIDIOutput) Type() string { return mo.config.Type } -func (mo *MIDIOutput) Start(ctx context.Context) error { +func (mo *MIDIOutput) Start(ctx context.Context, router common.RouteIO) error { mo.logger.Debug("running") defer midi.CloseDriver() - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("midi.output unable to get router from context") - } mo.router = router moduleContext, cancel := context.WithCancel(ctx) mo.ctx = moduleContext diff --git a/internal/module/mqtt-client.go b/internal/module/mqtt-client.go index c73d675..4c1357c 100644 --- a/internal/module/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -80,13 +80,8 @@ func (mc *MQTTClient) Type() string { return mc.config.Type } -func (mc *MQTTClient) Start(ctx context.Context) error { +func (mc *MQTTClient) Start(ctx context.Context, router common.RouteIO) error { mc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("mqtt.client unable to get router from context") - } mc.router = router moduleContext, cancel := context.WithCancel(ctx) mc.ctx = moduleContext diff --git a/internal/module/nats-client.go b/internal/module/nats-client.go index e32f6e7..e133b09 100644 --- a/internal/module/nats-client.go +++ b/internal/module/nats-client.go @@ -68,14 +68,8 @@ func (nc *NATSClient) Type() string { return nc.config.Type } -func (nc *NATSClient) Start(ctx context.Context) error { +func (nc *NATSClient) Start(ctx context.Context, router common.RouteIO) error { nc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("nats.client unable to get router from context") - } - nc.router = router moduleContext, cancel := context.WithCancel(ctx) nc.ctx = moduleContext diff --git a/internal/module/nats-server.go b/internal/module/nats-server.go index 7b2ebd0..dd420bd 100644 --- a/internal/module/nats-server.go +++ b/internal/module/nats-server.go @@ -86,14 +86,8 @@ func (ns *NATSServer) Type() string { return ns.config.Type } -func (ns *NATSServer) Start(ctx context.Context) error { +func (ns *NATSServer) Start(ctx context.Context, router common.RouteIO) error { ns.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("nats.server unable to get router from context") - } - ns.router = router moduleContext, cancel := context.WithCancel(ctx) ns.ctx = moduleContext diff --git a/internal/module/psn-client.go b/internal/module/psn-client.go index bc06cb0..fbdbc7b 100644 --- a/internal/module/psn-client.go +++ b/internal/module/psn-client.go @@ -2,7 +2,6 @@ package module import ( "context" - "errors" "log/slog" "net" "time" @@ -40,13 +39,8 @@ func (pc *PSNClient) Type() string { return pc.config.Type } -func (pc *PSNClient) Start(ctx context.Context) error { +func (pc *PSNClient) Start(ctx context.Context, router common.RouteIO) error { pc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("psn.client unable to get router from context") - } pc.router = router moduleContext, cancel := context.WithCancel(ctx) pc.ctx = moduleContext diff --git a/internal/module/redis-client.go b/internal/module/redis-client.go index 7664d24..16f46bd 100644 --- a/internal/module/redis-client.go +++ b/internal/module/redis-client.go @@ -73,15 +73,9 @@ func (rc *RedisClient) Printf(ctx context.Context, format string, v ...interface rc.logger.Debug(msg) } -func (rc *RedisClient) Start(ctx context.Context) error { +func (rc *RedisClient) Start(ctx context.Context, router common.RouteIO) error { redis.SetLogger(rc) rc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("redis.client unable to get router from context") - } - rc.router = router moduleContext, cancel := context.WithCancel(ctx) rc.ctx = moduleContext diff --git a/internal/module/serial-client.go b/internal/module/serial-client.go index 884fc81..d12ede8 100644 --- a/internal/module/serial-client.go +++ b/internal/module/serial-client.go @@ -99,14 +99,8 @@ func (sc *SerialClient) SetupPort() error { return nil } -func (sc *SerialClient) Start(ctx context.Context) error { +func (sc *SerialClient) Start(ctx context.Context, router common.RouteIO) error { sc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("serial.client unable to get router from context") - } - sc.router = router moduleContext, cancel := context.WithCancel(ctx) sc.ctx = moduleContext diff --git a/internal/module/sip-call-server.go b/internal/module/sip-call-server.go index c2bbab4..3442809 100644 --- a/internal/module/sip-call-server.go +++ b/internal/module/sip-call-server.go @@ -131,13 +131,8 @@ func (scs *SIPCallServer) Type() string { return scs.config.Type } -func (scs *SIPCallServer) Start(ctx context.Context) error { +func (scs *SIPCallServer) Start(ctx context.Context, router common.RouteIO) error { scs.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("sip.call.server unable to get router from context") - } scs.router = router moduleContext, cancel := context.WithCancel(ctx) scs.ctx = moduleContext diff --git a/internal/module/sip-dtmf-server.go b/internal/module/sip-dtmf-server.go index e8aaacd..6e8edcf 100644 --- a/internal/module/sip-dtmf-server.go +++ b/internal/module/sip-dtmf-server.go @@ -150,13 +150,8 @@ func (sds *SIPDTMFServer) Type() string { return sds.config.Type } -func (sds *SIPDTMFServer) Start(ctx context.Context) error { +func (sds *SIPDTMFServer) Start(ctx context.Context, router common.RouteIO) error { sds.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("sip.dtmf.server unable to get router from context") - } sds.router = router moduleContext, cancel := context.WithCancel(ctx) sds.ctx = moduleContext diff --git a/internal/module/tcp-client.go b/internal/module/tcp-client.go index da1c621..1e82ce2 100644 --- a/internal/module/tcp-client.go +++ b/internal/module/tcp-client.go @@ -91,13 +91,8 @@ func (tc *TCPClient) Type() string { return tc.config.Type } -func (tc *TCPClient) Start(ctx context.Context) error { +func (tc *TCPClient) Start(ctx context.Context, router common.RouteIO) error { tc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.tcp.client unable to get router from context") - } tc.router = router moduleContext, cancel := context.WithCancel(ctx) tc.ctx = moduleContext diff --git a/internal/module/tcp-server.go b/internal/module/tcp-server.go index 828f057..a11498c 100644 --- a/internal/module/tcp-server.go +++ b/internal/module/tcp-server.go @@ -166,10 +166,9 @@ ClientRead: messages := ts.Framer.Decode(buffer[0:byteCount]) for _, message := range messages { if ts.router != nil { - senderAddr, ok := client.RemoteAddr().(*net.TCPAddr) + _, ok := client.RemoteAddr().(*net.TCPAddr) if ok { - senderCtx := context.WithValue(ts.ctx, common.SenderContextKey, senderAddr) - ts.router.HandleInput(senderCtx, ts.Id(), message) + ts.router.HandleInput(ts.ctx, ts.Id(), message) } else { ts.router.HandleInput(ts.ctx, ts.Id(), message) } @@ -183,13 +182,8 @@ ClientRead: } } -func (ts *TCPServer) Start(ctx context.Context) error { +func (ts *TCPServer) Start(ctx context.Context, router common.RouteIO) error { ts.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.tcp.server unable to get router from context") - } ts.router = router moduleContext, cancel := context.WithCancel(ctx) ts.ctx = moduleContext diff --git a/internal/module/test/db-sqlite_test.go b/internal/module/test/db-sqlite_test.go index b4e2f9d..c23dcc6 100644 --- a/internal/module/test/db-sqlite_test.go +++ b/internal/module/test/db-sqlite_test.go @@ -73,7 +73,7 @@ func TestBadDbSqlite(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("db.sqlite expected to fail") diff --git a/internal/module/test/http-server_test.go b/internal/module/test/http-server_test.go index 2936a37..40aac8d 100644 --- a/internal/module/test/http-server_test.go +++ b/internal/module/test/http-server_test.go @@ -73,7 +73,7 @@ func TestBadHTTPServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("http.server expected to fail") diff --git a/internal/module/test/midi-input_test.go b/internal/module/test/midi-input_test.go index 2916600..69f2c43 100644 --- a/internal/module/test/midi-input_test.go +++ b/internal/module/test/midi-input_test.go @@ -73,7 +73,7 @@ func TestBadMIDIInput(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("midi.input expected to fail") diff --git a/internal/module/test/midi-output_test.go b/internal/module/test/midi-output_test.go index 6cfa466..17a1d4e 100644 --- a/internal/module/test/midi-output_test.go +++ b/internal/module/test/midi-output_test.go @@ -73,7 +73,7 @@ func TestBadMIDIOutput(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("midi.output expected to fail") diff --git a/internal/module/test/mqtt-client_test.go b/internal/module/test/mqtt-client_test.go index 1b579b2..2d83a83 100644 --- a/internal/module/test/mqtt-client_test.go +++ b/internal/module/test/mqtt-client_test.go @@ -116,7 +116,7 @@ func TestBadMQTTClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("mqtt.client expected to fail") diff --git a/internal/module/test/nats-client_test.go b/internal/module/test/nats-client_test.go index 65538b2..04c004d 100644 --- a/internal/module/test/nats-client_test.go +++ b/internal/module/test/nats-client_test.go @@ -94,7 +94,7 @@ func TestBadNATSClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("nats.client expected to fail") diff --git a/internal/module/test/nats-server_test.go b/internal/module/test/nats-server_test.go index d95d8b1..52a4da5 100644 --- a/internal/module/test/nats-server_test.go +++ b/internal/module/test/nats-server_test.go @@ -71,7 +71,7 @@ func TestBadNATSServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("nats.server expected to fail") diff --git a/internal/module/test/psn-client_test.go b/internal/module/test/psn-client_test.go index 85f9ad3..6ad74a1 100644 --- a/internal/module/test/psn-client_test.go +++ b/internal/module/test/psn-client_test.go @@ -59,7 +59,7 @@ func TestBadPSNClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("psn.client expected to fail") diff --git a/internal/module/test/redis-client_test.go b/internal/module/test/redis-client_test.go index 0f34247..040454c 100644 --- a/internal/module/test/redis-client_test.go +++ b/internal/module/test/redis-client_test.go @@ -94,7 +94,7 @@ func TestBadRedisClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("redis.client expected to fail") diff --git a/internal/module/test/serial-client_test.go b/internal/module/test/serial-client_test.go index 46876d6..8c7627e 100644 --- a/internal/module/test/serial-client_test.go +++ b/internal/module/test/serial-client_test.go @@ -103,7 +103,7 @@ func TestBadSerialClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("serial.client expected to fail") diff --git a/internal/module/test/sip-call-server_test.go b/internal/module/test/sip-call-server_test.go index 667fdca..bbb8fa0 100644 --- a/internal/module/test/sip-call-server_test.go +++ b/internal/module/test/sip-call-server_test.go @@ -88,7 +88,7 @@ func TestBadSIPCallServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("sip.call.server expected to fail") diff --git a/internal/module/test/sip-dtmf-server_test.go b/internal/module/test/sip-dtmf-server_test.go index f5dbc22..0b328c3 100644 --- a/internal/module/test/sip-dtmf-server_test.go +++ b/internal/module/test/sip-dtmf-server_test.go @@ -107,7 +107,7 @@ func TestBadSIPDTMFServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("sip.dtmf.server expected to fail") diff --git a/internal/module/test/tcp-client_test.go b/internal/module/test/tcp-client_test.go index e902f10..56a0d86 100644 --- a/internal/module/test/tcp-client_test.go +++ b/internal/module/test/tcp-client_test.go @@ -95,7 +95,7 @@ func TestBadTCPClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("net.tcp.client expected to fail") diff --git a/internal/module/test/tcp-server_test.go b/internal/module/test/tcp-server_test.go index da2a794..cc1c997 100644 --- a/internal/module/test/tcp-server_test.go +++ b/internal/module/test/tcp-server_test.go @@ -120,7 +120,7 @@ func TestBadTCPServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("net.tcp.server expected to fail") diff --git a/internal/module/test/time-interval_test.go b/internal/module/test/time-interval_test.go index 4d3e393..10eb07c 100644 --- a/internal/module/test/time-interval_test.go +++ b/internal/module/test/time-interval_test.go @@ -75,7 +75,7 @@ func TestBadTimeInterval(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("time.interval expected to fail") diff --git a/internal/module/test/time-timer_test.go b/internal/module/test/time-timer_test.go index bae5214..c02870b 100644 --- a/internal/module/test/time-timer_test.go +++ b/internal/module/test/time-timer_test.go @@ -75,7 +75,7 @@ func TestBadTimeTimer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("time.timer expected to fail") diff --git a/internal/module/test/udp-client_test.go b/internal/module/test/udp-client_test.go index 3c6d15c..bda415f 100644 --- a/internal/module/test/udp-client_test.go +++ b/internal/module/test/udp-client_test.go @@ -95,7 +95,7 @@ func TestBadUDPClient(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("net.udp.client expected to fail") diff --git a/internal/module/test/udp-multicast_test.go b/internal/module/test/udp-multicast_test.go index dccc1d7..9699e52 100644 --- a/internal/module/test/udp-multicast_test.go +++ b/internal/module/test/udp-multicast_test.go @@ -102,7 +102,7 @@ func TestBadUDPMulticast(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("net.udp.multicast expected to fail") diff --git a/internal/module/test/udp-server_test.go b/internal/module/test/udp-server_test.go index dc32dfa..03b477f 100644 --- a/internal/module/test/udp-server_test.go +++ b/internal/module/test/udp-server_test.go @@ -99,7 +99,7 @@ func TestBadUDPServer(t *testing.T) { return } - err = moduleInstance.Start(t.Context()) + err = moduleInstance.Start(t.Context(), nil) if err == nil { t.Fatalf("net.udp.server expected to fail") diff --git a/internal/module/time-interval.go b/internal/module/time-interval.go index 3a8ae30..149ec89 100644 --- a/internal/module/time-interval.go +++ b/internal/module/time-interval.go @@ -2,7 +2,6 @@ package module import ( "context" - "errors" "fmt" "log/slog" "time" @@ -58,13 +57,8 @@ func (i *TimeInterval) Type() string { return i.config.Type } -func (i *TimeInterval) Start(ctx context.Context) error { +func (i *TimeInterval) Start(ctx context.Context, router common.RouteIO) error { i.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("time.interval unable to get router from context") - } i.router = router moduleContext, cancel := context.WithCancel(ctx) i.ctx = moduleContext diff --git a/internal/module/time-timer.go b/internal/module/time-timer.go index 22e4958..77bd641 100644 --- a/internal/module/time-timer.go +++ b/internal/module/time-timer.go @@ -2,7 +2,6 @@ package module import ( "context" - "errors" "fmt" "log/slog" "time" @@ -59,13 +58,8 @@ func (t *TimeTimer) Type() string { return t.config.Type } -func (t *TimeTimer) Start(ctx context.Context) error { +func (t *TimeTimer) Start(ctx context.Context, router common.RouteIO) error { t.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.tcp.client unable to get router from context") - } t.router = router moduleContext, cancel := context.WithCancel(ctx) t.ctx = moduleContext diff --git a/internal/module/udp-client.go b/internal/module/udp-client.go index ee9a0c2..9a4916b 100644 --- a/internal/module/udp-client.go +++ b/internal/module/udp-client.go @@ -79,13 +79,8 @@ func (uc *UDPClient) SetupConn() error { return err } -func (uc *UDPClient) Start(ctx context.Context) error { +func (uc *UDPClient) Start(ctx context.Context, router common.RouteIO) error { uc.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.udp.client unable to get router from context") - } uc.router = router moduleContext, cancel := context.WithCancel(ctx) uc.ctx = moduleContext diff --git a/internal/module/udp-multicast.go b/internal/module/udp-multicast.go index f87a389..9677656 100644 --- a/internal/module/udp-multicast.go +++ b/internal/module/udp-multicast.go @@ -73,13 +73,8 @@ func (um *UDPMulticast) Type() string { return um.config.Type } -func (um *UDPMulticast) Start(ctx context.Context) error { +func (um *UDPMulticast) Start(ctx context.Context, router common.RouteIO) error { um.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.udp.multicast unable to get router from context") - } um.router = router moduleContext, cancel := context.WithCancel(ctx) um.ctx = moduleContext diff --git a/internal/module/udp-server.go b/internal/module/udp-server.go index 36301fe..9acde51 100644 --- a/internal/module/udp-server.go +++ b/internal/module/udp-server.go @@ -95,13 +95,8 @@ func (us *UDPServer) Type() string { return us.config.Type } -func (us *UDPServer) Start(ctx context.Context) error { +func (us *UDPServer) Start(ctx context.Context, router common.RouteIO) error { us.logger.Debug("running") - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return errors.New("net.udp.server unable to get router from context") - } us.router = router moduleContext, cancel := context.WithCancel(ctx) us.ctx = moduleContext @@ -124,7 +119,7 @@ func (us *UDPServer) Start(ctx context.Context) error { default: listener.SetDeadline(time.Now().Add(time.Millisecond * 200)) - numBytes, senderAddr, err := listener.ReadFromUDP(buffer) + numBytes, _, err := listener.ReadFromUDP(buffer) if err != nil { //NOTE(jwetzell) we hit deadline if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { @@ -134,8 +129,7 @@ func (us *UDPServer) Start(ctx context.Context) error { } message := buffer[:numBytes] if us.router != nil { - senderCtx := context.WithValue(us.ctx, common.SenderContextKey, senderAddr) - us.router.HandleInput(senderCtx, us.Id(), message) + us.router.HandleInput(us.ctx, us.Id(), message) } else { us.logger.Error("input received but no router is configured") } diff --git a/internal/processor/router-input.go b/internal/processor/router-input.go index 9e144ce..9626279 100644 --- a/internal/processor/router-input.go +++ b/internal/processor/router-input.go @@ -20,14 +20,12 @@ type RouterInput struct { func (ro *RouterInput) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) { payload := wrappedPayload.Payload - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - if !ok { - + if wrappedPayload.Router == nil { wrappedPayload.End = true return wrappedPayload, errors.New("router.input no router found") } - _, err := router.HandleInput(ctx, ro.SourceId, payload) + _, err := wrappedPayload.Router.HandleInput(ctx, ro.SourceId, payload) if err != nil { wrappedPayload.End = true diff --git a/internal/processor/router-output.go b/internal/processor/router-output.go index 1100223..3815d07 100644 --- a/internal/processor/router-output.go +++ b/internal/processor/router-output.go @@ -19,13 +19,12 @@ type RouterOutput struct { func (ro *RouterOutput) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) { - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - if !ok { + if wrappedPayload.Router == nil { wrappedPayload.End = true return wrappedPayload, errors.New("router.output no router found") } - err := router.HandleOutput(ctx, ro.ModuleId, wrappedPayload.Payload) + err := wrappedPayload.Router.HandleOutput(ctx, ro.ModuleId, wrappedPayload.Payload) if err != nil { wrappedPayload.End = true diff --git a/internal/processor/script-js.go b/internal/processor/script-js.go index 2733bc0..68876fb 100644 --- a/internal/processor/script-js.go +++ b/internal/processor/script-js.go @@ -34,8 +34,6 @@ func (sj *ScriptJS) Process(ctx context.Context, wrappedPayload common.WrappedPa sj.vm.SetProperty(sj.vm.GlobalObject(), sj.payloadAtom, wrappedPayload.Payload) - sj.vm.SetProperty(sj.vm.GlobalObject(), sj.senderAtom, wrappedPayload.Sender) - _, err := sj.vm.Eval(sj.Program, quickjs.EvalGlobal) if err != nil { diff --git a/internal/processor/test/artnet-packet-decode_test.go b/internal/processor/test/artnet-packet-decode_test.go index e58b48a..33ea0de 100644 --- a/internal/processor/test/artnet-packet-decode_test.go +++ b/internal/processor/test/artnet-packet-decode_test.go @@ -58,7 +58,7 @@ func TestGoodArtnetPacketDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetDecoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetDecoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("artnet.packet.decode processing failed: %s", err) @@ -94,7 +94,7 @@ func TestBadArtnetPacketDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetDecoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetDecoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("artnet.packet.decode expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/artnet-packet-encode_test.go b/internal/processor/test/artnet-packet-encode_test.go index eb8a54a..9364a46 100644 --- a/internal/processor/test/artnet-packet-encode_test.go +++ b/internal/processor/test/artnet-packet-encode_test.go @@ -58,7 +58,7 @@ func TestGoodArtnetPacketEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("artnet.packet.encode processing failed: %s", err) @@ -89,7 +89,7 @@ func TestBadArtnetPacketEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("artnet.packet.encode expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/db-query_test.go b/internal/processor/test/db-query_test.go index 7e48311..8703f21 100644 --- a/internal/processor/test/db-query_test.go +++ b/internal/processor/test/db-query_test.go @@ -1,7 +1,6 @@ package processor_test import ( - "context" "reflect" "testing" @@ -36,12 +35,12 @@ func TestDbQueryFromRegistry(t *testing.T) { payload := "hello" expected := map[string]any{"sqlite_version()": "3.53.0"} - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Payload: payload, + Modules: map[string]common.Module{ "test": test.NewTestDBModule("test"), }, - ), payload)) + }) if err != nil { t.Fatalf("db.query processing failed: %s", err) } @@ -115,12 +114,12 @@ func TestGoodDbQuery(t *testing.T) { t.Fatalf("db.query failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Modules: map[string]common.Module{ "test": test.NewTestDBModule("test"), }, - ), testCase.payload)) + Payload: testCase.payload, + }) if err != nil { t.Fatalf("db.query processing failed: %s", err) @@ -135,11 +134,11 @@ func TestGoodDbQuery(t *testing.T) { func TestBadDbQuery(t *testing.T) { tests := []struct { - name string - params map[string]any - payload any - wrappedPayloadCtx context.Context - errorString string + name string + params map[string]any + payload any + wrappedPayloadModules map[string]common.Module + errorString string }{ { name: "no module param", @@ -147,9 +146,9 @@ func TestBadDbQuery(t *testing.T) { params: map[string]any{ "query": "SELECT sqlite_version();", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "db.query module error: not found", }, { @@ -159,9 +158,9 @@ func TestBadDbQuery(t *testing.T) { "module": 1, "query": "SELECT sqlite_version();", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "db.query module error: not a string", }, { @@ -170,9 +169,9 @@ func TestBadDbQuery(t *testing.T) { params: map[string]any{ "module": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "db.query query error: not found", }, { @@ -182,9 +181,9 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": 1, }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "db.query query error: not a string", }, { @@ -194,9 +193,9 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from {{", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "template: query:1: unclosed action", }, { @@ -206,9 +205,9 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from {{.Data}}", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "template: query:1:16: executing \"query\" at <.Data>: can't evaluate field Data in type common.WrappedPayload", }, { @@ -218,9 +217,9 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from asdf;", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "db.query error executing query: SQL logic error: no such table: asdf (1)", }, { @@ -230,8 +229,8 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from test;", }, - wrappedPayloadCtx: t.Context(), - errorString: "db.query wrapped payload has no modules", + wrappedPayloadModules: nil, + errorString: "db.query wrapped payload has no modules", }, { name: "module not found in context", @@ -240,8 +239,8 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from test;", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{}), - errorString: "db.query unable to find module with id: test", + wrappedPayloadModules: map[string]common.Module{}, + errorString: "db.query unable to find module with id: test", }, { name: "module not found in context", @@ -250,8 +249,8 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from test;", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{}), - errorString: "db.query unable to find module with id: test", + wrappedPayloadModules: map[string]common.Module{}, + errorString: "db.query unable to find module with id: test", }, { name: "module not a DatabseModule", @@ -260,9 +259,9 @@ func TestBadDbQuery(t *testing.T) { "module": "test", "query": "select * from test;", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestKVModule("test"), - }), + }, errorString: "db.query module with id test is not a DatabaseModule", }, } @@ -287,7 +286,10 @@ func TestBadDbQuery(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.wrappedPayloadCtx, test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Payload: test.payload, + Modules: test.wrappedPayloadModules, + }) if err == nil { t.Fatalf("db.query expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/debug-log_test.go b/internal/processor/test/debug-log_test.go index ac11a41..13c4462 100644 --- a/internal/processor/test/debug-log_test.go +++ b/internal/processor/test/debug-log_test.go @@ -30,7 +30,7 @@ func TestDebugLogFromRegistry(t *testing.T) { payload := "test" expected := "test" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("debug.log processing failed: %s", err) } @@ -66,7 +66,7 @@ func TestGoodDebugLog(t *testing.T) { t.Fatalf("debug.log failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("debug.log processing failed: %s", err) } @@ -106,7 +106,7 @@ func TestBadDebugLog(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("debug.log expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/filter-change_test.go b/internal/processor/test/filter-change_test.go index 7c07c96..4d16f65 100644 --- a/internal/processor/test/filter-change_test.go +++ b/internal/processor/test/filter-change_test.go @@ -29,7 +29,9 @@ func TestFilterChangeFromRegistry(t *testing.T) { payload := "hello" expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Payload: payload, + }) if err != nil { t.Fatalf("filter.change processing failed: %s", err) } @@ -70,7 +72,7 @@ func TestGoodFilterChange(t *testing.T) { t.Fatalf("filter.change failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("filter.change processing failed: %s", err) @@ -110,7 +112,7 @@ func TestBadFilterChange(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("filter.change expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/filter-expr_test.go b/internal/processor/test/filter-expr_test.go index 7512893..ae25e78 100644 --- a/internal/processor/test/filter-expr_test.go +++ b/internal/processor/test/filter-expr_test.go @@ -85,7 +85,7 @@ func TestGoodFilterExpr(t *testing.T) { t.Fatalf("filter.expr failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: testCase.payload}) if err != nil { t.Fatalf("filter.expr processing failed: %s", err) @@ -159,7 +159,7 @@ func TestBadFilterExpr(t *testing.T) { } return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("filter.expr expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/filter-regex_test.go b/internal/processor/test/filter-regex_test.go index 3215441..6f839ab 100644 --- a/internal/processor/test/filter-regex_test.go +++ b/internal/processor/test/filter-regex_test.go @@ -31,7 +31,7 @@ func TestFilterRegexFromRegistry(t *testing.T) { payload := "hello" expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("filter.regex processing failed: %s", err) } @@ -90,7 +90,7 @@ func TestGoodFilterRegex(t *testing.T) { t.Fatalf("filter.regex failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("filter.regex processing failed: %s", err) @@ -161,7 +161,7 @@ func TestBadFilterRegex(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("filter.regex expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/float-parse_test.go b/internal/processor/test/float-parse_test.go index 76dd0c2..5193a81 100644 --- a/internal/processor/test/float-parse_test.go +++ b/internal/processor/test/float-parse_test.go @@ -76,7 +76,7 @@ func TestGoodFloatParse(t *testing.T) { t.Fatalf("float.parse failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("float.parse processing failed: %s", err) } @@ -152,7 +152,7 @@ func TestBadFloatParse(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("float.parse expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/float-random_test.go b/internal/processor/test/float-random_test.go index 4f923b2..52b1abb 100644 --- a/internal/processor/test/float-random_test.go +++ b/internal/processor/test/float-random_test.go @@ -71,7 +71,7 @@ func TestGoodFloatRandom(t *testing.T) { t.Fatalf("float.random failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("float.random processing failed: %s", err) } @@ -172,7 +172,7 @@ func TestBadFloatRandom(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("float.random expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/free-d-create_test.go b/internal/processor/test/free-d-create_test.go index 77429be..9e5ba0a 100644 --- a/internal/processor/test/free-d-create_test.go +++ b/internal/processor/test/free-d-create_test.go @@ -103,7 +103,7 @@ func TestGoodFreeDCreate(t *testing.T) { t.Fatalf("freed.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("freed.create processing failed: %s", err) } @@ -865,7 +865,7 @@ func TestBadFreeDCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("freed.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/free-d-decode_test.go b/internal/processor/test/free-d-decode_test.go index 2dbee4f..977c863 100644 --- a/internal/processor/test/free-d-decode_test.go +++ b/internal/processor/test/free-d-decode_test.go @@ -59,7 +59,7 @@ func TestGoodFreeDDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("freed.decode processing failed: %s", err) @@ -90,7 +90,7 @@ func TestBadFreeDDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("freed.decode expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/free-d-encode_test.go b/internal/processor/test/free-d-encode_test.go index df89653..f1b0ac0 100644 --- a/internal/processor/test/free-d-encode_test.go +++ b/internal/processor/test/free-d-encode_test.go @@ -59,7 +59,7 @@ func TestGoodFreeDEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("freed.encode processing failed: %s", err) @@ -90,7 +90,7 @@ func TestBadFreeDEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := packetEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := packetEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("freed.encode expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/http-request-do_test.go b/internal/processor/test/http-request-do_test.go index 93b5b93..db03ff2 100644 --- a/internal/processor/test/http-request-do_test.go +++ b/internal/processor/test/http-request-do_test.go @@ -58,7 +58,7 @@ func TestGoodHTTPRequestDo(t *testing.T) { t.Fatalf("http.request.do failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("http.request.do processing failed: %s", err) } @@ -149,7 +149,7 @@ func TestBadHTTPRequestDo(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("http.request.do expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/http-response-create_test.go b/internal/processor/test/http-response-create_test.go index 6851090..0484e7e 100644 --- a/internal/processor/test/http-response-create_test.go +++ b/internal/processor/test/http-response-create_test.go @@ -68,7 +68,7 @@ func TestGoodHTTPResponseCreate(t *testing.T) { t.Fatalf("http.response.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("http.response.create processing failed: %s", err) } @@ -145,7 +145,7 @@ func TestBadHTTPResponseCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("http.response.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/int-parse_test.go b/internal/processor/test/int-parse_test.go index fac379c..4587b0c 100644 --- a/internal/processor/test/int-parse_test.go +++ b/internal/processor/test/int-parse_test.go @@ -97,7 +97,7 @@ func TestGoodIntParse(t *testing.T) { t.Fatalf("int.parse failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("int.parse processing failed: %s", err) } @@ -186,7 +186,7 @@ func TestBadIntParse(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("int.parse expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/int-random_test.go b/internal/processor/test/int-random_test.go index 0ac570c..863cad6 100644 --- a/internal/processor/test/int-random_test.go +++ b/internal/processor/test/int-random_test.go @@ -51,7 +51,7 @@ func TestIntRandomGoodConfig(t *testing.T) { payload := "12345" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("int.random processing failed: %s", err) } @@ -98,7 +98,7 @@ func TestGoodIntRandom(t *testing.T) { t.Fatalf("int.random failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("int.random processing failed: %s", err) } @@ -183,7 +183,7 @@ func TestBadIntRandom(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("int.random expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/int-scale_test.go b/internal/processor/test/int-scale_test.go index 1a1dbb3..551b9cd 100644 --- a/internal/processor/test/int-scale_test.go +++ b/internal/processor/test/int-scale_test.go @@ -69,7 +69,7 @@ func TestGoodIntScale(t *testing.T) { t.Fatalf("int.scale failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("int.scale processing failed: %s", err) } @@ -157,7 +157,7 @@ func TestBadIntScale(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("int.scale expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/json-decode_test.go b/internal/processor/test/json-decode_test.go index 525c17f..2d30099 100644 --- a/internal/processor/test/json-decode_test.go +++ b/internal/processor/test/json-decode_test.go @@ -32,7 +32,7 @@ func TestJsonDecodeFromRegistry(t *testing.T) { "property": "hello", } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("json.decode processing failed: %s", err) } @@ -75,7 +75,7 @@ func TestGoodJsonDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := jsonDecoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := jsonDecoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("json.decode processing failed: %s", err) } @@ -113,7 +113,7 @@ func TestBadJsonDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("json.decode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/json-encode_test.go b/internal/processor/test/json-encode_test.go index a74b87a..8996a1c 100644 --- a/internal/processor/test/json-encode_test.go +++ b/internal/processor/test/json-encode_test.go @@ -35,7 +35,7 @@ func TestJsonEncodeFromRegistry(t *testing.T) { expected := []byte("{\"property\":\"hello\"}") - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("json.encode processing failed: %s", err) } @@ -69,7 +69,7 @@ func TestGoodJsonEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := jsonEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := jsonEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("json.encode processing failed: %s", err) } @@ -102,7 +102,7 @@ func TestBadJsonEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("json.encode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/kv-get_test.go b/internal/processor/test/kv-get_test.go index 1d57c6b..39fc6a2 100644 --- a/internal/processor/test/kv-get_test.go +++ b/internal/processor/test/kv-get_test.go @@ -1,7 +1,6 @@ package processor_test import ( - "context" "reflect" "testing" @@ -35,12 +34,12 @@ func TestKvGetFromRegistry(t *testing.T) { payload := "hello" expected := "test" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Modules: map[string]common.Module{ "test": test.NewTestKVModule("test"), }, - ), payload)) + Payload: payload, + }) if err != nil { t.Fatalf("kv.get processing failed: %s", err) } @@ -95,12 +94,12 @@ func TestGoodKvGet(t *testing.T) { t.Fatalf("kv.get failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Modules: map[string]common.Module{ "test": test.NewTestKVModule("test"), }, - ), testCase.payload)) + Payload: testCase.payload, + }) if err != nil { t.Fatalf("kv.get processing failed: %s", err) @@ -115,11 +114,11 @@ func TestGoodKvGet(t *testing.T) { func TestBadKvGet(t *testing.T) { tests := []struct { - name string - params map[string]any - payload any - wrappedPayloadCtx context.Context - errorString string + name string + params map[string]any + payload any + wrappedPayloadModules map[string]common.Module + errorString string }{ { name: "no module param", @@ -127,9 +126,9 @@ func TestBadKvGet(t *testing.T) { params: map[string]any{ "key": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestKVModule("test"), - }), + }, errorString: "kv.get module error: not found", }, { @@ -139,9 +138,9 @@ func TestBadKvGet(t *testing.T) { "module": 1, "key": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestKVModule("test"), - }), + }, errorString: "kv.get module error: not a string", }, { @@ -150,9 +149,9 @@ func TestBadKvGet(t *testing.T) { params: map[string]any{ "module": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestKVModule("test"), - }), + }, errorString: "kv.get key error: not found", }, { @@ -162,9 +161,9 @@ func TestBadKvGet(t *testing.T) { "module": "test", "key": 1, }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestKVModule("test"), - }), + }, errorString: "kv.get key error: not a string", }, { @@ -174,8 +173,8 @@ func TestBadKvGet(t *testing.T) { "module": "test", "key": "test", }, - wrappedPayloadCtx: t.Context(), - errorString: "kv.get wrapped payload has no modules", + wrappedPayloadModules: nil, + errorString: "kv.get wrapped payload has no modules", }, { name: "module not found in context", @@ -184,8 +183,8 @@ func TestBadKvGet(t *testing.T) { "module": "test", "key": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{}), - errorString: "kv.get unable to find module with id: test", + wrappedPayloadModules: map[string]common.Module{}, + errorString: "kv.get unable to find module with id: test", }, { name: "module not a kv module", @@ -194,9 +193,9 @@ func TestBadKvGet(t *testing.T) { "module": "test", "key": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "kv.get module with id test is not a KeyValueModule", }, } @@ -221,7 +220,7 @@ func TestBadKvGet(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.wrappedPayloadCtx, test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Modules: test.wrappedPayloadModules, Payload: test.payload}) if err == nil { t.Fatalf("kv.get expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/kv-set_test.go b/internal/processor/test/kv-set_test.go index 62677ea..e026cc3 100644 --- a/internal/processor/test/kv-set_test.go +++ b/internal/processor/test/kv-set_test.go @@ -1,7 +1,6 @@ package processor_test import ( - "context" "reflect" "testing" @@ -36,12 +35,12 @@ func TestKvSetFromRegistry(t *testing.T) { payload := "" expected := "" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Modules: map[string]common.Module{ "test": &test.TestKVModule{}, }, - ), payload)) + Payload: payload, + }) if err != nil { t.Fatalf("kv.set processing failed: %s", err) } @@ -86,12 +85,12 @@ func TestGoodKvSet(t *testing.T) { t.Fatalf("kv.set failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithModules( - t.Context(), - map[string]common.Module{ + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Modules: map[string]common.Module{ "test": &test.TestKVModule{}, }, - ), testCase.payload)) + Payload: testCase.payload, + }) if err != nil { t.Fatalf("kv.set processing failed: %s", err) @@ -106,11 +105,11 @@ func TestGoodKvSet(t *testing.T) { func TestBadKvSet(t *testing.T) { testCases := []struct { - name string - params map[string]any - payload any - wrappedPayloadCtx context.Context - errorString string + name string + params map[string]any + payload any + wrappedPayloadModules map[string]common.Module + errorString string }{ { name: "no module param", @@ -119,9 +118,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set module error: not found", }, { @@ -132,9 +131,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set module error: not a string", }, { @@ -144,9 +143,9 @@ func TestBadKvSet(t *testing.T) { "module": "test", "value": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set key error: not found", }, { @@ -157,9 +156,9 @@ func TestBadKvSet(t *testing.T) { "key": 1, "value": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set key error: not a string", }, { @@ -169,9 +168,9 @@ func TestBadKvSet(t *testing.T) { "module": "test", "key": "test", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set value error: not found", }, { @@ -182,9 +181,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": 1, }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "kv.set value error: not a string", }, { @@ -195,8 +194,8 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "hello", }, - wrappedPayloadCtx: t.Context(), - errorString: "kv.set wrapped payload has no modules", + wrappedPayloadModules: nil, + errorString: "kv.set wrapped payload has no modules", }, { name: "value template syntax error", @@ -206,9 +205,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "{{", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "template: template:1: unclosed action", }, { @@ -219,9 +218,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "{{.Data}}", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": &test.TestKVModule{}, - }), + }, errorString: "template: template:1:2: executing \"template\" at <.Data>: can't evaluate field Data in type common.WrappedPayload", }, { @@ -232,8 +231,8 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "hello", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{}), - errorString: "kv.set unable to find module with id: test", + wrappedPayloadModules: map[string]common.Module{}, + errorString: "kv.set unable to find module with id: test", }, { name: "module not a kv module", @@ -243,9 +242,9 @@ func TestBadKvSet(t *testing.T) { "key": "test", "value": "hello", }, - wrappedPayloadCtx: test.GetContextWithModules(t.Context(), map[string]common.Module{ + wrappedPayloadModules: map[string]common.Module{ "test": test.NewTestDBModule("test"), - }), + }, errorString: "kv.set module with id test is not a KeyValueModule", }, } @@ -270,7 +269,7 @@ func TestBadKvSet(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(testCase.wrappedPayloadCtx, testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Modules: testCase.wrappedPayloadModules, Payload: testCase.payload}) if err == nil { t.Fatalf("kv.set expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/midi-control_change-create_test.go b/internal/processor/test/midi-control_change-create_test.go index 060acd8..483af0f 100644 --- a/internal/processor/test/midi-control_change-create_test.go +++ b/internal/processor/test/midi-control_change-create_test.go @@ -71,7 +71,7 @@ func TestGoodMIDIControlChangeCreate(t *testing.T) { t.Fatalf("midi.control_change.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.control_change.create processing failed: %s", err) } @@ -147,7 +147,7 @@ func TestBadMIDIControlChangeCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.control_change.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/midi-message-decode_test.go b/internal/processor/test/midi-message-decode_test.go index a2c8fdf..80842e1 100644 --- a/internal/processor/test/midi-message-decode_test.go +++ b/internal/processor/test/midi-message-decode_test.go @@ -45,7 +45,7 @@ func TestGoodMIDIMessageDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.message.decode processing failed: %s", err) } @@ -79,7 +79,7 @@ func TestBadMIDIMessageDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.message.decode expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/midi-message-encode_test.go b/internal/processor/test/midi-message-encode_test.go index e5326fe..bb97b78 100644 --- a/internal/processor/test/midi-message-encode_test.go +++ b/internal/processor/test/midi-message-encode_test.go @@ -45,7 +45,7 @@ func TestGoodMIDIMessageEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := midiMessageEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := midiMessageEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.message.encode processing failed: %s", err) } @@ -78,7 +78,7 @@ func TestBadMIDIMessageEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := midiMessageEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := midiMessageEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.message.encode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/midi-message-unpack_test.go b/internal/processor/test/midi-message-unpack_test.go index 823238c..ce5b2d4 100644 --- a/internal/processor/test/midi-message-unpack_test.go +++ b/internal/processor/test/midi-message-unpack_test.go @@ -85,7 +85,7 @@ func TestGoodMIDIMessageUnpack(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.message.unpack processing failed: %s", err) } @@ -138,7 +138,7 @@ func TestBadMIDIMessageUnpack(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.message.unpack expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/midi-note_off-create_test.go b/internal/processor/test/midi-note_off-create_test.go index ac6c48b..ebf697b 100644 --- a/internal/processor/test/midi-note_off-create_test.go +++ b/internal/processor/test/midi-note_off-create_test.go @@ -70,7 +70,7 @@ func TestGoodMIDINoteOffCretea(t *testing.T) { t.Fatalf("midi.note_off.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.note_off.create processing failed: %s", err) } @@ -146,7 +146,7 @@ func TestBadMIDINoteOffCretea(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.note_off.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/midi-note_on-create_test.go b/internal/processor/test/midi-note_on-create_test.go index 9867c9d..079a0f0 100644 --- a/internal/processor/test/midi-note_on-create_test.go +++ b/internal/processor/test/midi-note_on-create_test.go @@ -70,7 +70,7 @@ func TestGoodMIDINoteOnCreate(t *testing.T) { t.Fatalf("midi.note_on.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.note_on.create processing failed: %s", err) } @@ -143,7 +143,7 @@ func TestBadMIDINoteOnCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.note_on.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/midi-program_change-create_test.go b/internal/processor/test/midi-program_change-create_test.go index 4289730..7048b9a 100644 --- a/internal/processor/test/midi-program_change-create_test.go +++ b/internal/processor/test/midi-program_change-create_test.go @@ -69,7 +69,7 @@ func TestGoodMIDIProgramChangeCreate(t *testing.T) { t.Fatalf("midi.program_change.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("midi.program_change.create processing failed: %s", err) } @@ -133,7 +133,7 @@ func TestBadMIDIProgramChangeCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("midi.program_change.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/mqtt-message-create_test.go b/internal/processor/test/mqtt-message-create_test.go index f767d70..7c445ed 100644 --- a/internal/processor/test/mqtt-message-create_test.go +++ b/internal/processor/test/mqtt-message-create_test.go @@ -104,7 +104,7 @@ func TestGoodMQTTMessageCreate(t *testing.T) { t.Fatalf("mqtt.message.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("mqtt.message.create processing failed: %s", err) @@ -228,7 +228,7 @@ func TestBadMQTTMessageCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("mqtt.message.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/nats-message-create_test.go b/internal/processor/test/nats-message-create_test.go index 1f72918..e2af2e0 100644 --- a/internal/processor/test/nats-message-create_test.go +++ b/internal/processor/test/nats-message-create_test.go @@ -85,7 +85,7 @@ func TestGoodNATSMessageCreate(t *testing.T) { t.Fatalf("nats.message.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("nats.message.create processing failed: %s", err) } @@ -196,7 +196,7 @@ func TestBadNATSMessageCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("nats.message.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/osc-message-create_test.go b/internal/processor/test/osc-message-create_test.go index 4b354f4..e2cbe5d 100644 --- a/internal/processor/test/osc-message-create_test.go +++ b/internal/processor/test/osc-message-create_test.go @@ -160,7 +160,7 @@ func TestGoodOSCMessageCreate(t *testing.T) { t.Fatalf("osc.message.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("osc.message.create processing failed: %s", err) @@ -388,7 +388,7 @@ func TestBadOSCMessageCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("osc.message.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/osc-message-decode_test.go b/internal/processor/test/osc-message-decode_test.go index 37a50fa..e28f6d9 100644 --- a/internal/processor/test/osc-message-decode_test.go +++ b/internal/processor/test/osc-message-decode_test.go @@ -61,7 +61,7 @@ func TestGoodOSCMessageDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("osc.message.decode processing failed: %s", err) @@ -110,7 +110,7 @@ func TestBadOSCMessageDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("osc.message.decode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/osc-message-encode_test.go b/internal/processor/test/osc-message-encode_test.go index d22cd1f..e978d74 100644 --- a/internal/processor/test/osc-message-encode_test.go +++ b/internal/processor/test/osc-message-encode_test.go @@ -60,7 +60,7 @@ func TestGoodOSCMessageEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("osc.message.encode processing failed: %s", err) } @@ -101,7 +101,7 @@ func TestBadOSCMessageEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("osc.message.encode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/router-input_test.go b/internal/processor/test/router-input_test.go index 21ff410..fc6d8a3 100644 --- a/internal/processor/test/router-input_test.go +++ b/internal/processor/test/router-input_test.go @@ -1,7 +1,6 @@ package processor_test import ( - "context" "reflect" "testing" @@ -35,7 +34,10 @@ func TestRouterOutputFromRegistry(t *testing.T) { payload := "test" expected := "test" - got, err := processorInstance.Process(test.GetContextWithRouter(t.Context()), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Router: test.GetNewTestRouter(), + Payload: payload, + }) if err != nil { t.Fatalf("router.output processing failed: %s", err) } @@ -71,7 +73,7 @@ func TestGoodRouterOutput(t *testing.T) { t.Fatalf("router.output failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithRouter(t.Context()), testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: testCase.payload}) if err != nil { t.Fatalf("router.output processing failed: %s", err) } @@ -85,40 +87,37 @@ func TestGoodRouterOutput(t *testing.T) { func TestBadRouterOutput(t *testing.T) { testCases := []struct { - name string - params map[string]any - payload any - processCtx context.Context - wrappedPayloadCtx context.Context - errorString string + name string + params map[string]any + payload any + router common.RouteIO + errorString string }{ { - name: "no module param", - params: map[string]any{}, - payload: "test", - processCtx: test.GetContextWithRouter(t.Context()), - wrappedPayloadCtx: t.Context(), - errorString: "router.output module error: not found", + name: "no module param", + params: map[string]any{}, + payload: "test", + router: test.GetNewTestRouter(), + errorString: "router.output module error: not found", }, { name: "non-string module", params: map[string]any{ "module": 123, }, - payload: "test", - processCtx: test.GetContextWithRouter(t.Context()), - wrappedPayloadCtx: t.Context(), - errorString: "router.output module error: not a string", + payload: "test", + router: test.GetNewTestRouter(), + + errorString: "router.output module error: not a string", }, { name: "router not found in context", params: map[string]any{ "module": "test", }, - payload: "test", - processCtx: t.Context(), - wrappedPayloadCtx: t.Context(), - errorString: "router.output no router found", + payload: "test", + router: nil, + errorString: "router.output no router found", }, } @@ -142,7 +141,7 @@ func TestBadRouterOutput(t *testing.T) { return } - got, err := processorInstance.Process(testCase.processCtx, common.GetWrappedPayload(testCase.wrappedPayloadCtx, testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Router: testCase.router, Payload: testCase.payload}) if err == nil { t.Fatalf("router.output expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/router-output_test.go b/internal/processor/test/router-output_test.go index 13eacb0..b3859bf 100644 --- a/internal/processor/test/router-output_test.go +++ b/internal/processor/test/router-output_test.go @@ -1,7 +1,6 @@ package processor_test import ( - "context" "reflect" "testing" @@ -35,7 +34,10 @@ func TestRouterInputFromRegistry(t *testing.T) { payload := "test" expected := "test" - got, err := processorInstance.Process(test.GetContextWithRouter(t.Context()), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{ + Router: test.GetNewTestRouter(), + Payload: payload, + }) if err != nil { t.Fatalf("router.input processing failed: %s", err) } @@ -71,7 +73,7 @@ func TestGoodRouterInput(t *testing.T) { t.Fatalf("router.input failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(test.GetContextWithRouter(t.Context()), testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: testCase.payload}) if err != nil { t.Fatalf("router.input processing failed: %s", err) } @@ -85,40 +87,36 @@ func TestGoodRouterInput(t *testing.T) { func TestBadRouterInput(t *testing.T) { testCases := []struct { - name string - params map[string]any - payload any - processCtx context.Context - wrappedPayloadCtx context.Context - errorString string + name string + params map[string]any + payload any + router common.RouteIO + errorString string }{ { - name: "no source param", - params: map[string]any{}, - payload: "test", - processCtx: test.GetContextWithRouter(t.Context()), - wrappedPayloadCtx: t.Context(), - errorString: "router.input source error: not found", + name: "no source param", + params: map[string]any{}, + payload: "test", + router: test.GetNewTestRouter(), + errorString: "router.input source error: not found", }, { name: "non-string source", params: map[string]any{ "source": 123, }, - payload: "test", - processCtx: test.GetContextWithRouter(t.Context()), - wrappedPayloadCtx: t.Context(), - errorString: "router.input source error: not a string", + payload: "test", + router: test.GetNewTestRouter(), + errorString: "router.input source error: not a string", }, { name: "router not found in context", params: map[string]any{ "source": "test", }, - payload: "test", - processCtx: t.Context(), - wrappedPayloadCtx: t.Context(), - errorString: "router.input no router found", + payload: "test", + router: nil, + errorString: "router.input no router found", }, } @@ -142,7 +140,7 @@ func TestBadRouterInput(t *testing.T) { return } - got, err := processorInstance.Process(testCase.processCtx, common.GetWrappedPayload(testCase.wrappedPayloadCtx, testCase.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Router: testCase.router, Payload: testCase.payload}) if err == nil { t.Fatalf("router.input expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/script-expr_test.go b/internal/processor/test/script-expr_test.go index c70d705..db8c41e 100644 --- a/internal/processor/test/script-expr_test.go +++ b/internal/processor/test/script-expr_test.go @@ -76,7 +76,7 @@ func TestGoodScriptExpr(t *testing.T) { t.Fatalf("script.expr failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("script.expr processing failed: %s", err) @@ -134,7 +134,7 @@ func TestBadScriptExpr(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("script.expr expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/script-js_test.go b/internal/processor/test/script-js_test.go index 71165f2..8e5cacf 100644 --- a/internal/processor/test/script-js_test.go +++ b/internal/processor/test/script-js_test.go @@ -34,7 +34,7 @@ func TestScriptJSFromRegistry(t *testing.T) { payload := 1 expected := 2 - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("script.js processing failed: %s", err) } @@ -165,7 +165,7 @@ func TestGoodScriptJS(t *testing.T) { t.Fatalf("script.js failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("script.js processing failed: %s", err) @@ -209,7 +209,7 @@ func TestBadScriptJS(t *testing.T) { Params: test.params, }) - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("script.js expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/script-wasm_test.go b/internal/processor/test/script-wasm_test.go index fe0bdef..fcd492a 100644 --- a/internal/processor/test/script-wasm_test.go +++ b/internal/processor/test/script-wasm_test.go @@ -73,7 +73,7 @@ func TestGoodScriptWASM(t *testing.T) { t.Fatalf("script.wasm failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("script.wasm processing failed: %s", err) @@ -176,7 +176,7 @@ func TestBadScriptWASM(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("script.wasm expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/sip-response-audio-create_test.go b/internal/processor/test/sip-response-audio-create_test.go index c1de01b..797f68a 100644 --- a/internal/processor/test/sip-response-audio-create_test.go +++ b/internal/processor/test/sip-response-audio-create_test.go @@ -90,7 +90,7 @@ func TestGoodSipResponseAudioCreate(t *testing.T) { t.Fatalf("sip.response.audio.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("sip.response.audio.create processing failed: %s", err) } @@ -200,7 +200,7 @@ func TestBadSipResponseAudioCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("sip.response.audio.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/sip-response-dtmf-create_test.go b/internal/processor/test/sip-response-dtmf-create_test.go index dd437b6..e3f3059 100644 --- a/internal/processor/test/sip-response-dtmf-create_test.go +++ b/internal/processor/test/sip-response-dtmf-create_test.go @@ -88,7 +88,7 @@ func TestGoodSipResponseDTMFCreate(t *testing.T) { t.Fatalf("sip.response.dtmf.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("sip.response.dtmf.create processing failed: %s", err) } @@ -208,7 +208,7 @@ func TestBadSipResponseDTMFCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("sip.response.dtmf.create expected to fail but succeeded, got: %v", got) diff --git a/internal/processor/test/string-create_test.go b/internal/processor/test/string-create_test.go index 213a6ad..4033950 100644 --- a/internal/processor/test/string-create_test.go +++ b/internal/processor/test/string-create_test.go @@ -32,7 +32,7 @@ func TestStringCreateFromRegistry(t *testing.T) { payload := "hello" expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("string.create processing failed: %s", err) } @@ -98,7 +98,7 @@ func TestGoodStringCreate(t *testing.T) { t.Fatalf("string.create failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("string.create processing failed: %s", err) @@ -174,7 +174,7 @@ func TestBadStringCreate(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("string.create expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/string-decode_test.go b/internal/processor/test/string-decode_test.go index 0a1c6fa..8994369 100644 --- a/internal/processor/test/string-decode_test.go +++ b/internal/processor/test/string-decode_test.go @@ -28,7 +28,7 @@ func TestStringDecodeFromRegistry(t *testing.T) { payload := []byte{'h', 'e', 'l', 'l', 'o'} expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("string.decode processing failed: %s", err) } @@ -54,7 +54,7 @@ func TestGoodStringDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringDecoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringDecoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("string.decode processing failed: %s", err) } @@ -87,7 +87,7 @@ func TestBadStringDecode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringDecoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringDecoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("string.decode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/string-encode_test.go b/internal/processor/test/string-encode_test.go index cec7d86..e3926a5 100644 --- a/internal/processor/test/string-encode_test.go +++ b/internal/processor/test/string-encode_test.go @@ -29,7 +29,7 @@ func TestStringEncodeFromRegistry(t *testing.T) { payload := "hello" expected := []byte{'h', 'e', 'l', 'l', 'o'} - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("string.encode processing failed: %s", err) } @@ -61,7 +61,7 @@ func TestGoodStringEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("string.encode processing failed: %s", err) } @@ -93,7 +93,7 @@ func TestBadStringEncode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := stringEncoder.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := stringEncoder.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("string.encode expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/string-split_test.go b/internal/processor/test/string-split_test.go index d2a9246..9eaf7bb 100644 --- a/internal/processor/test/string-split_test.go +++ b/internal/processor/test/string-split_test.go @@ -32,7 +32,7 @@ func TestStringSplitFromRegistry(t *testing.T) { payload := "part1,part2,part3" expected := []string{"part1", "part2", "part3"} - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("string.split processing failed: %s", err) } @@ -79,7 +79,7 @@ func TestGoodStringSplit(t *testing.T) { t.Fatalf("string.split failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("string.split processing failed: %s", err) } @@ -142,7 +142,7 @@ func TestBadStringSplit(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("string.split expected error but got none, payload: %+v", got) diff --git a/internal/processor/test/struct-field-get_test.go b/internal/processor/test/struct-field-get_test.go index 6104285..b78ee11 100644 --- a/internal/processor/test/struct-field-get_test.go +++ b/internal/processor/test/struct-field-get_test.go @@ -33,7 +33,7 @@ func TestStructFieldGetFromRegistry(t *testing.T) { payload := test.TestStruct{Data: "hello"} expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("struct.field.get processing failed: %s", err) } @@ -107,7 +107,7 @@ func TestGoodStructFieldGet(t *testing.T) { t.Fatalf("struct.field.get failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("struct.field.get processing failed: %s", err) @@ -179,7 +179,7 @@ func TestBadStructFieldGet(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("struct.field.get expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/struct-method-get_test.go b/internal/processor/test/struct-method-get_test.go index 3bf6728..c45f996 100644 --- a/internal/processor/test/struct-method-get_test.go +++ b/internal/processor/test/struct-method-get_test.go @@ -33,7 +33,7 @@ func TestStructMethodGetFromRegistry(t *testing.T) { payload := test.TestStruct{Data: "hello"} expected := "hello" - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: payload}) if err != nil { t.Fatalf("struct.method.get processing failed: %s", err) } @@ -132,7 +132,7 @@ func TestGoodStructMethodGet(t *testing.T) { t.Fatalf("struct.method.get failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("struct.method.get processing failed: %s", err) @@ -204,7 +204,7 @@ func TestBadStructMethodGet(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("struct.method.get expected to fail but got payload: %+v", got) diff --git a/internal/processor/test/time-sleep_test.go b/internal/processor/test/time-sleep_test.go index 6fa75d3..139613b 100644 --- a/internal/processor/test/time-sleep_test.go +++ b/internal/processor/test/time-sleep_test.go @@ -59,7 +59,7 @@ func TestGoodTimeSleep(t *testing.T) { t.Fatalf("time.sleep failed to create processor: %s", err) } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err != nil { t.Fatalf("time.sleep processing failed: %s", err) @@ -114,7 +114,7 @@ func TestBadTimeSleep(t *testing.T) { return } - got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload)) + got, err := processorInstance.Process(t.Context(), common.WrappedPayload{Payload: test.payload}) if err == nil { t.Fatalf("time.sleep expected to fail but succeeded, got: %v", got) diff --git a/internal/route/route.go b/internal/route/route.go index 6893843..31a9757 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -43,10 +43,9 @@ func (r *Route) Input() string { return r.input } -func (r *Route) ProcessPayload(ctx context.Context, payload any) (any, error) { - wrappedPayload := common.GetWrappedPayload(ctx, payload) +func (r *Route) ProcessPayload(ctx context.Context, wrappedPayload common.WrappedPayload) (any, error) { tracer := otel.Tracer("route") - processCtx, processSpan := tracer.Start(ctx, "ProcessPayload", trace.WithAttributes(attribute.String("payload.type", fmt.Sprintf("%T", payload)))) + processCtx, processSpan := tracer.Start(ctx, "ProcessPayload", trace.WithAttributes(attribute.String("payload.type", fmt.Sprintf("%T", wrappedPayload.Payload)))) defer processSpan.End() for processorIndex, processor := range r.processors { processorCtx, processorSpan := otel.Tracer("processor").Start(processCtx, "process", trace.WithAttributes(attribute.Int("processor.index", processorIndex), attribute.String("processor.type", processor.Type()))) diff --git a/internal/route/route_test.go b/internal/route/route_test.go index bad055e..3bd6640 100644 --- a/internal/route/route_test.go +++ b/internal/route/route_test.go @@ -55,7 +55,10 @@ func TestGoodRouteHandleInput(t *testing.T) { } inputData := "test input data" - payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), inputData) + payload, err := testRoute.ProcessPayload(t.Context(), common.WrappedPayload{ + Router: &MockRouter{}, + Payload: inputData, + }) if err != nil { t.Fatalf("route ProcessPayload returned error: %v", err) } @@ -90,7 +93,10 @@ func TestRouteHandleInputWithProcessorError(t *testing.T) { } inputData := "test input data" - _, err = testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), inputData) + _, err = testRoute.ProcessPayload(t.Context(), common.WrappedPayload{ + Router: &MockRouter{}, + Payload: inputData, + }) if err == nil { t.Fatalf("route HandleOutput did not return error for bad processor") } @@ -115,7 +121,10 @@ func TestRouteHandleNilPayload(t *testing.T) { return } - payload, err := testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), nil) + payload, err := testRoute.ProcessPayload(t.Context(), common.WrappedPayload{ + Router: &MockRouter{}, + Payload: nil, + }) if err != nil { t.Fatalf("route ProcessPayload returned error: %v", err) } @@ -143,7 +152,10 @@ func TestRouteHandleNilPayloadFromProcessor(t *testing.T) { t.Fatalf("route failed to create: %v", err) } - _, err = testRoute.ProcessPayload(context.WithValue(t.Context(), common.RouterContextKey, &MockRouter{}), "test") + _, err = testRoute.ProcessPayload(t.Context(), common.WrappedPayload{ + Router: &MockRouter{}, + Payload: "test", + }) if err != nil { t.Fatalf("route HandleOutput returned error for nil payload: %v", err) } diff --git a/internal/test/context.go b/internal/test/context.go deleted file mode 100644 index adbb469..0000000 --- a/internal/test/context.go +++ /dev/null @@ -1,27 +0,0 @@ -package test - -import ( - "context" - - "github.com/jwetzell/showbridge-go/internal/common" -) - -func GetContextWithModules(ctx context.Context, modules map[string]common.Module) context.Context { - ctx = context.WithValue(ctx, common.ModulesContextKey, modules) - return ctx -} - -func GetContextWithRouter(ctx context.Context) context.Context { - ctx = context.WithValue(ctx, common.RouterContextKey, GetNewTestRouter()) - return ctx -} - -func GetContextWithSender(ctx context.Context, sender any) context.Context { - ctx = context.WithValue(ctx, common.SenderContextKey, sender) - return ctx -} - -func GetContextWithSource(ctx context.Context, source string) context.Context { - ctx = context.WithValue(ctx, common.SourceContextKey, source) - return ctx -} diff --git a/internal/test/module.go b/internal/test/module.go index 8c63727..c95e616 100644 --- a/internal/test/module.go +++ b/internal/test/module.go @@ -4,13 +4,14 @@ import ( "context" "database/sql" + "github.com/jwetzell/showbridge-go/internal/common" _ "modernc.org/sqlite" ) type TestModule struct { } -func (m *TestModule) Start(ctx context.Context) error { +func (m *TestModule) Start(ctx context.Context, router common.RouteIO) error { <-ctx.Done() return nil } @@ -36,7 +37,7 @@ type TestKVModule struct { kvData map[string]any } -func (m *TestKVModule) Start(ctx context.Context) error { +func (m *TestKVModule) Start(ctx context.Context, router common.RouteIO) error { <-ctx.Done() return nil } @@ -73,7 +74,7 @@ type TestDBModule struct { db *sql.DB } -func (m *TestDBModule) Start(ctx context.Context) error { +func (m *TestDBModule) Start(ctx context.Context, router common.RouteIO) error { <-ctx.Done() return nil } diff --git a/router.go b/router.go index aeed26a..af6cf3c 100644 --- a/router.go +++ b/router.go @@ -74,7 +74,7 @@ func (r *Router) startModule(ctx context.Context, moduleId string) error { return errors.New("module id not found") } r.moduleWait.Go(func() { - err := moduleInstance.Start(ctx) + err := moduleInstance.Start(ctx, r) if err != nil { // TODO(jwetzell): propagate module run errors better r.logger.Error("error encountered running module", "moduleId", moduleId, "error", err) @@ -210,12 +210,15 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) routeWaitGroup.Go(func() { routeFound = true - routeContext := context.WithValue(spanCtx, common.SourceContextKey, sourceId) - routeContext = context.WithValue(routeContext, common.RouterContextKey, r) - routeContext = context.WithValue(routeContext, common.ModulesContextKey, r.ModuleInstances) - routeCtx, routeSpan := otel.Tracer("router").Start(routeContext, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()))) - _, err := routeInstance.ProcessPayload(routeCtx, payload) + routeCtx, routeSpan := otel.Tracer("router").Start(spanCtx, "route", trace.WithAttributes(attribute.Int("route.index", routeIndex), attribute.String("route.input", routeInstance.Input()))) + _, err := routeInstance.ProcessPayload(routeCtx, common.WrappedPayload{ + Payload: payload, + Source: sourceId, + Modules: r.ModuleInstances, + Router: r, + End: false, + }) if err != nil { if routeIOErrors == nil { routeIOErrors = []common.RouteIOError{} @@ -298,11 +301,10 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload } func (r *Router) startModules() { - contextWithRouter := context.WithValue(r.Context, common.RouterContextKey, r) for moduleId := range r.ModuleInstances { // TODO(jwetzell): handle module run errors - err := r.startModule(contextWithRouter, moduleId) + err := r.startModule(r.Context, moduleId) if err != nil { r.logger.Error("error starting module", "moduleId", moduleId, "error", err) } diff --git a/router_test.go b/router_test.go index afb7b10..a0aa937 100644 --- a/router_test.go +++ b/router_test.go @@ -33,12 +33,7 @@ func (mcm *MockCounterModule) Output(context.Context, any) error { return nil } -func (mcm *MockCounterModule) Start(ctx context.Context) error { - router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) - - if !ok { - return fmt.Errorf("mock.counter could not get router from context") - } +func (mcm *MockCounterModule) Start(ctx context.Context, router common.RouteIO) error { mcm.router = router moduleContext, cancel := context.WithCancel(ctx) mcm.ctx = moduleContext