diff --git a/http-client.go b/internal/module/http-client.go similarity index 76% rename from http-client.go rename to internal/module/http-client.go index 7e6650f..35ed6d3 100644 --- a/http-client.go +++ b/internal/module/http-client.go @@ -1,26 +1,29 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "net/http" "time" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type HTTPClient struct { config config.ModuleConfig - router *Router + ctx context.Context client *http.Client + router route.RouteIO } func init() { RegisterModule(ModuleRegistration{ Type: "net.http.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { - return &HTTPClient{config: config, router: router}, nil + return &HTTPClient{config: config, ctx: ctx, router: router}, nil }, }) } @@ -39,7 +42,7 @@ func (hc *HTTPClient) Run() error { Timeout: 10 * time.Second, } - <-hc.router.Context.Done() + <-hc.ctx.Done() slog.Debug("router context done in module", "id", hc.config.Id) return nil } diff --git a/http-server.go b/internal/module/http-server.go similarity index 85% rename from http-server.go rename to internal/module/http-server.go index b11247d..04fe9ac 100644 --- a/http-server.go +++ b/internal/module/http-server.go @@ -1,18 +1,21 @@ -package showbridge +package module import ( + "context" "encoding/json" "fmt" "log/slog" "net/http" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type HTTPServer struct { config config.ModuleConfig Port uint16 - router *Router + ctx context.Context + router route.RouteIO } type ResponseData struct { @@ -23,7 +26,7 @@ type ResponseData struct { func init() { RegisterModule(ModuleRegistration{ Type: "net.http.server", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params port, ok := params["port"] if !ok { @@ -36,7 +39,7 @@ func init() { return nil, fmt.Errorf("net.http.server port must be uint16") } - return &HTTPServer{Port: uint16(portNum), config: config, router: router}, nil + return &HTTPServer{Port: uint16(portNum), config: config, ctx: ctx, router: router}, nil }, }) } @@ -85,7 +88,7 @@ func (hs *HTTPServer) Run() error { } go func() { - <-hs.router.Context.Done() + <-hs.ctx.Done() slog.Debug("router context done in module", "id", hs.config.Id) httpServer.Close() }() @@ -97,7 +100,7 @@ func (hs *HTTPServer) Run() error { return err } - <-hs.router.Context.Done() + <-hs.ctx.Done() return nil } diff --git a/interval.go b/internal/module/interval.go similarity index 80% rename from interval.go rename to internal/module/interval.go index fb70995..76c8f3c 100644 --- a/interval.go +++ b/internal/module/interval.go @@ -1,24 +1,27 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "time" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type Interval struct { config config.ModuleConfig Duration uint32 - router *Router + ctx context.Context + router route.RouteIO ticker *time.Ticker } func init() { RegisterModule(ModuleRegistration{ Type: "gen.interval", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params duration, ok := params["duration"] @@ -32,7 +35,7 @@ func init() { return nil, fmt.Errorf("gen.interval duration must be number") } - return &Interval{Duration: uint32(durationNum), config: config, router: router}, nil + return &Interval{Duration: uint32(durationNum), config: config, ctx: ctx, router: router}, nil }, }) } @@ -52,7 +55,7 @@ func (i *Interval) Run() error { for { select { - case <-i.router.Context.Done(): + case <-i.ctx.Done(): slog.Debug("router context done in module", "id", i.config.Id) return nil case <-ticker.C: diff --git a/midi-client.go b/internal/module/midi-client.go similarity index 87% rename from midi-client.go rename to internal/module/midi-client.go index 9f14535..0352d39 100644 --- a/midi-client.go +++ b/internal/module/midi-client.go @@ -1,19 +1,22 @@ //go:build cgo -package showbridge +package module import ( + "context" "fmt" "log/slog" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" "gitlab.com/gomidi/midi/v2" _ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv" ) type MIDIClient struct { config config.ModuleConfig - router *Router + ctx context.Context + router route.RouteIO InputPort string OutputPort string SendFunc func(midi.Message) error @@ -23,7 +26,7 @@ func init() { RegisterModule(ModuleRegistration{ //TODO(jwetzell): find a better namespace than "misc" Type: "misc.midi.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params input, ok := params["input"] @@ -49,7 +52,7 @@ func init() { return nil, fmt.Errorf("misc.midi.client output must be a string") } - return &MIDIClient{config: config, InputPort: inputString, OutputPort: outputString, router: router}, nil + return &MIDIClient{config: config, InputPort: inputString, OutputPort: outputString, ctx: ctx, router: router}, nil }, }) } @@ -95,7 +98,7 @@ func (mc *MIDIClient) Run() error { mc.SendFunc = send - <-mc.router.Context.Done() + <-mc.ctx.Done() slog.Debug("router context done in module", "id", mc.config.Id) return nil } diff --git a/module.go b/internal/module/module.go similarity index 67% rename from module.go rename to internal/module/module.go index 007af43..aed40f1 100644 --- a/module.go +++ b/internal/module/module.go @@ -1,10 +1,12 @@ -package showbridge +package module import ( + "context" "fmt" "sync" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type ModuleError struct { @@ -22,7 +24,7 @@ type Module interface { type ModuleRegistration struct { Type string `json:"type"` - New func(config.ModuleConfig, *Router) (Module, error) + New func(context.Context, config.ModuleConfig, route.RouteIO) (Module, error) } func RegisterModule(mod ModuleRegistration) { @@ -37,13 +39,13 @@ func RegisterModule(mod ModuleRegistration) { moduleRegistryMu.Lock() defer moduleRegistryMu.Unlock() - if _, ok := moduleRegistry[string(mod.Type)]; ok { + if _, ok := ModuleRegistry[string(mod.Type)]; ok { panic(fmt.Sprintf("module already registered: %s", mod.Type)) } - moduleRegistry[string(mod.Type)] = mod + ModuleRegistry[string(mod.Type)] = mod } var ( moduleRegistryMu sync.RWMutex - moduleRegistry = make(map[string]ModuleRegistration) + ModuleRegistry = make(map[string]ModuleRegistration) ) diff --git a/mqtt-client.go b/internal/module/mqtt-client.go similarity index 88% rename from mqtt-client.go rename to internal/module/mqtt-client.go index c3c446f..c865152 100644 --- a/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -1,17 +1,20 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processing" + "github.com/jwetzell/showbridge-go/internal/route" ) type MQTTClient struct { config config.ModuleConfig - router *Router + ctx context.Context + router route.RouteIO Broker string ClientID string Topic string @@ -21,7 +24,7 @@ type MQTTClient struct { func init() { RegisterModule(ModuleRegistration{ Type: "net.mqtt.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params broker, ok := params["broker"] @@ -59,7 +62,7 @@ func init() { return nil, fmt.Errorf("net.mqtt.client clientId must be string") } - return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString, router: router}, nil + return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString, ctx: ctx, router: router}, nil }, }) } @@ -96,7 +99,7 @@ func (mc *MQTTClient) Run() error { return err } - <-mc.router.Context.Done() + <-mc.ctx.Done() slog.Debug("router context done in module", "id", mc.config.Id) return nil } diff --git a/nats-client.go b/internal/module/nats-client.go similarity index 87% rename from nats-client.go rename to internal/module/nats-client.go index 9f61970..7015828 100644 --- a/nats-client.go +++ b/internal/module/nats-client.go @@ -1,17 +1,20 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processing" + "github.com/jwetzell/showbridge-go/internal/route" "github.com/nats-io/nats.go" ) type NATSClient struct { config config.ModuleConfig - router *Router + ctx context.Context + router route.RouteIO URL string Subject string client *nats.Conn @@ -20,7 +23,7 @@ type NATSClient struct { func init() { RegisterModule(ModuleRegistration{ Type: "net.nats.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params url, ok := params["url"] @@ -46,7 +49,7 @@ func init() { return nil, fmt.Errorf("net.nats.client subject must be string") } - return &NATSClient{config: config, URL: urlString, Subject: subjectString, router: router}, nil + return &NATSClient{config: config, URL: urlString, Subject: subjectString, ctx: ctx, router: router}, nil }, }) } @@ -83,7 +86,7 @@ func (nc *NATSClient) Run() error { defer sub.Unsubscribe() - <-nc.router.Context.Done() + <-nc.ctx.Done() slog.Debug("router context done in module", "id", nc.config.Id) return nil } diff --git a/psn-client.go b/internal/module/psn-client.go similarity index 86% rename from psn-client.go rename to internal/module/psn-client.go index 113fed0..0043d73 100644 --- a/psn-client.go +++ b/internal/module/psn-client.go @@ -1,6 +1,7 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "net" @@ -8,21 +9,23 @@ import ( "github.com/jwetzell/psn-go" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type PSNClient struct { config config.ModuleConfig conn *net.UDPConn - router *Router + ctx context.Context + router route.RouteIO decoder *psn.Decoder } func init() { RegisterModule(ModuleRegistration{ Type: "net.psn.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { - return &PSNClient{config: config, decoder: psn.NewDecoder(), router: router}, nil + return &PSNClient{config: config, decoder: psn.NewDecoder(), ctx: ctx, router: router}, nil }, }) } @@ -53,7 +56,7 @@ func (pc *PSNClient) Run() error { buffer := make([]byte, 2048) for { select { - case <-pc.router.Context.Done(): + case <-pc.ctx.Done(): // TODO(jwetzell): cleanup? slog.Debug("router context done in module", "id", pc.config.Id) return nil diff --git a/serial-client.go b/internal/module/serial-client.go similarity index 89% rename from serial-client.go rename to internal/module/serial-client.go index 4ea3ec4..12d4922 100644 --- a/serial-client.go +++ b/internal/module/serial-client.go @@ -1,20 +1,23 @@ //go:build cgo -package showbridge +package module import ( + "context" "fmt" "log/slog" "time" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framing" + "github.com/jwetzell/showbridge-go/internal/route" "go.bug.st/serial" ) type SerialClient struct { config config.ModuleConfig - router *Router + ctx context.Context + router route.RouteIO Port string Framer framing.Framer Mode *serial.Mode @@ -25,7 +28,7 @@ func init() { RegisterModule(ModuleRegistration{ //TODO(jwetzell): find a better namespace than "misc" Type: "misc.serial.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params port, ok := params["port"] @@ -70,7 +73,7 @@ func init() { BaudRate: int(baudRateNum), } - return &SerialClient{config: config, Port: portString, Framer: framer, Mode: &mode, router: router}, nil + return &SerialClient{config: config, Port: portString, Framer: framer, Mode: &mode, ctx: ctx, router: router}, nil }, }) } @@ -99,7 +102,7 @@ func (mc *SerialClient) Run() error { // TODO(jwetzell): shutdown with router.Context properly go func() { - <-mc.router.Context.Done() + <-mc.ctx.Done() slog.Debug("router context done in module", "id", mc.config.Id) if mc.port != nil { mc.port.Close() @@ -109,7 +112,7 @@ func (mc *SerialClient) Run() error { for { err := mc.SetupPort() if err != nil { - if mc.router.Context.Err() != nil { + if mc.ctx.Err() != nil { slog.Debug("router context done in module", "id", mc.config.Id) return nil } @@ -120,14 +123,14 @@ func (mc *SerialClient) Run() error { buffer := make([]byte, 1024) select { - case <-mc.router.Context.Done(): + case <-mc.ctx.Done(): slog.Debug("router context done in module", "id", mc.config.Id) return nil default: READ: for { select { - case <-mc.router.Context.Done(): + case <-mc.ctx.Done(): slog.Debug("router context done in module", "id", mc.config.Id) return nil default: diff --git a/tcp-client.go b/internal/module/tcp-client.go similarity index 90% rename from tcp-client.go rename to internal/module/tcp-client.go index 6d0ba01..6c88b21 100644 --- a/tcp-client.go +++ b/internal/module/tcp-client.go @@ -1,6 +1,7 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "net" @@ -8,20 +9,22 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framing" + "github.com/jwetzell/showbridge-go/internal/route" ) type TCPClient struct { config config.ModuleConfig framer framing.Framer conn *net.TCPConn - router *Router + ctx context.Context + router route.RouteIO Addr *net.TCPAddr } func init() { RegisterModule(ModuleRegistration{ Type: "net.tcp.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params host, ok := params["host"] @@ -68,7 +71,7 @@ func init() { return nil, err } - return &TCPClient{framer: framer, Addr: addr, config: config, router: router}, nil + return &TCPClient{framer: framer, Addr: addr, config: config, ctx: ctx, router: router}, nil }, }) } @@ -85,7 +88,7 @@ func (tc *TCPClient) Run() error { // TODO(jwetzell): shutdown with router.Context properly go func() { - <-tc.router.Context.Done() + <-tc.ctx.Done() slog.Debug("router context done in module", "id", tc.config.Id) if tc.conn != nil { tc.conn.Close() @@ -95,7 +98,7 @@ func (tc *TCPClient) Run() error { for { err := tc.SetupConn() if err != nil { - if tc.router.Context.Err() != nil { + if tc.ctx.Err() != nil { slog.Debug("router context done in module", "id", tc.config.Id) return nil } @@ -106,14 +109,14 @@ func (tc *TCPClient) Run() error { buffer := make([]byte, 1024) select { - case <-tc.router.Context.Done(): + case <-tc.ctx.Done(): slog.Debug("router context done in module", "id", tc.config.Id) return nil default: READ: for { select { - case <-tc.router.Context.Done(): + case <-tc.ctx.Done(): slog.Debug("router context done in module", "id", tc.config.Id) return nil default: diff --git a/tcp-server.go b/internal/module/tcp-server.go similarity index 93% rename from tcp-server.go rename to internal/module/tcp-server.go index 3abafc7..baa1a68 100644 --- a/tcp-server.go +++ b/internal/module/tcp-server.go @@ -1,6 +1,7 @@ -package showbridge +package module import ( + "context" "errors" "fmt" "log/slog" @@ -12,13 +13,15 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/framing" + "github.com/jwetzell/showbridge-go/internal/route" ) type TCPServer struct { config config.ModuleConfig Addr *net.TCPAddr Framer framing.Framer - router *Router + ctx context.Context + router route.RouteIO quit chan interface{} wg sync.WaitGroup connections []*net.TCPConn @@ -28,7 +31,7 @@ type TCPServer struct { func init() { RegisterModule(ModuleRegistration{ Type: "net.tcp.server", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params port, ok := params["port"] if !ok { @@ -76,7 +79,7 @@ func init() { return nil, err } - return &TCPServer{Framer: framer, Addr: addr, config: config, quit: make(chan interface{}), router: router}, nil + return &TCPServer{Framer: framer, Addr: addr, config: config, quit: make(chan interface{}), ctx: ctx, router: router}, nil }, }) } @@ -162,7 +165,7 @@ func (ts *TCPServer) Run() error { ts.wg.Add(1) go func() { - <-ts.router.Context.Done() + <-ts.ctx.Done() close(ts.quit) listener.Close() slog.Debug("router context done in module", "id", ts.config.Id) diff --git a/timer.go b/internal/module/timer.go similarity index 80% rename from timer.go rename to internal/module/timer.go index 3ec32e3..ff4922a 100644 --- a/timer.go +++ b/internal/module/timer.go @@ -1,24 +1,27 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "time" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type Timer struct { config config.ModuleConfig Duration uint32 - router *Router + ctx context.Context + router route.RouteIO timer *time.Timer } func init() { RegisterModule(ModuleRegistration{ Type: "gen.timer", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params duration, ok := params["duration"] @@ -32,7 +35,7 @@ func init() { return nil, fmt.Errorf("gen.timer duration must be a number") } - return &Timer{Duration: uint32(durationNum), config: config, router: router}, nil + return &Timer{Duration: uint32(durationNum), config: config, ctx: ctx, router: router}, nil }, }) } @@ -50,7 +53,7 @@ func (t *Timer) Run() error { defer t.timer.Stop() for { select { - case <-t.router.Context.Done(): + case <-t.ctx.Done(): t.timer.Stop() slog.Debug("router context done in module", "id", t.config.Id) return nil diff --git a/udp-client.go b/internal/module/udp-client.go similarity index 83% rename from udp-client.go rename to internal/module/udp-client.go index ce9ccc7..2db7207 100644 --- a/udp-client.go +++ b/internal/module/udp-client.go @@ -1,11 +1,13 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "net" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type UDPClient struct { @@ -13,13 +15,14 @@ type UDPClient struct { Addr *net.UDPAddr Port uint16 conn *net.UDPConn - router *Router + ctx context.Context + router route.RouteIO } func init() { RegisterModule(ModuleRegistration{ Type: "net.udp.client", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params host, ok := params["host"] @@ -49,7 +52,7 @@ func init() { return nil, err } - return &UDPClient{Addr: addr, config: config, router: router}, nil + return &UDPClient{Addr: addr, config: config, ctx: ctx, router: router}, nil }, }) } @@ -70,8 +73,7 @@ func (uc *UDPClient) Run() error { } uc.conn = client - - <-uc.router.Context.Done() + <-uc.ctx.Done() slog.Debug("router context done in module", "id", uc.config.Id) if uc.conn != nil { uc.conn.Close() diff --git a/udp-multicast.go b/internal/module/udp-multicast.go similarity index 86% rename from udp-multicast.go rename to internal/module/udp-multicast.go index debf583..2c84d08 100644 --- a/udp-multicast.go +++ b/internal/module/udp-multicast.go @@ -1,25 +1,28 @@ -package showbridge +package module import ( + "context" "fmt" "log/slog" "net" "time" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type UDPMulticast struct { config config.ModuleConfig conn *net.UDPConn - router *Router + ctx context.Context + router route.RouteIO Addr *net.UDPAddr } func init() { RegisterModule(ModuleRegistration{ Type: "net.udp.multicast", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params ip, ok := params["ip"] @@ -48,7 +51,7 @@ func init() { if err != nil { return nil, err } - return &UDPMulticast{config: config, Addr: addr, router: router}, nil + return &UDPMulticast{config: config, Addr: addr, ctx: ctx, router: router}, nil }, }) } @@ -74,7 +77,7 @@ func (um *UDPMulticast) Run() error { buffer := make([]byte, 2048) for { select { - case <-um.router.Context.Done(): + case <-um.ctx.Done(): // TODO(jwetzell): cleanup? slog.Debug("router context done in module", "id", um.config.Id) return nil diff --git a/udp-server.go b/internal/module/udp-server.go similarity index 85% rename from udp-server.go rename to internal/module/udp-server.go index 6f11887..539c08a 100644 --- a/udp-server.go +++ b/internal/module/udp-server.go @@ -1,6 +1,7 @@ -package showbridge +package module import ( + "context" "fmt" "log" "log/slog" @@ -8,18 +9,20 @@ import ( "time" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" ) type UDPServer struct { Addr *net.UDPAddr config config.ModuleConfig - router *Router + ctx context.Context + router route.RouteIO } func init() { RegisterModule(ModuleRegistration{ Type: "net.udp.server", - New: func(config config.ModuleConfig, router *Router) (Module, error) { + New: func(ctx context.Context, config config.ModuleConfig, router route.RouteIO) (Module, error) { params := config.Params port, ok := params["port"] if !ok { @@ -50,7 +53,7 @@ func init() { log.Fatalf("error resolving UDP address: %v", err) } - return &UDPServer{Addr: addr, config: config, router: router}, nil + return &UDPServer{Addr: addr, config: config, ctx: ctx, router: router}, nil }, }) } @@ -75,7 +78,7 @@ func (us *UDPServer) Run() error { buffer := make([]byte, 1024) for { select { - case <-us.router.Context.Done(): + case <-us.ctx.Done(): // TODO(jwetzell): cleanup? slog.Debug("router context done in module", "id", us.config.Id) return nil diff --git a/route.go b/internal/route/route.go similarity index 65% rename from route.go rename to internal/route/route.go index bef1d16..99ef4fc 100644 --- a/route.go +++ b/internal/route/route.go @@ -1,6 +1,7 @@ -package showbridge +package route import ( + "context" "fmt" "github.com/jwetzell/showbridge-go/internal/config" @@ -13,11 +14,21 @@ type RouteError struct { Error error } +type RouteIOError struct { + Index int + Error error +} + +type RouteIO interface { + HandleInput(sourceId string, payload any) []RouteIOError + HandleOutput(sourceId string, destinationId string, payload any) error +} + type Route interface { Input() string Output() string - HandleInput(sourceId string, payload any, router *Router) error - HandleOutput(sourceId string, payload any, router *Router) error + HandleInput(ctx context.Context, sourceId string, payload any, router RouteIO) error + HandleOutput(ctx context.Context, sourceId string, payload any, router RouteIO) error } type ProcessorRoute struct { @@ -55,10 +66,10 @@ func (r *ProcessorRoute) Output() string { return r.output } -func (r *ProcessorRoute) HandleInput(sourceId string, payload any, router *Router) error { +func (r *ProcessorRoute) HandleInput(ctx context.Context, sourceId string, payload any, router RouteIO) error { var err error for _, processor := range r.processors { - payload, err = processor.Process(router.Context, payload) + payload, err = processor.Process(ctx, payload) if err != nil { return err } @@ -67,9 +78,9 @@ func (r *ProcessorRoute) HandleInput(sourceId string, payload any, router *Route return nil } } - return r.HandleOutput(sourceId, payload, router) + return r.HandleOutput(ctx, sourceId, payload, router) } -func (r *ProcessorRoute) HandleOutput(sourceId string, payload any, router *Router) error { +func (r *ProcessorRoute) HandleOutput(ctx context.Context, sourceId string, payload any, router RouteIO) error { return router.HandleOutput(sourceId, r.output, payload) } diff --git a/router.go b/router.go index 04c8e01..14c9b30 100644 --- a/router.go +++ b/router.go @@ -8,23 +8,19 @@ import ( "sync" "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/module" + "github.com/jwetzell/showbridge-go/internal/route" ) -type RoutingError struct { - Index int - Error error -} - type Router struct { contextCancel context.CancelFunc Context context.Context - ModuleInstances []Module - RouteInstances []Route + ModuleInstances []module.Module + RouteInstances []route.Route moduleWait sync.WaitGroup } -func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleError, []RouteError) { - +func NewRouter(ctx context.Context, config config.Config) (*Router, []module.ModuleError, []route.RouteError) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) @@ -37,20 +33,20 @@ func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleErro router := Router{ Context: routerContext, contextCancel: cancel, - ModuleInstances: []Module{}, - RouteInstances: []Route{}, + ModuleInstances: []module.Module{}, + RouteInstances: []route.Route{}, } - var moduleErrors []ModuleError + var moduleErrors []module.ModuleError for moduleIndex, moduleDecl := range config.Modules { - moduleInfo, ok := moduleRegistry[moduleDecl.Type] + moduleInfo, ok := module.ModuleRegistry[moduleDecl.Type] if !ok { if moduleErrors == nil { - moduleErrors = []ModuleError{} + moduleErrors = []module.ModuleError{} } - moduleErrors = append(moduleErrors, ModuleError{ + moduleErrors = append(moduleErrors, module.ModuleError{ Index: moduleIndex, Config: moduleDecl, Error: fmt.Errorf("module type not defined"), @@ -63,9 +59,9 @@ func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleErro if moduleInstance.Id() == moduleDecl.Id { moduleInstanceExists = true if moduleErrors == nil { - moduleErrors = []ModuleError{} + moduleErrors = []module.ModuleError{} } - moduleErrors = append(moduleErrors, ModuleError{ + moduleErrors = append(moduleErrors, module.ModuleError{ Index: moduleIndex, Config: moduleDecl, Error: fmt.Errorf("duplicate module id"), @@ -75,12 +71,12 @@ func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleErro } if !moduleInstanceExists { - moduleInstance, err := moduleInfo.New(moduleDecl, &router) + moduleInstance, err := moduleInfo.New(router.Context, moduleDecl, &router) if err != nil { if moduleErrors == nil { - moduleErrors = []ModuleError{} + moduleErrors = []module.ModuleError{} } - moduleErrors = append(moduleErrors, ModuleError{ + moduleErrors = append(moduleErrors, module.ModuleError{ Index: moduleIndex, Config: moduleDecl, Error: err, @@ -93,21 +89,21 @@ func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleErro } - var routeErrors []RouteError + var routeErrors []route.RouteError for routeIndex, routeDecl := range config.Routes { - route, err := NewRoute(routeDecl) + routeInstance, err := route.NewRoute(routeDecl) if err != nil { if routeErrors == nil { - routeErrors = []RouteError{} + routeErrors = []route.RouteError{} } - routeErrors = append(routeErrors, RouteError{ + routeErrors = append(routeErrors, route.RouteError{ Index: routeIndex, Config: routeDecl, Error: err, }) continue } - router.RouteInstances = append(router.RouteInstances, route) + router.RouteInstances = append(router.RouteInstances, routeInstance) } return &router, moduleErrors, routeErrors @@ -134,16 +130,16 @@ func (r *Router) Stop() { r.contextCancel() } -func (r *Router) HandleInput(sourceId string, payload any) []RoutingError { - var routingErrors []RoutingError - for routeIndex, route := range r.RouteInstances { - if route.Input() == sourceId { - err := route.HandleInput(sourceId, payload, r) +func (r *Router) HandleInput(sourceId string, payload any) []route.RouteIOError { + var routingErrors []route.RouteIOError + for routeIndex, routeInstance := range r.RouteInstances { + if routeInstance.Input() == sourceId { + err := routeInstance.HandleInput(r.Context, sourceId, payload, r) if err != nil { if routingErrors == nil { - routingErrors = []RoutingError{} + routingErrors = []route.RouteIOError{} } - routingErrors = append(routingErrors, RoutingError{ + routingErrors = append(routingErrors, route.RouteIOError{ Index: routeIndex, Error: err, })