diff --git a/api.go b/api.go new file mode 100644 index 0000000..7b408eb --- /dev/null +++ b/api.go @@ -0,0 +1,46 @@ +package showbridge + +import ( + "embed" + _ "embed" + "encoding/json" + "fmt" + "net/http" +) + +func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + configJSON, err := json.Marshal(r.runningConfig) + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json") + w.Write(configJSON) +} + +//go:embed schema +var schema embed.FS + +func (r *Router) handleSchemaHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + schemaName := req.PathValue("schema") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json") + configSchema, err := schema.ReadFile(fmt.Sprintf("schema/%s.schema.json", schemaName)) + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + w.Write(configSchema) +} diff --git a/events.go b/events.go new file mode 100644 index 0000000..24a497a --- /dev/null +++ b/events.go @@ -0,0 +1,42 @@ +package showbridge + +import ( + "encoding/json" + + "github.com/gorilla/websocket" +) + +type Event struct { + Type string `json:"type"` + Data any `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} + +func (e Event) toJSON() ([]byte, error) { + return json.Marshal(e) +} + +func (r *Router) handleEvent(event Event) { + switch event.Type { + case "ping": + r.broadcastEvent(Event{Type: "pong"}) + default: + r.logger.Warn("unknown event type", "eventType", event.Type) + } +} + +func (r *Router) broadcastEvent(event Event) { + eventJSON, err := event.toJSON() + if err != nil { + r.logger.Error("failed to marshal event to JSON", "error", err) + return + } + r.wsConnsMu.Lock() + defer r.wsConnsMu.Unlock() + for _, conn := range r.wsConns { + err := conn.WriteMessage(websocket.TextMessage, eventJSON) + if err != nil { + r.logger.Error("failed to write message to websocket connection", "error", err) + } + } +} diff --git a/internal/common/routing.go b/internal/common/routing.go index 70bd17e..555350e 100644 --- a/internal/common/routing.go +++ b/internal/common/routing.go @@ -8,8 +8,8 @@ type RouteIO interface { } type RouteIOError struct { - Index int - OutputError error - ProcessError error - InputError error + Index int `json:"index"` + OutputError error `json:"outputError"` + ProcessError error `json:"processError"` + InputError error `json:"inputError"` } diff --git a/internal/config/config.go b/internal/config/config.go index fc1b5eb..6878187 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,14 +1,18 @@ package config type Config struct { + Api ApiConfig `json:"api"` Modules []ModuleConfig `json:"modules"` Routes []RouteConfig `json:"routes"` } +type ApiConfig struct { + Port int `json:"port"` +} type ModuleConfig struct { Id string `json:"id"` Type string `json:"type"` - Params Params `json:"params"` + Params Params `json:"params,omitempty"` } type RouteConfig struct { @@ -18,5 +22,5 @@ type RouteConfig struct { type ProcessorConfig struct { Type string `json:"type"` - Params Params `json:"params"` + Params Params `json:"params,omitempty"` } diff --git a/router.go b/router.go index c4d28f6..a12e19e 100644 --- a/router.go +++ b/router.go @@ -3,9 +3,13 @@ package showbridge import ( "context" "errors" + "fmt" "log/slog" + "net/http" "sync" + "time" + "github.com/gorilla/websocket" "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/module" @@ -26,6 +30,9 @@ type Router struct { moduleWait sync.WaitGroup logger *slog.Logger runningConfig config.Config + wsConns []*websocket.Conn + wsConnsMu sync.Mutex + apiServer *http.Server } func (r *Router) addModule(moduleDecl config.ModuleConfig) error { @@ -164,7 +171,23 @@ func (r *Router) Start(ctx context.Context) { r.logger.Error("error starting module", "moduleId", moduleId, "error", err) } } + apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + + go func() { + r.apiServer = &http.Server{ + Addr: fmt.Sprintf(":%d", r.runningConfig.Api.Port), + } + http.HandleFunc("/ws", r.handleWebsocket) + http.HandleFunc("/api/v1/config", r.handleConfigHTTP) + http.HandleFunc("/api/v1/schema/{schema}", r.handleSchemaHTTP) + r.logger.Debug("starting api server", "port", r.runningConfig.Api.Port) + r.apiServer.ListenAndServe() + apiShutdownCancel() + }() <-r.Context.Done() + r.logger.Debug("shutting down api server") + r.apiServer.Shutdown(apiShutdownCtx) + <-apiShutdownCtx.Done() r.logger.Debug("waiting for modules to exit") r.moduleWait.Wait() r.logger.Info("done") @@ -181,6 +204,13 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) var routeIOErrors []common.RouteIOError routeFound := false + r.broadcastEvent(Event{ + Type: "input", + Data: map[string]any{ + "source": sourceId, + }, + }) + var routeWaitGroup sync.WaitGroup for routeIndex, routeInstance := range r.RouteInstances { @@ -207,8 +237,21 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) Index: routeIndex, ProcessError: err, }) + r.broadcastEvent(Event{ + Type: "route", + Data: map[string]any{ + "index": routeIndex, + }, + Error: err.Error(), + }) return } + r.broadcastEvent(Event{ + Type: "route", + Data: map[string]any{ + "index": routeIndex, + }, + }) routeSpan.End() }) } @@ -220,7 +263,12 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId))) defer span.End() - + outputEvent := Event{ + Type: "output", + Data: map[string]any{ + "destination": destinationId, + }, + } destinationModule := r.getModule(destinationId) if destinationModule == nil { @@ -228,6 +276,8 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload span.SetStatus(codes.Error, err.Error()) span.RecordError(err) r.logger.Error("no module found for destination id", "destinationId", destinationId) + outputEvent.Error = err.Error() + r.broadcastEvent(outputEvent) return err } @@ -238,11 +288,13 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload moduleOutputSpan.SetStatus(codes.Error, err.Error()) moduleOutputSpan.RecordError(err) r.logger.ErrorContext(moduleOutputCtx, "module output encountered error", "module", destinationModule.Id(), "error", err) + outputEvent.Error = err.Error() + r.broadcastEvent(outputEvent) return err } else { moduleOutputSpan.SetStatus(codes.Ok, "module output successful") } - + r.broadcastEvent(outputEvent) return nil } diff --git a/schema/config.schema.json b/schema/config.schema.json index bfc34c3..7046004 100644 --- a/schema/config.schema.json +++ b/schema/config.schema.json @@ -5,6 +5,16 @@ "description": "showbridge configuration", "type": "object", "properties": { + "api": { + "type": "object", + "properties": { + "port": { + "type": "integer", + "description": "Port for the API server to listen on" + } + }, + "required": ["port"] + }, "modules": { "$ref": "https://showbridge.io/modules.schema.json" }, diff --git a/websocket.go b/websocket.go new file mode 100644 index 0000000..215f932 --- /dev/null +++ b/websocket.go @@ -0,0 +1,50 @@ +package showbridge + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func (r *Router) handleWebsocket(w http.ResponseWriter, req *http.Request) { + conn, err := upgrader.Upgrade(w, req, nil) + if err != nil { + r.logger.Error("websocket upgrade error", "error", err) + return + } + defer conn.Close() + + r.wsConnsMu.Lock() + r.wsConns = append(r.wsConns, conn) + r.wsConnsMu.Unlock() + for { + _, message, err := conn.ReadMessage() + if err != nil { + r.logger.Error("websocket read error", "error", err) + break + } + + event := Event{} + err = json.Unmarshal(message, &event) + if err != nil { + r.logger.Error("websocket message unmarshal error", "error", err) + continue + } + r.handleEvent(event) + } + r.wsConnsMu.Lock() + for i, c := range r.wsConns { + if c == conn { + r.wsConns = append(r.wsConns[:i], r.wsConns[i+1:]...) + break + } + } + r.wsConnsMu.Unlock() +}