package module import ( "context" "errors" "log/slog" "github.com/jwetzell/showbridge-go/internal/common" "github.com/jwetzell/showbridge-go/internal/config" "github.com/jwetzell/showbridge-go/internal/processor" "github.com/jwetzell/showbridge-go/internal/route" "github.com/nats-io/nats.go" ) type NATSClient struct { config config.ModuleConfig ctx context.Context router route.RouteIO URL string Subject string client *nats.Conn logger *slog.Logger cancel context.CancelFunc } func init() { RegisterModule(ModuleRegistration{ Type: "nats.client", New: func(config config.ModuleConfig) (Module, error) { params := config.Params urlString, err := params.GetString("url") if err != nil { return nil, errors.New("nats.client url error: " + err.Error()) } subjectString, err := params.GetString("subject") if err != nil { return nil, errors.New("nats.client subject error: " + err.Error()) } return &NATSClient{config: config, URL: urlString, Subject: subjectString, logger: CreateLogger(config)}, nil }, }) } func (nc *NATSClient) Id() string { return nc.config.Id } func (nc *NATSClient) Type() string { return nc.config.Type } func (nc *NATSClient) Start(ctx context.Context) error { nc.logger.Debug("running") router, ok := ctx.Value(common.RouterContextKey).(route.RouteIO) if !ok { return errors.New("nats.client unable to get router from context") } nc.router = router moduleContext, cancel := context.WithCancel(ctx) nc.ctx = moduleContext nc.cancel = cancel client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true)) if err != nil { return err } nc.client = client defer client.Drain() defer client.Close() sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) { if nc.router != nil { nc.router.HandleInput(nc.ctx, nc.Id(), msg) } }) if err != nil { return err } defer sub.Unsubscribe() <-nc.ctx.Done() nc.logger.Debug("done") return nil } func (nc *NATSClient) Output(ctx context.Context, payload any) error { payloadMessage, ok := processor.GetAnyAs[processor.NATSMessage](payload) if !ok { return errors.New("nats.client is only able to output NATSMessage") } if nc.client == nil { return errors.New("nats.client client is not setup") } if !nc.client.IsConnected() { return errors.New("nats.client is not connected") } err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload) return err } func (nc *NATSClient) Stop() { nc.cancel() } func (nc *NATSClient) Get(key string) (any, error) { return nil, errors.New("nats.client does not support Get") }