diff --git a/internal/module/http-client.go b/internal/module/http-client.go index 5e1e9e8..3df0071 100644 --- a/internal/module/http-client.go +++ b/internal/module/http-client.go @@ -73,7 +73,7 @@ func (hc *HTTPClient) Output(ctx context.Context, payload any) error { } if hc.router != nil { - hc.router.HandleInput(hc.Id(), response) + hc.router.HandleInput(hc.ctx, hc.Id(), response) } return nil diff --git a/internal/module/http-server.go b/internal/module/http-server.go index 78cfcce..1930a3a 100644 --- a/internal/module/http-server.go +++ b/internal/module/http-server.go @@ -77,7 +77,7 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if hs.router != nil { - aRouteFound, routingErrors := hs.router.HandleInput(hs.Id(), r) + aRouteFound, routingErrors := hs.router.HandleInput(hs.ctx, hs.Id(), r) if aRouteFound { if routingErrors != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/internal/module/midi-input.go b/internal/module/midi-input.go index 82c07b6..ca0f384 100644 --- a/internal/module/midi-input.go +++ b/internal/module/midi-input.go @@ -69,7 +69,7 @@ func (mi *MIDIInput) Run() error { stop, err := midi.ListenTo(in, func(msg midi.Message, timestampms int32) { if mi.router != nil { - mi.router.HandleInput(mi.Id(), msg) + mi.router.HandleInput(mi.ctx, mi.Id(), msg) } }, midi.UseSysEx()) diff --git a/internal/module/mqtt-client.go b/internal/module/mqtt-client.go index d169ad8..b6225ec 100644 --- a/internal/module/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -90,7 +90,7 @@ func (mc *MQTTClient) Run() error { opts.OnConnect = func(c mqtt.Client) { token := mc.client.Subscribe(mc.Topic, 1, func(c mqtt.Client, m mqtt.Message) { - mc.router.HandleInput(mc.Id(), m) + mc.router.HandleInput(mc.ctx, mc.Id(), m) }) token.Wait() } diff --git a/internal/module/nats-client.go b/internal/module/nats-client.go index ad13e47..bd5ad5c 100644 --- a/internal/module/nats-client.go +++ b/internal/module/nats-client.go @@ -83,7 +83,7 @@ func (nc *NATSClient) Run() error { sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) { if nc.router != nil { - nc.router.HandleInput(nc.Id(), msg) + nc.router.HandleInput(nc.ctx, nc.Id(), msg) } }) diff --git a/internal/module/psn-client.go b/internal/module/psn-client.go index dec52ff..0a8beb5 100644 --- a/internal/module/psn-client.go +++ b/internal/module/psn-client.go @@ -87,7 +87,7 @@ func (pc *PSNClient) Run() error { if pc.router != nil { for _, tracker := range pc.decoder.Trackers { - pc.router.HandleInput(pc.Id(), tracker) + pc.router.HandleInput(pc.ctx, pc.Id(), tracker) } } else { pc.logger.Error("has no router") diff --git a/internal/module/serial-client.go b/internal/module/serial-client.go index 5ba972a..a7f049c 100644 --- a/internal/module/serial-client.go +++ b/internal/module/serial-client.go @@ -155,7 +155,7 @@ func (sc *SerialClient) Run() error { messages := sc.Framer.Decode(buffer[0:byteCount]) for _, message := range messages { if sc.router != nil { - sc.router.HandleInput(sc.Id(), message) + sc.router.HandleInput(sc.ctx, sc.Id(), message) } else { sc.logger.Error("input received but no router is configured") } diff --git a/internal/module/sip-call-server.go b/internal/module/sip-call-server.go index 53b7750..17d161e 100644 --- a/internal/module/sip-call-server.go +++ b/internal/module/sip-call-server.go @@ -143,7 +143,7 @@ func (scs *SIPCallServer) HandleCall(inDialog *diago.DialogServerSession) { inDialog.Trying() inDialog.Ringing() inDialog.Answer() - scs.router.HandleInput(scs.Id(), SIPCallMessage{ + scs.router.HandleInput(scs.ctx, scs.Id(), SIPCallMessage{ To: inDialog.ToUser(), }) <-inDialog.Context().Done() diff --git a/internal/module/sip-dtmf-server.go b/internal/module/sip-dtmf-server.go index c8020f3..ebc2efb 100644 --- a/internal/module/sip-dtmf-server.go +++ b/internal/module/sip-dtmf-server.go @@ -151,7 +151,7 @@ func (sds *SIPDTMFServer) HandleCall(inDialog *diago.DialogServerSession) error return reader.Listen(func(dtmf rune) error { if dtmf == rune(sds.Separator[0]) { if sds.router != nil { - sds.router.HandleInput(sds.Id(), SIPDTMFMessage{ + sds.router.HandleInput(sds.ctx, sds.Id(), SIPDTMFMessage{ To: inDialog.ToUser(), Digits: userString, }) diff --git a/internal/module/tcp-client.go b/internal/module/tcp-client.go index e0942eb..c12a5e1 100644 --- a/internal/module/tcp-client.go +++ b/internal/module/tcp-client.go @@ -142,7 +142,7 @@ func (tc *TCPClient) Run() error { messages := tc.framer.Decode(buffer[0:byteCount]) for _, message := range messages { if tc.router != nil { - tc.router.HandleInput(tc.Id(), message) + tc.router.HandleInput(tc.ctx, tc.Id(), message) } else { tc.logger.Error("input received but no router is configured") } diff --git a/internal/module/tcp-server.go b/internal/module/tcp-server.go index db791d6..3419475 100644 --- a/internal/module/tcp-server.go +++ b/internal/module/tcp-server.go @@ -155,7 +155,7 @@ ClientRead: messages := ts.Framer.Decode(buffer[0:byteCount]) for _, message := range messages { if ts.router != nil { - ts.router.HandleInput(ts.Id(), message) + ts.router.HandleInput(ts.ctx, ts.Id(), message) } else { ts.logger.Error("input received but no router is configured") } diff --git a/internal/module/time-interval.go b/internal/module/time-interval.go index 7b982b7..11178f7 100644 --- a/internal/module/time-interval.go +++ b/internal/module/time-interval.go @@ -67,7 +67,7 @@ func (i *TimeInterval) Run() error { return nil case <-ticker.C: if i.router != nil { - i.router.HandleInput(i.Id(), time.Now()) + i.router.HandleInput(i.ctx, i.Id(), time.Now()) } } } diff --git a/internal/module/time-timer.go b/internal/module/time-timer.go index 087898e..ea2c2e4 100644 --- a/internal/module/time-timer.go +++ b/internal/module/time-timer.go @@ -66,7 +66,7 @@ func (t *TimeTimer) Run() error { return nil case time := <-t.timer.C: if t.router != nil { - t.router.HandleInput(t.Id(), time) + t.router.HandleInput(t.ctx, t.Id(), time) } } } diff --git a/internal/module/udp-multicast.go b/internal/module/udp-multicast.go index c5bc834..3840056 100644 --- a/internal/module/udp-multicast.go +++ b/internal/module/udp-multicast.go @@ -105,7 +105,7 @@ func (um *UDPMulticast) Run() error { message := buffer[:numBytes] if um.router != nil { - um.router.HandleInput(um.Id(), message) + um.router.HandleInput(um.ctx, um.Id(), message) } else { um.logger.Error("input received but no router is configured") } diff --git a/internal/module/udp-server.go b/internal/module/udp-server.go index 0597bd9..f551605 100644 --- a/internal/module/udp-server.go +++ b/internal/module/udp-server.go @@ -115,7 +115,7 @@ func (us *UDPServer) Run() error { } message := buffer[:numBytes] if us.router != nil { - us.router.HandleInput(us.Id(), message) + us.router.HandleInput(us.ctx, us.Id(), message) } else { us.logger.Error("input received but no router is configured") } diff --git a/internal/route/route.go b/internal/route/route.go index 06e1f7f..c096b74 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -27,7 +27,7 @@ type RouteIOError struct { } type RouteIO interface { - HandleInput(sourceId string, payload any) (bool, []RouteIOError) + HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError) HandleOutput(ctx context.Context, destinationId string, payload any) []error } diff --git a/internal/route/route_test.go b/internal/route/route_test.go index eb56ec7..4190baa 100644 --- a/internal/route/route_test.go +++ b/internal/route/route_test.go @@ -30,8 +30,8 @@ func TestRouteCreate(t *testing.T) { type MockRouter struct{} -func (mr *MockRouter) HandleInput(sourceId string, payload any) []route.RouteIOError { - return nil +func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { + return false, []route.RouteIOError{} } func (mr *MockRouter) HandleOutput(ctx context.Context, destinationId string, payload any) error { diff --git a/router.go b/router.go index 01519c4..5f87b72 100644 --- a/router.go +++ b/router.go @@ -3,7 +3,6 @@ package showbridge import ( "context" "errors" - "fmt" "log/slog" "sync" @@ -129,13 +128,13 @@ func (r *Router) Stop() { r.contextCancel() } -func (r *Router) HandleInput(sourceId string, payload any) (bool, []route.RouteIOError) { +func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) { var routeIOErrors []route.RouteIOError routeFound := false for routeIndex, routeInstance := range r.RouteInstances { if routeInstance.Input() == sourceId { routeFound = true - routeContext := context.WithValue(r.Context, route.SourceContextKey, sourceId) + routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId) payload, err := routeInstance.ProcessPayload(routeContext, payload) if err != nil { @@ -177,8 +176,8 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload outputErrors = []error{} } outputErrors = append(outputErrors, err) + r.logger.Error("unable to route output", "module", moduleInstance.Id(), "error", err) } - // r.logger.Error("unable to route output", "module", moduleInstance.Id(), "error", err) } } return outputErrors