work towards decoupling api from router

This commit is contained in:
Joel Wetzell
2026-05-06 17:16:45 -05:00
parent 427d69d443
commit 984cb435d5
15 changed files with 336 additions and 267 deletions

View File

@@ -14,8 +14,6 @@ import (
"github.com/jwetzell/showbridge-go" "github.com/jwetzell/showbridge-go"
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route"
"github.com/jwetzell/showbridge-go/internal/schema" "github.com/jwetzell/showbridge-go/internal/schema"
"github.com/urfave/cli/v3" "github.com/urfave/cli/v3"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@@ -262,7 +260,12 @@ func (app *showbridgeApp) handleChannels() {
app.routerMutex.Unlock() app.routerMutex.Unlock()
continue continue
} }
moduleErrors, routeErrors := app.router.UpdateConfig(config) err, moduleErrors, routeErrors := app.router.UpdateConfig(config, false)
if err != nil {
app.logger.Error("failed to update router config", "error", err)
app.routerMutex.Unlock()
continue
}
app.logConfigErrors(moduleErrors, routeErrors) app.logConfigErrors(moduleErrors, routeErrors)
app.logger.Info("configuration reloaded successfully") app.logger.Info("configuration reloaded successfully")
app.routerMutex.Unlock() app.routerMutex.Unlock()
@@ -280,7 +283,7 @@ func (app *showbridgeApp) handleChannels() {
} }
} }
func (app *showbridgeApp) logConfigErrors(moduleErrors []module.ModuleError, routeErrors []route.RouteError) { func (app *showbridgeApp) logConfigErrors(moduleErrors []config.ModuleError, routeErrors []config.RouteError) {
for _, moduleError := range moduleErrors { for _, moduleError := range moduleErrors {
app.logger.Error("problem initializing module", "index", moduleError.Index, "error", moduleError.Error) app.logger.Error("problem initializing module", "index", moduleError.Index, "error", moduleError.Error)
} }

85
config.go Normal file
View File

@@ -0,0 +1,85 @@
package showbridge
import (
"errors"
"reflect"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/route"
)
func (r *Router) GetRunningConfig() config.Config {
r.runningConfigMu.RLock()
defer r.runningConfigMu.RUnlock()
return r.runningConfig
}
func (r *Router) UpdateConfig(newConfig config.Config, triggerChangeChan bool) (error, []config.ModuleError, []config.RouteError) {
if !r.runningConfigMu.TryLock() {
return errors.New("config update in progress"), nil, nil
}
defer r.runningConfigMu.Unlock()
oldConfig := r.runningConfig
r.logger.Debug("received config update", "oldConfig", oldConfig, "newConfig", newConfig)
if !reflect.DeepEqual(oldConfig.Api, newConfig.Api) {
r.logger.Info("applying new API config")
r.apiServer.Stop()
r.apiServer.Start(newConfig.Api)
r.runningConfig.Api = newConfig.Api
}
// TODO(jwetzell): handle config update errors better
for _, moduleInstance := range r.ModuleInstances {
moduleInstance.Stop()
}
r.logger.Debug("waiting for modules to exit")
r.moduleWait.Wait()
r.ModuleInstances = make(map[string]common.Module)
r.RouteInstances = []*route.Route{}
var moduleErrors []config.ModuleError
for moduleIndex, moduleDecl := range newConfig.Modules {
err := r.addModule(moduleDecl)
if err != nil {
if moduleErrors == nil {
moduleErrors = []config.ModuleError{}
}
moduleErrors = append(moduleErrors, config.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: err.Error(),
})
continue
}
}
var routeErrors []config.RouteError
for routeIndex, routeDecl := range newConfig.Routes {
err := r.addRoute(routeDecl)
if err != nil {
if routeErrors == nil {
routeErrors = []config.RouteError{}
}
routeErrors = append(routeErrors, config.RouteError{
Index: routeIndex,
Config: routeDecl,
Error: err.Error(),
})
continue
}
}
r.runningConfig = newConfig
r.startModules()
if triggerChangeChan {
r.ConfigChange <- newConfig
}
return nil, moduleErrors, routeErrors
}

View File

@@ -1,64 +1,53 @@
package showbridge package showbridge
import ( import (
"encoding/json" "time"
"github.com/gorilla/websocket" "github.com/jwetzell/showbridge-go/internal/common"
) )
type Event struct { func (r *Router) HandleEvent(event common.Event, sender common.EventDestination) {
Type string `json:"type"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
func (e Event) toJSON() ([]byte, error) {
return json.Marshal(e)
}
func (r *Router) handleEvent(event Event, sender *websocket.Conn) {
switch event.Type { switch event.Type {
case "ping": case "ping":
r.unicastEvent(Event{Type: "pong"}, sender) r.unicastEvent(common.Event{Type: "pong", Data: map[string]any{
"timestamp": time.Now().UnixMilli(),
}}, sender)
default: default:
r.logger.Warn("unknown event type", "eventType", event.Type) r.logger.Warn("unknown event type", "eventType", event.Type)
} }
} }
func (r *Router) unicastEvent(event Event, conn *websocket.Conn) { func (r *Router) AddEventDestination(dest common.EventDestination) {
eventJSON, err := event.toJSON() r.eventDestinationsMu.Lock()
if err != nil { defer r.eventDestinationsMu.Unlock()
r.logger.Error("failed to marshal event to JSON", "error", err) r.eventDestinations = append(r.eventDestinations, dest)
return }
}
err = conn.WriteMessage(websocket.TextMessage, eventJSON) func (r *Router) RemoveEventDestination(dest common.EventDestination) {
if err != nil { r.eventDestinationsMu.Lock()
r.logger.Error("failed to write message to websocket connection", "error", err) defer r.eventDestinationsMu.Unlock()
for i, d := range r.eventDestinations {
if d.Is(dest) {
r.eventDestinations = append(r.eventDestinations[:i], r.eventDestinations[i+1:]...)
break
}
} }
} }
func (r *Router) broadcastEvent(event Event, excluded ...*websocket.Conn) { func (r *Router) unicastEvent(event common.Event, dest common.EventDestination) {
eventJSON, err := event.toJSON() err := dest.Send(event)
if err != nil { if err != nil {
r.logger.Error("failed to marshal event to JSON", "error", err) r.logger.Error("failed to send event", "error", err)
return
} }
r.wsConnsMu.Lock() }
defer r.wsConnsMu.Unlock()
for _, conn := range r.wsConns { func (r *Router) broadcastEvent(event common.Event) {
exclude := false r.eventDestinationsMu.Lock()
for _, excludedConn := range excluded { defer r.eventDestinationsMu.Unlock()
if conn == excludedConn { for _, dest := range r.eventDestinations {
exclude = true err := dest.Send(event)
break
}
}
if exclude {
continue
}
err := conn.WriteMessage(websocket.TextMessage, eventJSON)
if err != nil { if err != nil {
r.logger.Error("failed to write message to websocket connection", "error", err) r.logger.Error("failed to send event", "error", err)
} }
} }
} }

View File

@@ -1,4 +1,4 @@
package showbridge package api
import ( import (
"context" "context"
@@ -6,60 +6,80 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module"
"github.com/jwetzell/showbridge-go/internal/route"
"github.com/jwetzell/showbridge-go/internal/schema" "github.com/jwetzell/showbridge-go/internal/schema"
) )
func (r *Router) startAPIServer(config config.ApiConfig) { type ApiServer struct {
if !config.Enabled { config config.ApiConfig
r.logger.Warn("API not enabled") serverMu sync.Mutex
server *http.Server
shutdown context.CancelFunc
logger *slog.Logger
configurableRouter config.Configurable
eventRouter common.EventRouter
}
func NewApiServer(configurableRouter config.Configurable, eventRouter common.EventRouter) *ApiServer {
return &ApiServer{
configurableRouter: configurableRouter,
eventRouter: eventRouter,
logger: slog.Default().With("component", "api"),
}
}
func (as *ApiServer) Start(config config.ApiConfig) {
as.config = config
if !as.config.Enabled {
as.logger.Warn("not enabled")
return return
} }
r.logger.Debug("starting API server", "port", config.Port) as.logger.Debug("starting", "port", as.config.Port)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/ws", r.handleWebsocket) mux.HandleFunc("/ws", as.handleWebsocket)
mux.HandleFunc("/health", r.handleHealthHTTP) mux.HandleFunc("/health", as.handleHealthHTTP)
mux.HandleFunc("/api/v1/config", r.handleConfigHTTP) mux.HandleFunc("/api/v1/config", as.handleConfigHTTP)
mux.HandleFunc("/schema/config.schema.json", handleConfigSchema) mux.HandleFunc("/schema/config.schema.json", handleConfigSchema)
mux.HandleFunc("/schema/routes.schema.json", handleRoutesSchema) mux.HandleFunc("/schema/routes.schema.json", handleRoutesSchema)
mux.HandleFunc("/schema/modules.schema.json", handleModulesSchema) mux.HandleFunc("/schema/modules.schema.json", handleModulesSchema)
mux.HandleFunc("/schema/processors.schema.json", handleProcessorsSchema) mux.HandleFunc("/schema/processors.schema.json", handleProcessorsSchema)
r.apiServerMu.Lock() as.serverMu.Lock()
defer r.apiServerMu.Unlock() defer as.serverMu.Unlock()
r.apiServer = &http.Server{ as.server = &http.Server{
Addr: fmt.Sprintf(":%d", config.Port), Addr: fmt.Sprintf(":%d", as.config.Port),
Handler: mux, Handler: mux,
} }
go func() { go func() {
r.apiServer.ListenAndServe() as.server.ListenAndServe()
r.apiServerShutdown() as.shutdown()
}() }()
} }
func (r *Router) stopAPIServer() { func (as *ApiServer) Stop() {
if r.apiServer == nil { if as.server == nil {
return return
} }
r.logger.Debug("stopping API server") as.logger.Debug("stopping")
r.apiServerMu.Lock() as.serverMu.Lock()
defer r.apiServerMu.Unlock() defer as.serverMu.Unlock()
if r.apiServer != nil { if as.server != nil {
apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
r.apiServerShutdown = apiShutdownCancel as.shutdown = apiShutdownCancel
r.apiServer.Shutdown(apiShutdownCtx) as.server.Shutdown(apiShutdownCtx)
<-apiShutdownCtx.Done() <-apiShutdownCtx.Done()
r.apiServer = nil as.server = nil
} }
} }
func (r *Router) handleHealthHTTP(w http.ResponseWriter, req *http.Request) { func (as *ApiServer) handleHealthHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method { switch req.Method {
case http.MethodGet: case http.MethodGet:
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -75,11 +95,11 @@ func (r *Router) handleHealthHTTP(w http.ResponseWriter, req *http.Request) {
} }
} }
func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { func (as *ApiServer) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method { switch req.Method {
case http.MethodGet: case http.MethodGet:
configJSON, err := json.Marshal(r.runningConfig) configJSON, err := json.Marshal(as.configurableRouter.GetRunningConfig())
if err != nil { if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError) http.Error(w, "Internal server error", http.StatusInternalServerError)
return return
@@ -89,10 +109,6 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(configJSON) w.Write(configJSON)
case http.MethodPut: case http.MethodPut:
if r.updatingConfig {
http.Error(w, "Config update in progress.", http.StatusConflict)
return
}
//TODO(jwetzell): again way too much marshaling //TODO(jwetzell): again way too much marshaling
cfgBytes, err := io.ReadAll(req.Body) cfgBytes, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
@@ -132,11 +148,15 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Bad request", http.StatusBadRequest) http.Error(w, "Bad request", http.StatusBadRequest)
return return
} }
moduleErrors, routeErrors := r.UpdateConfig(newConfig) err, moduleErrors, routeErrors := as.configurableRouter.UpdateConfig(newConfig, true)
if err != nil {
http.Error(w, err.Error(), http.StatusConflict)
return
}
if len(moduleErrors) > 0 || len(routeErrors) > 0 { if len(moduleErrors) > 0 || len(routeErrors) > 0 {
errorResponse := struct { errorResponse := struct {
ModuleErrors []module.ModuleError `json:"moduleErrors,omitempty"` ModuleErrors []config.ModuleError `json:"moduleErrors,omitempty"`
RouteErrors []route.RouteError `json:"routeErrors,omitempty"` RouteErrors []config.RouteError `json:"routeErrors,omitempty"`
}{ }{
ModuleErrors: moduleErrors, ModuleErrors: moduleErrors,
RouteErrors: routeErrors, RouteErrors: routeErrors,
@@ -154,7 +174,6 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
} }
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
r.ConfigChange <- newConfig
case http.MethodOptions: case http.MethodOptions:
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, OPTIONS") w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, OPTIONS")

82
internal/api/websocket.go Normal file
View File

@@ -0,0 +1,82 @@
package api
import (
"encoding/json"
"net/http"
"github.com/gorilla/websocket"
"github.com/jwetzell/showbridge-go/internal/common"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type WebsocketEventDestination struct {
conn *websocket.Conn
}
func (d WebsocketEventDestination) Send(event common.Event) error {
eventJSON, err := event.ToJSON()
if err != nil {
return err
}
return d.conn.WriteMessage(websocket.TextMessage, eventJSON)
}
func (d WebsocketEventDestination) Is(dest common.EventDestination) bool {
other, ok := dest.(WebsocketEventDestination)
if !ok {
return false
}
return d.conn == other.conn
}
func (as *ApiServer) handleWebsocket(w http.ResponseWriter, req *http.Request) {
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
as.logger.Error("websocket upgrade error", "error", err)
return
}
defer conn.Close()
eventDestination := WebsocketEventDestination{conn: conn}
as.eventRouter.AddEventDestination(eventDestination)
READ_LOOP:
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
_, ok := err.(*websocket.CloseError)
if ok {
break READ_LOOP
}
}
switch messageType {
case websocket.TextMessage, websocket.BinaryMessage:
event := common.Event{}
err = json.Unmarshal(message, &event)
if err != nil {
as.logger.Error("websocket message unmarshal error", "error", err)
continue
}
as.eventRouter.HandleEvent(event, WebsocketEventDestination{conn: conn})
case websocket.CloseMessage:
break READ_LOOP
case websocket.PingMessage:
err = conn.WriteMessage(websocket.PongMessage, nil)
if err != nil {
as.logger.Error("websocket pong error", "error", err)
}
default:
as.logger.Warn("unsupported websocket message type", "type", messageType)
continue
}
}
//NOTE(jwetzell): remove ws connection
as.eventRouter.RemoveEventDestination(eventDestination)
}

26
internal/common/events.go Normal file
View File

@@ -0,0 +1,26 @@
package common
import (
"encoding/json"
)
type Event struct {
Type string `json:"type"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
func (e Event) ToJSON() ([]byte, error) {
return json.Marshal(e)
}
type EventDestination interface {
Send(event Event) error
Is(dest EventDestination) bool
}
type EventRouter interface {
HandleEvent(event Event, source EventDestination)
AddEventDestination(dest EventDestination)
RemoveEventDestination(dest EventDestination)
}

View File

@@ -1,6 +1,8 @@
package common package common
import "context" import (
"context"
)
type RouteIO interface { type RouteIO interface {
HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError)

View File

@@ -5,3 +5,8 @@ type Config struct {
Modules []ModuleConfig `json:"modules"` Modules []ModuleConfig `json:"modules"`
Routes []RouteConfig `json:"routes"` Routes []RouteConfig `json:"routes"`
} }
type Configurable interface {
UpdateConfig(newConfig Config, triggerChangeChannel bool) (error, []ModuleError, []RouteError)
GetRunningConfig() Config
}

View File

@@ -5,3 +5,9 @@ type ModuleConfig struct {
Type string `json:"type"` Type string `json:"type"`
Params Params `json:"params,omitempty"` Params Params `json:"params,omitempty"`
} }
type ModuleError struct {
Index int `json:"index"`
Config ModuleConfig `json:"config"`
Error string `json:"error"`
}

View File

@@ -4,3 +4,9 @@ type RouteConfig struct {
Input string `json:"input"` Input string `json:"input"`
Processors []ProcessorConfig `json:"processors"` Processors []ProcessorConfig `json:"processors"`
} }
type RouteError struct {
Index int `json:"index"`
Config RouteConfig `json:"config"`
Error string `json:"error"`
}

View File

@@ -10,12 +10,6 @@ import (
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
) )
type ModuleError struct {
Index int `json:"index"`
Config config.ModuleConfig `json:"config"`
Error string `json:"error"`
}
type ModuleRegistration struct { type ModuleRegistration struct {
Type string `json:"type"` Type string `json:"type"`
Title string `json:"title,omitempty"` Title string `json:"title,omitempty"`

View File

@@ -13,11 +13,6 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
type RouteError struct {
Index int `json:"index"`
Config config.RouteConfig `json:"config"`
Error string `json:"error"`
}
type Route struct { type Route struct {
input string input string
processors []processor.Processor processors []processor.Processor

131
router.go
View File

@@ -4,11 +4,9 @@ import (
"context" "context"
"errors" "errors"
"log/slog" "log/slog"
"net/http"
"reflect"
"sync" "sync"
"github.com/gorilla/websocket" "github.com/jwetzell/showbridge-go/internal/api"
"github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module" "github.com/jwetzell/showbridge-go/internal/module"
@@ -20,25 +18,22 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
// TODO(jwetzell): can/should this be split into different components? // TODO(jwetzell): can/should this be split into different "components"?
type Router struct { type Router struct {
contextCancel context.CancelFunc contextCancel context.CancelFunc
Context context.Context Context context.Context
// TODO(jwetzell): do these need to be guarded against concurrency? // TODO(jwetzell): do these need to be guarded against concurrency?
ModuleInstances map[string]common.Module ModuleInstances map[string]common.Module
// TODO(jwetzell): change to something easier to lookup // TODO(jwetzell): change to something easier to lookup
RouteInstances []*route.Route RouteInstances []*route.Route
ConfigChange chan config.Config ConfigChange chan config.Config
moduleWait sync.WaitGroup moduleWait sync.WaitGroup
logger *slog.Logger logger *slog.Logger
runningConfig config.Config runningConfig config.Config
runningConfigMu sync.RWMutex runningConfigMu sync.RWMutex
wsConns []*websocket.Conn apiServer *api.ApiServer
wsConnsMu sync.Mutex eventDestinations []common.EventDestination
apiServer *http.Server eventDestinationsMu sync.Mutex
apiServerMu sync.Mutex
apiServerShutdown context.CancelFunc
updatingConfig bool
} }
func (r *Router) addModule(moduleDecl config.ModuleConfig) error { func (r *Router) addModule(moduleDecl config.ModuleConfig) error {
@@ -115,7 +110,7 @@ func (r *Router) getModule(moduleId string) common.Module {
return moduleInstance return moduleInstance
} }
func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []route.RouteError) { func NewRouter(routerConfig config.Config) (*Router, []config.ModuleError, []config.RouteError) {
router := Router{ router := Router{
ModuleInstances: make(map[string]common.Module), ModuleInstances: make(map[string]common.Module),
@@ -123,20 +118,19 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou
ConfigChange: make(chan config.Config, 1), ConfigChange: make(chan config.Config, 1),
logger: slog.Default().With("component", "router"), logger: slog.Default().With("component", "router"),
runningConfig: routerConfig, runningConfig: routerConfig,
updatingConfig: false,
} }
router.logger.Debug("creating") router.logger.Debug("creating")
var moduleErrors []module.ModuleError var moduleErrors []config.ModuleError
for moduleIndex, moduleDecl := range routerConfig.Modules { for moduleIndex, moduleDecl := range routerConfig.Modules {
err := router.addModule(moduleDecl) err := router.addModule(moduleDecl)
if err != nil { if err != nil {
if moduleErrors == nil { if moduleErrors == nil {
moduleErrors = []module.ModuleError{} moduleErrors = []config.ModuleError{}
} }
moduleErrors = append(moduleErrors, module.ModuleError{ moduleErrors = append(moduleErrors, config.ModuleError{
Index: moduleIndex, Index: moduleIndex,
Config: moduleDecl, Config: moduleDecl,
Error: err.Error(), Error: err.Error(),
@@ -146,14 +140,14 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou
} }
var routeErrors []route.RouteError var routeErrors []config.RouteError
for routeIndex, routeDecl := range routerConfig.Routes { for routeIndex, routeDecl := range routerConfig.Routes {
err := router.addRoute(routeDecl) err := router.addRoute(routeDecl)
if err != nil { if err != nil {
if routeErrors == nil { if routeErrors == nil {
routeErrors = []route.RouteError{} routeErrors = []config.RouteError{}
} }
routeErrors = append(routeErrors, route.RouteError{ routeErrors = append(routeErrors, config.RouteError{
Index: routeIndex, Index: routeIndex,
Config: routeDecl, Config: routeDecl,
Error: err.Error(), Error: err.Error(),
@@ -162,6 +156,10 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou
} }
} }
apiServer := api.NewApiServer(&router, &router)
router.apiServer = apiServer
return &router, moduleErrors, routeErrors return &router, moduleErrors, routeErrors
} }
@@ -171,10 +169,10 @@ func (r *Router) Start(ctx context.Context) {
r.Context = routerContext r.Context = routerContext
r.contextCancel = cancel r.contextCancel = cancel
r.startModules() r.startModules()
r.startAPIServer(r.runningConfig.Api) r.apiServer.Start(r.GetRunningConfig().Api)
<-r.Context.Done() <-r.Context.Done()
r.logger.Debug("shutting down api server") r.logger.Debug("shutting down api server")
r.stopAPIServer() r.apiServer.Stop()
r.logger.Debug("waiting for modules to exit") r.logger.Debug("waiting for modules to exit")
r.moduleWait.Wait() r.moduleWait.Wait()
r.logger.Info("done") r.logger.Info("done")
@@ -194,7 +192,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
var routeIOErrors []common.RouteIOError var routeIOErrors []common.RouteIOError
routeFound := false routeFound := false
r.broadcastEvent(Event{ r.broadcastEvent(common.Event{
Type: "input", Type: "input",
Data: map[string]any{ Data: map[string]any{
"source": sourceId, "source": sourceId,
@@ -227,7 +225,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
Index: routeIndex, Index: routeIndex,
ProcessError: err, ProcessError: err,
}) })
r.broadcastEvent(Event{ r.broadcastEvent(common.Event{
Type: "route", Type: "route",
Data: map[string]any{ Data: map[string]any{
"index": routeIndex, "index": routeIndex,
@@ -236,7 +234,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
}) })
return return
} }
r.broadcastEvent(Event{ r.broadcastEvent(common.Event{
Type: "route", Type: "route",
Data: map[string]any{ Data: map[string]any{
"index": routeIndex, "index": routeIndex,
@@ -253,7 +251,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error {
spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId))) spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId)))
defer span.End() defer span.End()
outputEvent := Event{ outputEvent := common.Event{
Type: "output", Type: "output",
Data: map[string]any{ Data: map[string]any{
"destination": destinationId, "destination": destinationId,
@@ -310,76 +308,3 @@ func (r *Router) startModules() {
} }
} }
} }
func (r *Router) RunningConfig() config.Config {
r.runningConfigMu.RLock()
defer r.runningConfigMu.RLock()
return r.runningConfig
}
func (r *Router) UpdateConfig(newConfig config.Config) ([]module.ModuleError, []route.RouteError) {
r.runningConfigMu.Lock()
defer r.runningConfigMu.Unlock()
r.updatingConfig = true
defer func() {
r.updatingConfig = false
}()
oldConfig := r.runningConfig
r.logger.Debug("received config update", "oldConfig", oldConfig, "newConfig", newConfig)
if !reflect.DeepEqual(oldConfig.Api, newConfig.Api) {
r.logger.Info("applying new API config")
r.stopAPIServer()
r.startAPIServer(newConfig.Api)
r.runningConfig.Api = newConfig.Api
}
// TODO(jwetzell): handle config update errors better
for _, moduleInstance := range r.ModuleInstances {
moduleInstance.Stop()
}
r.logger.Debug("waiting for modules to exit")
r.moduleWait.Wait()
r.ModuleInstances = make(map[string]common.Module)
r.RouteInstances = []*route.Route{}
var moduleErrors []module.ModuleError
for moduleIndex, moduleDecl := range newConfig.Modules {
err := r.addModule(moduleDecl)
if err != nil {
if moduleErrors == nil {
moduleErrors = []module.ModuleError{}
}
moduleErrors = append(moduleErrors, module.ModuleError{
Index: moduleIndex,
Config: moduleDecl,
Error: err.Error(),
})
continue
}
}
var routeErrors []route.RouteError
for routeIndex, routeDecl := range newConfig.Routes {
err := r.addRoute(routeDecl)
if err != nil {
if routeErrors == nil {
routeErrors = []route.RouteError{}
}
routeErrors = append(routeErrors, route.RouteError{
Index: routeIndex,
Config: routeDecl,
Error: err.Error(),
})
continue
}
}
r.runningConfig = newConfig
r.startModules()
return moduleErrors, routeErrors
}

View File

@@ -91,8 +91,8 @@ func TestNewRouter(t *testing.T) {
t.Fatalf("router should not have returned any route errors: %v", routeErrors) t.Fatalf("router should not have returned any route errors: %v", routeErrors)
} }
if !reflect.DeepEqual(routerConfig, router.RunningConfig()) { if !reflect.DeepEqual(routerConfig, router.GetRunningConfig()) {
t.Fatalf("router running config did not match expected, got: %v, expected: %v", router.RunningConfig(), routerConfig) t.Fatalf("router running config did not match expected, got: %v, expected: %v", router.GetRunningConfig(), routerConfig)
} }
} }

View File

@@ -1,68 +0,0 @@
package showbridge
import (
"encoding/json"
"net/http"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (r *Router) handleWebsocket(w http.ResponseWriter, req *http.Request) {
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
r.logger.Error("websocket upgrade error", "error", err)
return
}
defer conn.Close()
r.wsConnsMu.Lock()
r.wsConns = append(r.wsConns, conn)
r.wsConnsMu.Unlock()
READ_LOOP:
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
_, ok := err.(*websocket.CloseError)
if ok {
break READ_LOOP
}
}
switch messageType {
case websocket.TextMessage, websocket.BinaryMessage:
event := Event{}
err = json.Unmarshal(message, &event)
if err != nil {
r.logger.Error("websocket message unmarshal error", "error", err)
continue
}
r.handleEvent(event, conn)
case websocket.CloseMessage:
break READ_LOOP
case websocket.PingMessage:
err = conn.WriteMessage(websocket.PongMessage, nil)
if err != nil {
r.logger.Error("websocket pong error", "error", err)
}
default:
r.logger.Warn("unsupported websocket message type", "type", messageType)
continue
}
}
//NOTE(jwetzell): remove ws connection
r.wsConnsMu.Lock()
for i, c := range r.wsConns {
if c == conn {
r.wsConns = append(r.wsConns[:i], r.wsConns[i+1:]...)
break
}
}
r.wsConnsMu.Unlock()
}