package module

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"time"

	"github.com/google/jsonschema-go/jsonschema"
	"github.com/jwetzell/showbridge-go/internal/common"
	"github.com/jwetzell/showbridge-go/internal/config"
	"github.com/nats-io/nats-server/v2/server"
)

// NATSServer is a module that runs an embedded NATS server bound to Ip:Port.
//
// The router, server, ctx and cancel fields are only populated by Start.
type NATSServer struct {
	config config.ModuleConfig
	ctx    context.Context
	Ip     string
	Port   int
	router common.RouteIO
	server *server.Server
	logger *slog.Logger
	cancel context.CancelFunc
}

// init registers the "nats.server" module type together with its parameter
// schema and factory function.
func init() {
	RegisterModule(ModuleRegistration{
		Type:  "nats.server",
		Title: "NATS Server",
		ParamsSchema: &jsonschema.Schema{
			Type: "object",
			Properties: map[string]*jsonschema.Schema{
				"ip": {
					Title:   "IP",
					Type:    "string",
					Default: json.RawMessage(`"0.0.0.0"`),
				},
				"port": {
					Title:   "Port",
					Type:    "integer",
					Minimum: jsonschema.Ptr[float64](1024),
					Maximum: jsonschema.Ptr[float64](65535),
					Default: json.RawMessage(`4222`),
				},
			},
			Required:             []string{},
			AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
		},
		// New builds a NATSServer from the module configuration. Missing
		// "port" and "ip" params fall back to 4222 and "0.0.0.0"; any other
		// lookup error, or an address that does not resolve, is returned.
		New: func(moduleConfig config.ModuleConfig) (common.Module, error) {
			params := moduleConfig.Params

			portNum, err := params.GetInt("port")
			if err != nil {
				if !errors.Is(err, config.ErrParamNotFound) {
					return nil, fmt.Errorf("nats.server port error: %w", err)
				}
				portNum = 4222
			}

			ipString, err := params.GetString("ip")
			if err != nil {
				if !errors.Is(err, config.ErrParamNotFound) {
					return nil, fmt.Errorf("nats.server ip error: %w", err)
				}
				ipString = "0.0.0.0"
			}

			// net.JoinHostPort brackets IPv6 literals ("::1" -> "[::1]:4222");
			// plain "%s:%d" formatting made every IPv6 address fail validation.
			//
			// NOTE(review): the uint16 conversion wraps out-of-range ports
			// (e.g. 70000 or -1) before validation while the unconverted value
			// is stored in Port. Kept as in the original; confirm whether
			// out-of-range values should be rejected here instead.
			addr := net.JoinHostPort(ipString, fmt.Sprintf("%d", uint16(portNum)))
			if _, err := net.ResolveTCPAddr("tcp", addr); err != nil {
				return nil, fmt.Errorf("nats.server address error: %w", err)
			}

			return &NATSServer{
				config: moduleConfig,
				logger: CreateLogger(moduleConfig),
				Ip:     ipString,
				Port:   portNum,
			}, nil
		},
	})
}

// Id returns the module id taken from the module configuration.
func (ns *NATSServer) Id() string {
	return ns.config.Id
}

// Type returns the module type taken from the module configuration.
func (ns *NATSServer) Type() string {
	return ns.config.Type
}

// Start launches the embedded NATS server and blocks until the module context
// is cancelled (via ctx or Stop).
//
// ctx must carry a common.RouteIO under common.RouterContextKey. An error is
// returned when the router is missing, the server cannot be created, or the
// server is not ready for connections within five seconds. The server is shut
// down before Start returns.
func (ns *NATSServer) Start(ctx context.Context) error {
	ns.logger.Debug("running")

	router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
	if !ok {
		return errors.New("nats.server unable to get router from context")
	}
	ns.router = router

	moduleContext, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path, including the early
	// error returns below; calling cancel more than once is harmless.
	defer cancel()
	ns.ctx = moduleContext
	ns.cancel = cancel

	natsServer, err := server.NewServer(&server.Options{
		Host:  ns.Ip,
		Port:  ns.Port,
		NoLog: true,
	})
	if err != nil {
		return err
	}
	ns.server = natsServer

	natsServer.Start()
	defer natsServer.Shutdown()

	if !natsServer.ReadyForConnections(5 * time.Second) {
		return errors.New("nats.server failed to start")
	}
	ns.logger.Info("NATS server started", "client_url", natsServer.ClientURL())

	<-ns.ctx.Done()
	ns.logger.Debug("done")
	return nil
}

// Stop cancels the module context and shuts down the embedded server.
//
// It is safe to call before Start has run (or after Start failed early), in
// which case there is nothing to cancel or shut down.
func (ns *NATSServer) Stop() {
	// cancel is only assigned inside Start; guard against a nil func call.
	if ns.cancel != nil {
		ns.cancel()
	}
	if ns.server != nil {
		ns.server.Shutdown()
	}
}