Files
showbridge-go/router.go
2025-12-06 23:50:44 -06:00

165 lines
3.8 KiB
Go

package showbridge
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"github.com/jwetzell/showbridge-go/internal/config"
)
type RoutingError struct {
Index int
Error error
}
type Router struct {
contextCancel context.CancelFunc
Context context.Context
ModuleInstances []Module
RouteInstances []Route
moduleWait sync.WaitGroup
}
func NewRouter(ctx context.Context, config config.Config) (*Router, []ModuleError, []RouteError) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
slog.SetDefault(logger)
slog.Debug("creating router")
routerContext, cancel := context.WithCancel(ctx)
router := Router{
Context: routerContext,
contextCancel: cancel,
ModuleInstances: []Module{},
RouteInstances: []Route{},
}
var moduleErrors []ModuleError
for moduleIndex, moduleDecl := range config.Modules {
moduleInfo, ok := moduleRegistry[moduleDecl.Type]
if !ok {
if moduleErrors == nil {
moduleErrors = []ModuleError{}
}
moduleErrors = append(moduleErrors, ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: fmt.Errorf("module type not defined"),
})
continue
}
moduleInstanceExists := false
for _, moduleInstance := range router.ModuleInstances {
if moduleInstance.Id() == moduleDecl.Id {
moduleInstanceExists = true
if moduleErrors == nil {
moduleErrors = []ModuleError{}
}
moduleErrors = append(moduleErrors, ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: fmt.Errorf("duplicate module id"),
})
break
}
}
if !moduleInstanceExists {
moduleInstance, err := moduleInfo.New(moduleDecl, &router)
if err != nil {
if moduleErrors == nil {
moduleErrors = []ModuleError{}
}
moduleErrors = append(moduleErrors, ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: err,
})
continue
}
router.ModuleInstances = append(router.ModuleInstances, moduleInstance)
}
}
var routeErrors []RouteError
for routeIndex, routeDecl := range config.Routes {
route, err := NewRoute(routeDecl)
if err != nil {
if routeErrors == nil {
routeErrors = []RouteError{}
}
routeErrors = append(routeErrors, RouteError{
Index: routeIndex,
Config: routeDecl,
Error: err,
})
continue
}
router.RouteInstances = append(router.RouteInstances, route)
}
return &router, moduleErrors, routeErrors
}
func (r *Router) Run() {
slog.Info("running router")
for _, moduleInstance := range r.ModuleInstances {
r.moduleWait.Add(1)
go func() {
err := moduleInstance.Run()
if err != nil {
slog.Error("error encountered running module", "id", moduleInstance.Id(), "error", err)
}
r.moduleWait.Done()
}()
}
<-r.Context.Done()
r.moduleWait.Wait()
slog.Info("router done")
}
func (r *Router) Stop() {
r.contextCancel()
}
func (r *Router) HandleInput(sourceId string, payload any) []RoutingError {
var routingErrors []RoutingError
for routeIndex, route := range r.RouteInstances {
if route.Input() == sourceId {
err := route.HandleInput(sourceId, payload, r)
if err != nil {
if routingErrors == nil {
routingErrors = []RoutingError{}
}
routingErrors = append(routingErrors, RoutingError{
Index: routeIndex,
Error: err,
})
slog.Error("router unable to route input", "route", routeIndex, "source", sourceId, "error", err)
}
}
}
return routingErrors
}
func (r *Router) HandleOutput(sourceId string, destinationId string, payload any) error {
for _, moduleInstance := range r.ModuleInstances {
if moduleInstance.Id() == destinationId {
return moduleInstance.Output(payload)
}
}
return fmt.Errorf("router could not find module instance for destination %s", destinationId)
}