start work on http/ws api

This commit is contained in:
Joel Wetzell
2026-03-11 20:58:53 -05:00
parent 82ba1d5d10
commit 0f57e123ce
7 changed files with 212 additions and 8 deletions

46
api.go Normal file
View File

@@ -0,0 +1,46 @@
package showbridge
import (
"embed"
_ "embed"
"encoding/json"
"fmt"
"net/http"
)
func (r *Router) handleConfigHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
configJSON, err := json.Marshal(r.runningConfig)
if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json")
w.Write(configJSON)
}
//go:embed schema
var schema embed.FS
func (r *Router) handleSchemaHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
schemaName := req.PathValue("schema")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json")
configSchema, err := schema.ReadFile(fmt.Sprintf("schema/%s.schema.json", schemaName))
if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.Write(configSchema)
}

42
events.go Normal file
View File

@@ -0,0 +1,42 @@
package showbridge
import (
"encoding/json"
"github.com/gorilla/websocket"
)
type Event struct {
Type string `json:"type"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
func (e Event) toJSON() ([]byte, error) {
return json.Marshal(e)
}
func (r *Router) handleEvent(event Event) {
switch event.Type {
case "ping":
r.broadcastEvent(Event{Type: "pong"})
default:
r.logger.Warn("unknown event type", "eventType", event.Type)
}
}
func (r *Router) broadcastEvent(event Event) {
eventJSON, err := event.toJSON()
if err != nil {
r.logger.Error("failed to marshal event to JSON", "error", err)
return
}
r.wsConnsMu.Lock()
defer r.wsConnsMu.Unlock()
for _, conn := range r.wsConns {
err := conn.WriteMessage(websocket.TextMessage, eventJSON)
if err != nil {
r.logger.Error("failed to write message to websocket connection", "error", err)
}
}
}

View File

@@ -8,8 +8,8 @@ type RouteIO interface {
} }
type RouteIOError struct { type RouteIOError struct {
Index int Index int `json:"index"`
OutputError error OutputError error `json:"outputError"`
ProcessError error ProcessError error `json:"processError"`
InputError error InputError error `json:"inputError"`
} }

View File

@@ -1,14 +1,18 @@
package config package config
type Config struct { type Config struct {
Api ApiConfig `json:"api"`
Modules []ModuleConfig `json:"modules"` Modules []ModuleConfig `json:"modules"`
Routes []RouteConfig `json:"routes"` Routes []RouteConfig `json:"routes"`
} }
type ApiConfig struct {
Port int `json:"port"`
}
type ModuleConfig struct { type ModuleConfig struct {
Id string `json:"id"` Id string `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Params Params `json:"params"` Params Params `json:"params,omitempty"`
} }
type RouteConfig struct { type RouteConfig struct {
@@ -18,5 +22,5 @@ type RouteConfig struct {
type ProcessorConfig struct { type ProcessorConfig struct {
Type string `json:"type"` Type string `json:"type"`
Params Params `json:"params"` Params Params `json:"params,omitempty"`
} }

View File

@@ -3,9 +3,13 @@ package showbridge
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log/slog" "log/slog"
"net/http"
"sync" "sync"
"time"
"github.com/gorilla/websocket"
"github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/module" "github.com/jwetzell/showbridge-go/internal/module"
@@ -26,6 +30,9 @@ type Router struct {
moduleWait sync.WaitGroup moduleWait sync.WaitGroup
logger *slog.Logger logger *slog.Logger
runningConfig config.Config runningConfig config.Config
wsConns []*websocket.Conn
wsConnsMu sync.Mutex
apiServer *http.Server
} }
func (r *Router) addModule(moduleDecl config.ModuleConfig) error { func (r *Router) addModule(moduleDecl config.ModuleConfig) error {
@@ -164,7 +171,23 @@ func (r *Router) Start(ctx context.Context) {
r.logger.Error("error starting module", "moduleId", moduleId, "error", err) r.logger.Error("error starting module", "moduleId", moduleId, "error", err)
} }
} }
apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
go func() {
r.apiServer = &http.Server{
Addr: fmt.Sprintf(":%d", r.runningConfig.Api.Port),
}
http.HandleFunc("/ws", r.handleWebsocket)
http.HandleFunc("/api/v1/config", r.handleConfigHTTP)
http.HandleFunc("/api/v1/schema/{schema}", r.handleSchemaHTTP)
r.logger.Debug("starting api server", "port", r.runningConfig.Api.Port)
r.apiServer.ListenAndServe()
apiShutdownCancel()
}()
<-r.Context.Done() <-r.Context.Done()
r.logger.Debug("shutting down api server")
r.apiServer.Shutdown(apiShutdownCtx)
<-apiShutdownCtx.Done()
r.logger.Debug("waiting for modules to exit") r.logger.Debug("waiting for modules to exit")
r.moduleWait.Wait() r.moduleWait.Wait()
r.logger.Info("done") r.logger.Info("done")
@@ -181,6 +204,13 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
var routeIOErrors []common.RouteIOError var routeIOErrors []common.RouteIOError
routeFound := false routeFound := false
r.broadcastEvent(Event{
Type: "input",
Data: map[string]any{
"source": sourceId,
},
})
var routeWaitGroup sync.WaitGroup var routeWaitGroup sync.WaitGroup
for routeIndex, routeInstance := range r.RouteInstances { for routeIndex, routeInstance := range r.RouteInstances {
@@ -207,8 +237,21 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
Index: routeIndex, Index: routeIndex,
ProcessError: err, ProcessError: err,
}) })
r.broadcastEvent(Event{
Type: "route",
Data: map[string]any{
"index": routeIndex,
},
Error: err.Error(),
})
return return
} }
r.broadcastEvent(Event{
Type: "route",
Data: map[string]any{
"index": routeIndex,
},
})
routeSpan.End() routeSpan.End()
}) })
} }
@@ -220,7 +263,12 @@ func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any)
func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error { func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload any) error {
spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId))) spanCtx, span := otel.Tracer("router").Start(ctx, "output", trace.WithAttributes(attribute.String("destination.id", destinationId)))
defer span.End() defer span.End()
outputEvent := Event{
Type: "output",
Data: map[string]any{
"destination": destinationId,
},
}
destinationModule := r.getModule(destinationId) destinationModule := r.getModule(destinationId)
if destinationModule == nil { if destinationModule == nil {
@@ -228,6 +276,8 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
span.RecordError(err) span.RecordError(err)
r.logger.Error("no module found for destination id", "destinationId", destinationId) r.logger.Error("no module found for destination id", "destinationId", destinationId)
outputEvent.Error = err.Error()
r.broadcastEvent(outputEvent)
return err return err
} }
@@ -238,11 +288,13 @@ func (r *Router) HandleOutput(ctx context.Context, destinationId string, payload
moduleOutputSpan.SetStatus(codes.Error, err.Error()) moduleOutputSpan.SetStatus(codes.Error, err.Error())
moduleOutputSpan.RecordError(err) moduleOutputSpan.RecordError(err)
r.logger.ErrorContext(moduleOutputCtx, "module output encountered error", "module", destinationModule.Id(), "error", err) r.logger.ErrorContext(moduleOutputCtx, "module output encountered error", "module", destinationModule.Id(), "error", err)
outputEvent.Error = err.Error()
r.broadcastEvent(outputEvent)
return err return err
} else { } else {
moduleOutputSpan.SetStatus(codes.Ok, "module output successful") moduleOutputSpan.SetStatus(codes.Ok, "module output successful")
} }
r.broadcastEvent(outputEvent)
return nil return nil
} }

View File

@@ -5,6 +5,16 @@
"description": "showbridge configuration", "description": "showbridge configuration",
"type": "object", "type": "object",
"properties": { "properties": {
"api": {
"type": "object",
"properties": {
"port": {
"type": "integer",
"description": "Port for the API server to listen on"
}
},
"required": ["port"]
},
"modules": { "modules": {
"$ref": "https://showbridge.io/modules.schema.json" "$ref": "https://showbridge.io/modules.schema.json"
}, },

50
websocket.go Normal file
View File

@@ -0,0 +1,50 @@
package showbridge
import (
"encoding/json"
"net/http"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (r *Router) handleWebsocket(w http.ResponseWriter, req *http.Request) {
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
r.logger.Error("websocket upgrade error", "error", err)
return
}
defer conn.Close()
r.wsConnsMu.Lock()
r.wsConns = append(r.wsConns, conn)
r.wsConnsMu.Unlock()
for {
_, message, err := conn.ReadMessage()
if err != nil {
r.logger.Error("websocket read error", "error", err)
break
}
event := Event{}
err = json.Unmarshal(message, &event)
if err != nil {
r.logger.Error("websocket message unmarshal error", "error", err)
continue
}
r.handleEvent(event)
}
r.wsConnsMu.Lock()
for i, c := range r.wsConns {
if c == conn {
r.wsConns = append(r.wsConns[:i], r.wsConns[i+1:]...)
break
}
}
r.wsConnsMu.Unlock()
}