diff --git a/internal/processor/router-input.go b/internal/processor/router-input.go new file mode 100644 index 0000000..fe81cd6 --- /dev/null +++ b/internal/processor/router-input.go @@ -0,0 +1,55 @@ +package processor + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internal/common" + "github.com/jwetzell/showbridge-go/internal/config" +) + +type RouterInput struct { + config config.ProcessorConfig + SourceId string + logger *slog.Logger +} + +func (ro *RouterInput) Process(ctx context.Context, payload any) (any, error) { + + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) + if !ok { + return nil, errors.New("router.input no router found") + } + + _, err := router.HandleInput(ctx, ro.SourceId, payload) + + if err != nil { + return nil, errors.New("router.input failed to send input") + } + + return payload, nil +} + +func (ro *RouterInput) Type() string { + return ro.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "router.input", + New: func(config config.ProcessorConfig) (Processor, error) { + + params := config.Params + + sourceId, err := params.GetString("source") + + if err != nil { + return nil, fmt.Errorf("router.input source error: %w", err) + } + + return &RouterInput{config: config, SourceId: sourceId, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil + }, + }) +} diff --git a/router.go b/router.go index 5be194a..3586bf9 100644 --- a/router.go +++ b/router.go @@ -31,7 +31,7 @@ type Router struct { moduleWait sync.WaitGroup logger *slog.Logger runningConfig config.Config - runningConfigMu sync.Mutex + runningConfigMu sync.RWMutex wsConns []*websocket.Conn wsConnsMu sync.Mutex apiServer *http.Server @@ -183,10 +183,10 @@ func (r *Router) Stop() { } func (r *Router) HandleInput(ctx context.Context, sourceId string, payload any) (bool, []common.RouteIOError) { - r.runningConfigMu.Lock() - defer r.runningConfigMu.Unlock() + r.runningConfigMu.RLock() + defer r.runningConfigMu.RUnlock() - spanCtx, span := otel.Tracer("router").Start(ctx, "input", trace.WithAttributes(attribute.String("source.id", sourceId)), trace.WithNewRoot()) + spanCtx, span := otel.Tracer("router").Start(ctx, "input", trace.WithAttributes(attribute.String("source.id", sourceId))) defer span.End() var routeIOErrors []common.RouteIOError routeFound := false diff --git a/schema/processors.schema.json b/schema/processors.schema.json index f174c26..afefe42 100644 --- a/schema/processors.schema.json +++ b/schema/processors.schema.json @@ -649,6 +649,29 @@ "required": ["type"], "additionalProperties": false }, + { + "type": "object", + "title": "Router Input", + "properties": { + "type": { + "type": "string", + "const": "router.input" + }, + "params": { + "type": "object", + "properties": { + "module": { + "title": "Source", + "type": "string", + "description": "source to report as to the router" + } + }, + "required": ["module"] + } + }, + "required": ["type", "params"], + "additionalProperties": false + }, { "type": "object", "title": "Router Output",