mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 12:55:29 +00:00
168 lines
3.9 KiB
Go
168 lines
3.9 KiB
Go
package showbridge
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"sync"
|
|
)
|
|
|
|
// RoutingError describes a failure reported by a single route while the
// router was dispatching an input.
type RoutingError struct {
	// Index is the position of the failing route in Router.RouteInstances.
	Index int
	// Error is the error returned by that route's HandleInput.
	Error error
}
|
|
|
|
type Router struct {
|
|
contextCancel context.CancelFunc
|
|
Context context.Context
|
|
ModuleInstances []Module
|
|
RouteInstances []*Route
|
|
moduleWait sync.WaitGroup
|
|
}
|
|
|
|
func NewRouter(ctx context.Context, config Config) (*Router, []ModuleError, []RouteError) {
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: slog.LevelDebug,
|
|
}))
|
|
|
|
slog.SetDefault(logger)
|
|
|
|
slog.Debug("creating router")
|
|
|
|
routerContext, cancel := context.WithCancel(ctx)
|
|
router := Router{
|
|
Context: routerContext,
|
|
contextCancel: cancel,
|
|
ModuleInstances: []Module{},
|
|
RouteInstances: []*Route{},
|
|
}
|
|
|
|
var moduleErrors []ModuleError
|
|
|
|
for moduleIndex, moduleDecl := range config.Modules {
|
|
|
|
moduleInfo, ok := moduleRegistry[moduleDecl.Type]
|
|
if !ok {
|
|
if moduleErrors == nil {
|
|
moduleErrors = []ModuleError{}
|
|
}
|
|
moduleErrors = append(moduleErrors, ModuleError{
|
|
Index: moduleIndex,
|
|
Config: moduleDecl,
|
|
Error: fmt.Errorf("module type not defined"),
|
|
})
|
|
continue
|
|
}
|
|
|
|
moduleInstanceExists := false
|
|
for _, moduleInstance := range router.ModuleInstances {
|
|
if moduleInstance.Id() == moduleDecl.Id {
|
|
moduleInstanceExists = true
|
|
if moduleErrors == nil {
|
|
moduleErrors = []ModuleError{}
|
|
}
|
|
moduleErrors = append(moduleErrors, ModuleError{
|
|
Index: moduleIndex,
|
|
Config: moduleDecl,
|
|
Error: fmt.Errorf("duplicate module id"),
|
|
})
|
|
break
|
|
}
|
|
}
|
|
|
|
if !moduleInstanceExists {
|
|
moduleInstance, err := moduleInfo.New(moduleDecl)
|
|
if err != nil {
|
|
if moduleErrors == nil {
|
|
moduleErrors = []ModuleError{}
|
|
}
|
|
moduleErrors = append(moduleErrors, ModuleError{
|
|
Index: moduleIndex,
|
|
Config: moduleDecl,
|
|
Error: err,
|
|
})
|
|
continue
|
|
}
|
|
|
|
router.ModuleInstances = append(router.ModuleInstances, moduleInstance)
|
|
}
|
|
|
|
}
|
|
|
|
var routeErrors []RouteError
|
|
for routeIndex, routeDecl := range config.Routes {
|
|
route, err := NewRoute(routeIndex, routeDecl, &router)
|
|
if err != nil {
|
|
if routeErrors == nil {
|
|
routeErrors = []RouteError{}
|
|
}
|
|
routeErrors = append(routeErrors, RouteError{
|
|
Index: routeIndex,
|
|
Config: routeDecl,
|
|
Error: err,
|
|
})
|
|
continue
|
|
}
|
|
router.RouteInstances = append(router.RouteInstances, route)
|
|
}
|
|
|
|
for _, moduleInstance := range router.ModuleInstances {
|
|
slog.Debug("registering router with module", "id", moduleInstance.Id())
|
|
moduleInstance.RegisterRouter(&router)
|
|
}
|
|
|
|
return &router, moduleErrors, routeErrors
|
|
}
|
|
|
|
func (r *Router) Run() {
|
|
for _, moduleInstance := range r.ModuleInstances {
|
|
moduleInstance.RegisterRouter(r)
|
|
r.moduleWait.Add(1)
|
|
go func() {
|
|
err := moduleInstance.Run()
|
|
if err != nil {
|
|
slog.Error("error encountered running module", "id", moduleInstance.Id(), "error", err)
|
|
}
|
|
r.moduleWait.Done()
|
|
}()
|
|
}
|
|
<-r.Context.Done()
|
|
r.moduleWait.Wait()
|
|
slog.Info("router context done")
|
|
}
|
|
|
|
func (r *Router) Stop() {
|
|
r.contextCancel()
|
|
}
|
|
|
|
func (r *Router) HandleInput(sourceId string, payload any) []RoutingError {
|
|
var routingErrors []RoutingError
|
|
for routeIndex, route := range r.RouteInstances {
|
|
if route.Input == sourceId {
|
|
err := route.HandleInput(sourceId, payload)
|
|
if err != nil {
|
|
if routingErrors == nil {
|
|
routingErrors = []RoutingError{}
|
|
}
|
|
routingErrors = append(routingErrors, RoutingError{
|
|
Index: routeIndex,
|
|
Error: err,
|
|
})
|
|
slog.Error("router unable to route input", "route", routeIndex, "source", sourceId, "error", err)
|
|
}
|
|
}
|
|
}
|
|
return routingErrors
|
|
}
|
|
|
|
func (r *Router) HandleOutput(destinationId string, payload any) error {
|
|
for _, moduleInstance := range r.ModuleInstances {
|
|
if moduleInstance.Id() == destinationId {
|
|
return moduleInstance.Output(payload)
|
|
}
|
|
}
|
|
return fmt.Errorf("no module instance found for destination %s", destinationId)
|
|
}
|