diff --git a/cmd/showbridge/main.go b/cmd/showbridge/main.go index 05f1204..a7ff6e0 100644 --- a/cmd/showbridge/main.go +++ b/cmd/showbridge/main.go @@ -14,8 +14,6 @@ import ( "github.com/jwetzell/showbridge-go" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/module" - "github.com/jwetzell/showbridge-go/internal/route" "github.com/jwetzell/showbridge-go/internal/schema" "github.com/urfave/cli/v3" "go.opentelemetry.io/otel" @@ -262,7 +260,12 @@ func (app *showbridgeApp) handleChannels() { app.routerMutex.Unlock() continue } - moduleErrors, routeErrors := app.router.UpdateConfig(config) + err, moduleErrors, routeErrors := app.router.UpdateConfig(config, false) + if err != nil { + app.logger.Error("failed to update router config", "error", err) + app.routerMutex.Unlock() + continue + } app.logConfigErrors(moduleErrors, routeErrors) app.logger.Info("configuration reloaded successfully") app.routerMutex.Unlock() @@ -280,7 +283,7 @@ func (app *showbridgeApp) handleChannels() { } } -func (app *showbridgeApp) logConfigErrors(moduleErrors []module.ModuleError, routeErrors []route.RouteError) { +func (app *showbridgeApp) logConfigErrors(moduleErrors []config.ModuleError, routeErrors []config.RouteError) { for _, moduleError := range moduleErrors { app.logger.Error("problem initializing module", "index", moduleError.Index, "error", moduleError.Error) } diff --git a/config.go b/config.go new file mode 100644 index 0000000..e6d829e --- /dev/null +++ b/config.go @@ -0,0 +1,85 @@ +package showbridge + +import ( + "errors" + "reflect" + + "github.com/jwetzell/showbridge-go/internal/common" + "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" +) + +func (r *Router) GetRunningConfig() config.Config { + r.runningConfigMu.RLock() + defer r.runningConfigMu.RUnlock() + return r.runningConfig +} + +func (r *Router) UpdateConfig(newConfig config.Config, triggerChangeChan bool) (error, []config.ModuleError, []config.RouteError) { + if !r.runningConfigMu.TryLock() { + return errors.New("config update in progress"), nil, nil + } + defer r.runningConfigMu.Unlock() + oldConfig := r.runningConfig + r.logger.Debug("received config update", "oldConfig", oldConfig, "newConfig", newConfig) + + if !reflect.DeepEqual(oldConfig.Api, newConfig.Api) { + r.logger.Info("applying new API config") + r.apiServer.Stop() + r.apiServer.Start(newConfig.Api) + r.runningConfig.Api = newConfig.Api + } + + // TODO(jwetzell): handle config update errors better + for _, moduleInstance := range r.ModuleInstances { + moduleInstance.Stop() + } + r.logger.Debug("waiting for modules to exit") + r.moduleWait.Wait() + + r.ModuleInstances = make(map[string]common.Module) + r.RouteInstances = []*route.Route{} + + var moduleErrors []config.ModuleError + + for moduleIndex, moduleDecl := range newConfig.Modules { + + err := r.addModule(moduleDecl) + if err != nil { + if moduleErrors == nil { + moduleErrors = []config.ModuleError{} + } + moduleErrors = append(moduleErrors, config.ModuleError{ + Index: moduleIndex, + Config: moduleDecl, + Error: err.Error(), + }) + continue + } + + } + + var routeErrors []config.RouteError + for routeIndex, routeDecl := range newConfig.Routes { + err := r.addRoute(routeDecl) + if err != nil { + if routeErrors == nil { + routeErrors = []config.RouteError{} + } + routeErrors = append(routeErrors, config.RouteError{ + Index: routeIndex, + Config: routeDecl, + Error: err.Error(), + }) + continue + } + } + r.runningConfig = newConfig + r.startModules() + + if triggerChangeChan { + r.ConfigChange <- newConfig + } + + return nil, moduleErrors, routeErrors +} diff --git a/events.go b/events.go index 0d60517..47cbdb3 100644 --- a/events.go +++ b/events.go @@ -1,64 +1,53 @@ package showbridge import ( - "encoding/json" + "time" - "github.com/gorilla/websocket" + "github.com/jwetzell/showbridge-go/internal/common" ) -type Event struct { - Type string `json:"type"` - Data any `json:"data,omitempty"` - Error string `json:"error,omitempty"` -} - -func (e Event) toJSON() ([]byte, error) { - return json.Marshal(e) -} - -func (r *Router) handleEvent(event Event, sender *websocket.Conn) { +func (r *Router) HandleEvent(event common.Event, sender common.EventDestination) { switch event.Type { case "ping": - r.unicastEvent(Event{Type: "pong"}, sender) + r.unicastEvent(common.Event{Type: "pong", Data: map[string]any{ + "timestamp": time.Now().UnixMilli(), + }}, sender) default: r.logger.Warn("unknown event type", "eventType", event.Type) } } -func (r *Router) unicastEvent(event Event, conn *websocket.Conn) { - eventJSON, err := event.toJSON() - if err != nil { - r.logger.Error("failed to marshal event to JSON", "error", err) - return - } - err = conn.WriteMessage(websocket.TextMessage, eventJSON) - if err != nil { - r.logger.Error("failed to write message to websocket connection", "error", err) +func (r *Router) AddEventDestination(dest common.EventDestination) { + r.eventDestinationsMu.Lock() + defer r.eventDestinationsMu.Unlock() + r.eventDestinations = append(r.eventDestinations, dest) +} + +func (r *Router) RemoveEventDestination(dest common.EventDestination) { + r.eventDestinationsMu.Lock() + defer r.eventDestinationsMu.Unlock() + for i, d := range r.eventDestinations { + if d.Is(dest) { + r.eventDestinations = append(r.eventDestinations[:i], r.eventDestinations[i+1:]...) + break + } } } -func (r *Router) broadcastEvent(event Event, excluded ...*websocket.Conn) { - eventJSON, err := event.toJSON() +func (r *Router) unicastEvent(event common.Event, dest common.EventDestination) { + err := dest.Send(event) if err != nil { - r.logger.Error("failed to marshal event to JSON", "error", err) - return + r.logger.Error("failed to send event", "error", err) } - r.wsConnsMu.Lock() - defer r.wsConnsMu.Unlock() - for _, conn := range r.wsConns { - exclude := false - for _, excludedConn := range excluded { - if conn == excludedConn { - exclude = true - break - } - } - if exclude { - continue - } - err := conn.WriteMessage(websocket.TextMessage, eventJSON) +} + +func (r *Router) broadcastEvent(event common.Event) { + r.eventDestinationsMu.Lock() + defer r.eventDestinationsMu.Unlock() + for _, dest := range r.eventDestinations { + err := dest.Send(event) if err != nil { - r.logger.Error("failed to write message to websocket connection", "error", err) + r.logger.Error("failed to send event", "error", err) } } } diff --git a/api.go b/internal/api/api.go similarity index 77% rename from api.go rename to internal/api/api.go index f3bce95..144f9c7 100644 --- a/api.go +++ b/internal/api/api.go @@ -1,4 +1,4 @@ -package showbridge +package api import ( "context" @@ -6,60 +6,80 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" + "sync" "time" + "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" - "github.com/jwetzell/showbridge-go/internal/module" - "github.com/jwetzell/showbridge-go/internal/route" "github.com/jwetzell/showbridge-go/internal/schema" ) -func (r *Router) startAPIServer(config config.ApiConfig) { - if !config.Enabled { - r.logger.Warn("API not enabled") +type ApiServer struct { + config config.ApiConfig + serverMu sync.Mutex + server *http.Server + shutdown context.CancelFunc + logger *slog.Logger + configurableRouter config.Configurable + eventRouter common.EventRouter +} + +func NewApiServer(configurableRouter config.Configurable, eventRouter common.EventRouter) *ApiServer { + return &ApiServer{ + configurableRouter: configurableRouter, + eventRouter: eventRouter, + logger: slog.Default().With("component", "api"), + } +} + +func (as *ApiServer) Start(config config.ApiConfig) { + as.config = config + if !as.config.Enabled { + as.logger.Warn("not enabled") return } - r.logger.Debug("starting API server", "port", config.Port) + as.logger.Debug("starting", "port", as.config.Port) mux := http.NewServeMux() - mux.HandleFunc("/ws", r.handleWebsocket) - mux.HandleFunc("/health", r.handleHealthHTTP) - mux.HandleFunc("/api/v1/config", r.handleConfigHTTP) + mux.HandleFunc("/ws", as.handleWebsocket) + mux.HandleFunc("/health", as.handleHealthHTTP) + mux.HandleFunc("/api/v1/config", as.handleConfigHTTP) mux.HandleFunc("/schema/config.schema.json", handleConfigSchema) mux.HandleFunc("/schema/routes.schema.json", handleRoutesSchema) mux.HandleFunc("/schema/modules.schema.json", handleModulesSchema) mux.HandleFunc("/schema/processors.schema.json", handleProcessorsSchema) - r.apiServerMu.Lock() - defer r.apiServerMu.Unlock() - r.apiServer = &http.Server{ - Addr: fmt.Sprintf(":%d", config.Port), + as.serverMu.Lock() + defer as.serverMu.Unlock() + as.server = &http.Server{ + Addr: fmt.Sprintf(":%d", as.config.Port), Handler: mux, } go func() { - r.apiServer.ListenAndServe() - r.apiServerShutdown() + as.server.ListenAndServe() + as.shutdown() }() } -func (r *Router) stopAPIServer() { - if r.apiServer == nil { +func (as *ApiServer) Stop() { + if as.server == nil { return } - r.logger.Debug("stopping API server") - r.apiServerMu.Lock() - defer r.apiServerMu.Unlock() - if r.apiServer != nil { + as.logger.Debug("stopping") + as.serverMu.Lock() + defer as.serverMu.Unlock() + if as.server != nil { apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) - r.apiServerShutdown = apiShutdownCancel - r.apiServer.Shutdown(apiShutdownCtx) + as.shutdown = apiShutdownCancel + as.server.Shutdown(apiShutdownCtx) <-apiShutdownCtx.Done() - r.apiServer = nil + as.server = nil } } -func (r *Router) handleHealthHTTP(w http.ResponseWriter, req *http.Request) { +func (as *ApiServer) handleHealthHTTP(w http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodGet: w.Header().Set("Access-Control-Allow-Origin", "*") @@ -75,11 +95,11 @@ func (r *Router) handleHealthHTTP(w http.ResponseWriter, req *http.Request) { } } -func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { +func (as *ApiServer) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodGet: - configJSON, err := json.Marshal(r.runningConfig) + configJSON, err := json.Marshal(as.configurableRouter.GetRunningConfig()) if err != nil { http.Error(w, "Internal server error", http.StatusInternalServerError) return @@ -89,10 +109,6 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write(configJSON) case http.MethodPut: - if r.updatingConfig { - http.Error(w, "Config update in progress.", http.StatusConflict) - return - } //TODO(jwetzell): again way too much marshaling cfgBytes, err := io.ReadAll(req.Body) if err != nil { @@ -132,11 +148,15 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { http.Error(w, "Bad request", http.StatusBadRequest) return } - moduleErrors, routeErrors := r.UpdateConfig(newConfig) + err, moduleErrors, routeErrors := as.configurableRouter.UpdateConfig(newConfig, true) + if err != nil { + http.Error(w, err.Error(), http.StatusConflict) + return + } if len(moduleErrors) > 0 || len(routeErrors) > 0 { errorResponse := struct { - ModuleErrors []module.ModuleError `json:"moduleErrors,omitempty"` - RouteErrors []route.RouteError `json:"routeErrors,omitempty"` + ModuleErrors []config.ModuleError `json:"moduleErrors,omitempty"` + RouteErrors []config.RouteError `json:"routeErrors,omitempty"` }{ ModuleErrors: moduleErrors, RouteErrors: routeErrors, @@ -154,7 +174,6 @@ func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { } w.Header().Set("Access-Control-Allow-Origin", "*") w.WriteHeader(http.StatusOK) - r.ConfigChange <- newConfig case http.MethodOptions: w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, OPTIONS") diff --git a/internal/api/websocket.go b/internal/api/websocket.go new file mode 100644 index 0000000..f327647 --- /dev/null +++ b/internal/api/websocket.go @@ -0,0 +1,82 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/websocket" + "github.com/jwetzell/showbridge-go/internal/common" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type WebsocketEventDestination struct { + conn *websocket.Conn +} + +func (d WebsocketEventDestination) Send(event common.Event) error { + eventJSON, err := event.ToJSON() + if err != nil { + return err + } + return d.conn.WriteMessage(websocket.TextMessage, eventJSON) +} + +func (d WebsocketEventDestination) Is(dest common.EventDestination) bool { + other, ok := dest.(WebsocketEventDestination) + if !ok { + return false + } + return d.conn == other.conn +} + +func (as *ApiServer) handleWebsocket(w http.ResponseWriter, req *http.Request) { + conn, err := upgrader.Upgrade(w, req, nil) + if err != nil { + as.logger.Error("websocket upgrade error", "error", err) + return + } + defer conn.Close() + + eventDestination := WebsocketEventDestination{conn: conn} + + as.eventRouter.AddEventDestination(eventDestination) +READ_LOOP: + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + _, ok := err.(*websocket.CloseError) + if ok { + break READ_LOOP + } + } + + switch messageType { + case websocket.TextMessage, websocket.BinaryMessage: + event := common.Event{} + err = json.Unmarshal(message, &event) + if err != nil { + as.logger.Error("websocket message unmarshal error", "error", err) + continue + } + as.eventRouter.HandleEvent(event, WebsocketEventDestination{conn: conn}) + case websocket.CloseMessage: + break READ_LOOP + case websocket.PingMessage: + err = conn.WriteMessage(websocket.PongMessage, nil) + if err != nil { + as.logger.Error("websocket pong error", "error", err) + } + default: + as.logger.Warn("unsupported websocket message type", "type", messageType) + continue + } + + } + //NOTE(jwetzell): remove ws connection + as.eventRouter.RemoveEventDestination(eventDestination) +} diff --git a/internal/common/events.go b/internal/common/events.go new file mode 100644 index 0000000..9e181e2 --- /dev/null +++ b/internal/common/events.go @@ -0,0 +1,26 @@ +package common + +import ( + "encoding/json" +) + +type Event struct { + Type string `json:"type"` + Data any `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} + +func (e Event) ToJSON() ([]byte, error) { + return json.Marshal(e) +} + +type EventDestination interface { + Send(event Event) error + Is(dest EventDestination) bool +} + +type EventRouter interface { + HandleEvent(event Event, source EventDestination) + AddEventDestination(dest EventDestination) + RemoveEventDestination(dest EventDestination) +} diff --git a/internal/common/routing.go b/internal/common/routing.go index 555350e..8e6d317 100644 --- a/internal/common/routing.go +++ b/internal/common/routing.go @@ -1,6 +1,8 @@ package common -import "context" +import ( + "context" +) type RouteIO interface { HandleInput(ctx context.Context, sourceId string, payload any) (bool, []RouteIOError) diff --git a/internal/config/config.go b/internal/config/config.go index d07a00a..a2e76ad 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,3 +5,8 @@ type Config struct { Modules []ModuleConfig `json:"modules"` Routes []RouteConfig `json:"routes"` } + +type Configurable interface { + UpdateConfig(newConfig Config, triggerChangeChannel bool) (error, []ModuleError, []RouteError) + GetRunningConfig() Config +} diff --git a/internal/config/module.go b/internal/config/module.go index ae750e2..f83e183 100644 --- a/internal/config/module.go +++ b/internal/config/module.go @@ -5,3 +5,9 @@ type ModuleConfig struct { Type string `json:"type"` Params Params `json:"params,omitempty"` } + +type ModuleError struct { + Index int `json:"index"` + Config ModuleConfig `json:"config"` + Error string `json:"error"` +} diff --git a/internal/config/route.go b/internal/config/route.go index c23c3b1..66b9f6c 100644 --- a/internal/config/route.go +++ b/internal/config/route.go @@ -4,3 +4,9 @@ type RouteConfig struct { Input string `json:"input"` Processors []ProcessorConfig `json:"processors"` } + +type RouteError struct { + Index int `json:"index"` + Config RouteConfig `json:"config"` + Error string `json:"error"` +} diff --git a/internal/module/module.go b/internal/module/module.go index bd26176..6c2d00c 100644 --- a/internal/module/module.go +++ b/internal/module/module.go @@ -10,12 +10,6 @@ import ( "github.com/jwetzell/showbridge-go/internal/config" ) -type ModuleError struct { - Index int `json:"index"` - Config config.ModuleConfig `json:"config"` - Error string `json:"error"` -} - type ModuleRegistration struct { Type string `json:"type"` Title string `json:"title,omitempty"` diff --git a/internal/route/route.go b/internal/route/route.go index 5d26ded..6893843 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -13,11 +13,6 @@ import ( "go.opentelemetry.io/otel/trace" ) -type RouteError struct { - Index int `json:"index"` - Config config.RouteConfig `json:"config"` - Error string `json:"error"` -} type Route struct { input string processors []processor.Processor diff --git a/router.go b/router.go index e0fbfbf..aeed26a 100644 --- a/router.go +++ b/router.go @@ -4,11 +4,9 @@ import ( "context" "errors" "log/slog" - "net/http" - "reflect" "sync" - "github.com/gorilla/websocket" + "github.com/jwetzell/showbridge-go/internal/api" "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/module" @@ -20,25 +18,22 @@ import ( "go.opentelemetry.io/otel/trace" ) -// TODO(jwetzell): can/should this be split into different components? +// TODO(jwetzell): can/should this be split into different "components"? type Router struct { contextCancel context.CancelFunc Context context.Context // TODO(jwetzell): do these need to be guarded against concurrency? ModuleInstances map[string]common.Module // TODO(jwetzell): change to something easier to lookup - RouteInstances []*route.Route - ConfigChange chan config.Config - moduleWait sync.WaitGroup - logger *slog.Logger - runningConfig config.Config - runningConfigMu sync.RWMutex - wsConns []*websocket.Conn - wsConnsMu sync.Mutex - apiServer *http.Server - apiServerMu sync.Mutex - apiServerShutdown context.CancelFunc - updatingConfig bool + RouteInstances []*route.Route + ConfigChange chan config.Config + moduleWait sync.WaitGroup + logger *slog.Logger + runningConfig config.Config + runningConfigMu sync.RWMutex + apiServer *api.ApiServer + eventDestinations []common.EventDestination + eventDestinationsMu sync.Mutex } func (r *Router) addModule(moduleDecl config.ModuleConfig) error { @@ -115,7 +110,7 @@ func (r *Router) getModule(moduleId string) common.Module { return moduleInstance } -func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []route.RouteError) { +func NewRouter(routerConfig config.Config) (*Router, []config.ModuleError, []config.RouteError) { router := Router{ ModuleInstances: make(map[string]common.Module), @@ -123,20 +118,19 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou ConfigChange: make(chan config.Config, 1), logger: slog.Default().With("component", "router"), runningConfig: routerConfig, - updatingConfig: false, } router.logger.Debug("creating") - var moduleErrors []module.ModuleError + var moduleErrors []config.ModuleError for moduleIndex, moduleDecl := range routerConfig.Modules { err := router.addModule(moduleDecl) if err != nil { if moduleErrors == nil { - moduleErrors = []module.ModuleError{} + moduleErrors = []config.ModuleError{} } - moduleErrors = append(moduleErrors, module.ModuleError{ + moduleErrors = append(moduleErrors, config.ModuleError{ Index: moduleIndex, Config: moduleDecl, Error: err.Error(), @@ -146,14 +140,14 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou } - var routeErrors []route.RouteError + var routeErrors []config.RouteError for routeIndex, routeDecl := range routerConfig.Routes { err := router.addRoute(routeDecl) if err != nil { if routeErrors == nil { - routeErrors = []route.RouteError{} + routeErrors = []config.RouteError{} } - routeErrors = append(routeErrors, route.RouteError{ + routeErrors = append(routeErrors, config.RouteError{ Index: routeIndex, Config: routeDecl, Error: err.Error(), @@ -162,6 +156,10 @@ func NewRouter(routerConfig config.Config) (*Router, []module.ModuleError, []rou } } + apiServer := api.NewApiServer(&router, &router) + + router.apiServer = apiServer + return &router, moduleErrors, routeErrors } @@ -171,10 +169,10 @@ func (r *Router) Start(ctx context.Context) { r.Context = routerContext r.contextCancel = cancel r.startModules() - r.startAPIServer(r.runningConfig.Api) + r.apiServer.Start(r.GetRunningConfig().Api) <-r.Context.Done() r.logger.Debug("shutting down api server") - r.stopAPIServer() + r.apiServer.Stop() r.logger.Debug("waiting for modules to exit") r.moduleWait.Wait() r.logger.Info("done") @@ -194,7 +192,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) var routeIOErrors []common.RouteIOError routeFound := false - r.broadcastEvent(Event{ + r.broadcastEvent(common.Event{ Type: "input", Data: map[string]any{ "source": sourceId, @@ -227,7 +225,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) Index: routeIndex, ProcessError: err, }) - r.broadcastEvent(Event{ + r.broadcastEvent(common.Event{ Type: "route", Data: map[string]any{ "index": routeIndex, @@ -236,7 +234,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) }) return } - r.broadcastEvent(Event{ + r.broadcastEvent(common.Event{ Type: "route", Data: map[string]any{ "index": routeIndex, @@ -253,7 +251,7 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId))) defer span.End() - outputEvent := Event{ + outputEvent := common.Event{ Type: "output", Data: map[string]any{ "destination": destinationId, @@ -310,76 +308,3 @@ func (r *Router) startModules() { } } } - -func (r *Router) RunningConfig() config.Config { - r.runningConfigMu.RLock() - defer r.runningConfigMu.RLock() - return r.runningConfig -} - -func (r *Router) UpdateConfig(newConfig config.Config) ([]module.ModuleError, []route.RouteError) { - r.runningConfigMu.Lock() - defer r.runningConfigMu.Unlock() - r.updatingConfig = true - defer func() { - r.updatingConfig = false - }() - oldConfig := r.runningConfig - r.logger.Debug("received config update", "oldConfig", oldConfig, "newConfig", newConfig) - - if !reflect.DeepEqual(oldConfig.Api, newConfig.Api) { - r.logger.Info("applying new API config") - r.stopAPIServer() - r.startAPIServer(newConfig.Api) - r.runningConfig.Api = newConfig.Api - } - - // TODO(jwetzell): handle config update errors better - for _, moduleInstance := range r.ModuleInstances { - moduleInstance.Stop() - } - r.logger.Debug("waiting for modules to exit") - r.moduleWait.Wait() - - r.ModuleInstances = make(map[string]common.Module) - r.RouteInstances = []*route.Route{} - - var moduleErrors []module.ModuleError - - for moduleIndex, moduleDecl := range newConfig.Modules { - - err := r.addModule(moduleDecl) - if err != nil { - if moduleErrors == nil { - moduleErrors = []module.ModuleError{} - } - moduleErrors = append(moduleErrors, module.ModuleError{ - Index: moduleIndex, - Config: moduleDecl, - Error: err.Error(), - }) - continue - } - - } - - var routeErrors []route.RouteError - for routeIndex, routeDecl := range newConfig.Routes { - err := r.addRoute(routeDecl) - if err != nil { - if routeErrors == nil { - routeErrors = []route.RouteError{} - } - routeErrors = append(routeErrors, route.RouteError{ - Index: routeIndex, - Config: routeDecl, - Error: err.Error(), - }) - continue - } - } - r.runningConfig = newConfig - r.startModules() - - return moduleErrors, routeErrors -} diff --git a/router_test.go b/router_test.go index 1f3abc2..afb7b10 100644 --- a/router_test.go +++ b/router_test.go @@ -91,8 +91,8 @@ func TestNewRouter(t *testing.T) { t.Fatalf("router should not have returned any route errors: %v", routeErrors) } - if !reflect.DeepEqual(routerConfig, router.RunningConfig()) { - t.Fatalf("router running config did not match expected, got: %v, expected: %v", router.RunningConfig(), routerConfig) + if !reflect.DeepEqual(routerConfig, router.GetRunningConfig()) { + t.Fatalf("router running config did not match expected, got: %v, expected: %v", router.GetRunningConfig(), routerConfig) } } diff --git a/websocket.go b/websocket.go deleted file mode 100644 index c1520f9..0000000 --- a/websocket.go +++ /dev/null @@ -1,68 +0,0 @@ -package showbridge - -import ( - "encoding/json" - "net/http" - - "github.com/gorilla/websocket" -) - -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -func (r *Router) handleWebsocket(w http.ResponseWriter, req *http.Request) { - conn, err := upgrader.Upgrade(w, req, nil) - if err != nil { - r.logger.Error("websocket upgrade error", "error", err) - return - } - defer conn.Close() - - r.wsConnsMu.Lock() - r.wsConns = append(r.wsConns, conn) - r.wsConnsMu.Unlock() -READ_LOOP: - for { - messageType, message, err := conn.ReadMessage() - if err != nil { - _, ok := err.(*websocket.CloseError) - if ok { - break READ_LOOP - } - } - - switch messageType { - case websocket.TextMessage, websocket.BinaryMessage: - event := Event{} - err = json.Unmarshal(message, &event) - if err != nil { - r.logger.Error("websocket message unmarshal error", "error", err) - continue - } - r.handleEvent(event, conn) - case websocket.CloseMessage: - break READ_LOOP - case websocket.PingMessage: - err = conn.WriteMessage(websocket.PongMessage, nil) - if err != nil { - r.logger.Error("websocket pong error", "error", err) - } - default: - r.logger.Warn("unsupported websocket message type", "type", messageType) - continue - } - - } - //NOTE(jwetzell): remove ws connection - r.wsConnsMu.Lock() - for i, c := range r.wsConns { - if c == conn { - r.wsConns = append(r.wsConns[:i], r.wsConns[i+1:]...) - break - } - } - r.wsConnsMu.Unlock() -}