mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 12:55:29 +00:00
propagate a ctx all the way from input to output of a route
This commit is contained in:
@@ -73,7 +73,7 @@ func (hc *HTTPClient) Output(ctx context.Context, payload any) error {
|
||||
}
|
||||
|
||||
if hc.router != nil {
|
||||
hc.router.HandleInput(hc.Id(), response)
|
||||
hc.router.HandleInput(hc.ctx, hc.Id(), response)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -77,7 +77,7 @@ func (hs *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
if hs.router != nil {
|
||||
aRouteFound, routingErrors := hs.router.HandleInput(hs.Id(), r)
|
||||
aRouteFound, routingErrors := hs.router.HandleInput(hs.ctx, hs.Id(), r)
|
||||
if aRouteFound {
|
||||
if routingErrors != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
@@ -69,7 +69,7 @@ func (mi *MIDIInput) Run() error {
|
||||
|
||||
stop, err := midi.ListenTo(in, func(msg midi.Message, timestampms int32) {
|
||||
if mi.router != nil {
|
||||
mi.router.HandleInput(mi.Id(), msg)
|
||||
mi.router.HandleInput(mi.ctx, mi.Id(), msg)
|
||||
}
|
||||
}, midi.UseSysEx())
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ func (mc *MQTTClient) Run() error {
|
||||
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
token := mc.client.Subscribe(mc.Topic, 1, func(c mqtt.Client, m mqtt.Message) {
|
||||
mc.router.HandleInput(mc.Id(), m)
|
||||
mc.router.HandleInput(mc.ctx, mc.Id(), m)
|
||||
})
|
||||
token.Wait()
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ func (nc *NATSClient) Run() error {
|
||||
|
||||
sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) {
|
||||
if nc.router != nil {
|
||||
nc.router.HandleInput(nc.Id(), msg)
|
||||
nc.router.HandleInput(nc.ctx, nc.Id(), msg)
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ func (pc *PSNClient) Run() error {
|
||||
|
||||
if pc.router != nil {
|
||||
for _, tracker := range pc.decoder.Trackers {
|
||||
pc.router.HandleInput(pc.Id(), tracker)
|
||||
pc.router.HandleInput(pc.ctx, pc.Id(), tracker)
|
||||
}
|
||||
} else {
|
||||
pc.logger.Error("has no router")
|
||||
|
||||
@@ -155,7 +155,7 @@ func (sc *SerialClient) Run() error {
|
||||
messages := sc.Framer.Decode(buffer[0:byteCount])
|
||||
for _, message := range messages {
|
||||
if sc.router != nil {
|
||||
sc.router.HandleInput(sc.Id(), message)
|
||||
sc.router.HandleInput(sc.ctx, sc.Id(), message)
|
||||
} else {
|
||||
sc.logger.Error("input received but no router is configured")
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ func (scs *SIPCallServer) HandleCall(inDialog *diago.DialogServerSession) {
|
||||
inDialog.Trying()
|
||||
inDialog.Ringing()
|
||||
inDialog.Answer()
|
||||
scs.router.HandleInput(scs.Id(), SIPCallMessage{
|
||||
scs.router.HandleInput(scs.ctx, scs.Id(), SIPCallMessage{
|
||||
To: inDialog.ToUser(),
|
||||
})
|
||||
<-inDialog.Context().Done()
|
||||
|
||||
@@ -151,7 +151,7 @@ func (sds *SIPDTMFServer) HandleCall(inDialog *diago.DialogServerSession) error
|
||||
return reader.Listen(func(dtmf rune) error {
|
||||
if dtmf == rune(sds.Separator[0]) {
|
||||
if sds.router != nil {
|
||||
sds.router.HandleInput(sds.Id(), SIPDTMFMessage{
|
||||
sds.router.HandleInput(sds.ctx, sds.Id(), SIPDTMFMessage{
|
||||
To: inDialog.ToUser(),
|
||||
Digits: userString,
|
||||
})
|
||||
|
||||
@@ -142,7 +142,7 @@ func (tc *TCPClient) Run() error {
|
||||
messages := tc.framer.Decode(buffer[0:byteCount])
|
||||
for _, message := range messages {
|
||||
if tc.router != nil {
|
||||
tc.router.HandleInput(tc.Id(), message)
|
||||
tc.router.HandleInput(tc.ctx, tc.Id(), message)
|
||||
} else {
|
||||
tc.logger.Error("input received but no router is configured")
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ ClientRead:
|
||||
messages := ts.Framer.Decode(buffer[0:byteCount])
|
||||
for _, message := range messages {
|
||||
if ts.router != nil {
|
||||
ts.router.HandleInput(ts.Id(), message)
|
||||
ts.router.HandleInput(ts.ctx, ts.Id(), message)
|
||||
} else {
|
||||
ts.logger.Error("input received but no router is configured")
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func (i *TimeInterval) Run() error {
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if i.router != nil {
|
||||
i.router.HandleInput(i.Id(), time.Now())
|
||||
i.router.HandleInput(i.ctx, i.Id(), time.Now())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func (t *TimeTimer) Run() error {
|
||||
return nil
|
||||
case time := <-t.timer.C:
|
||||
if t.router != nil {
|
||||
t.router.HandleInput(t.Id(), time)
|
||||
t.router.HandleInput(t.ctx, t.Id(), time)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ func (um *UDPMulticast) Run() error {
|
||||
message := buffer[:numBytes]
|
||||
|
||||
if um.router != nil {
|
||||
um.router.HandleInput(um.Id(), message)
|
||||
um.router.HandleInput(um.ctx, um.Id(), message)
|
||||
} else {
|
||||
um.logger.Error("input received but no router is configured")
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ func (us *UDPServer) Run() error {
|
||||
}
|
||||
message := buffer[:numBytes]
|
||||
if us.router != nil {
|
||||
us.router.HandleInput(us.Id(), message)
|
||||
us.router.HandleInput(us.ctx, us.Id(), message)
|
||||
} else {
|
||||
us.logger.Error("input received but no router is configured")
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ type RouteIOError struct {
|
||||
}
|
||||
|
||||
type RouteIO interface {
|
||||
HandleInput(sourceId string, payload any) (bool, []RouteIOError)
|
||||
HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError)
|
||||
HandleOutput(ctx context.Context, destinationId string, payload any) []error
|
||||
}
|
||||
|
||||
|
||||
@@ -30,8 +30,8 @@ func TestRouteCreate(t *testing.T) {
|
||||
|
||||
type MockRouter struct{}
|
||||
|
||||
func (mr *MockRouter) HandleInput(sourceId string, payload any) []route.RouteIOError {
|
||||
return nil
|
||||
func (mr *MockRouter) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
|
||||
return false, []route.RouteIOError{}
|
||||
}
|
||||
|
||||
func (mr *MockRouter) HandleOutput(ctx context.Context, destinationId string, payload any) error {
|
||||
|
||||
@@ -3,7 +3,6 @@ package showbridge
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
@@ -129,13 +128,13 @@ func (r *Router) Stop() {
|
||||
r.contextCancel()
|
||||
}
|
||||
|
||||
func (r *Router) HandleInput(sourceId string, payload any) (bool, []route.RouteIOError) {
|
||||
func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []route.RouteIOError) {
|
||||
var routeIOErrors []route.RouteIOError
|
||||
routeFound := false
|
||||
for routeIndex, routeInstance := range r.RouteInstances {
|
||||
if routeInstance.Input() == sourceId {
|
||||
routeFound = true
|
||||
routeContext := context.WithValue(r.Context, route.SourceContextKey, sourceId)
|
||||
routeContext := context.WithValue(ctx, route.SourceContextKey, sourceId)
|
||||
|
||||
payload, err := routeInstance.ProcessPayload(routeContext, payload)
|
||||
if err != nil {
|
||||
@@ -177,8 +176,8 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload
|
||||
outputErrors = []error{}
|
||||
}
|
||||
outputErrors = append(outputErrors, err)
|
||||
r.logger.Error("unable to route output", "module", moduleInstance.Id(), "error", err)
|
||||
}
|
||||
// r.logger.Error("unable to route output", "module", moduleInstance.Id(), "error", err)
|
||||
}
|
||||
}
|
||||
return outputErrors
|
||||
|
||||
Reference in New Issue
Block a user