Files
showbridge-go/internal/module/nats-client.go

136 lines
2.9 KiB
Go

package module
import (
"context"
"errors"
"log/slog"
"github.com/google/jsonschema-go/jsonschema"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
"github.com/jwetzell/showbridge-go/internal/processor"
"github.com/nats-io/nats.go"
)
// NATSClient is a module that holds a connection to a NATS server. Start
// subscribes to Subject and hands received messages to the router; Output
// publishes messages through the same connection.
type NATSClient struct {
	config  config.ModuleConfig // module configuration given to the constructor
	ctx     context.Context     // module-scoped context, set by Start
	router  common.RouteIO      // destination for received messages, set by Start
	URL     string              // server URL passed to nats.Connect
	Subject string              // subject subscribed to in Start
	client  *nats.Conn          // nil until Start has connected
	logger  *slog.Logger
	cancel  context.CancelFunc // cancels ctx; set by Start, invoked by Stop
}
func init() {
RegisterModule(ModuleRegistration{
Type: "nats.client",
Title: "NATS Client",
ParamsSchema: &jsonschema.Schema{
Type: "object",
Properties: map[string]*jsonschema.Schema{
"url": {
Title: "NATS Server URL",
Type: "string",
},
"subject": {
Title: "Subject",
Type: "string",
},
},
Required: []string{"url", "subject"},
AdditionalProperties: &jsonschema.Schema{Not: &jsonschema.Schema{}},
},
New: func(config config.ModuleConfig) (common.Module, error) {
params := config.Params
urlString, err := params.GetString("url")
if err != nil {
return nil, errors.New("nats.client url error: " + err.Error())
}
subjectString, err := params.GetString("subject")
if err != nil {
return nil, errors.New("nats.client subject error: " + err.Error())
}
return &NATSClient{config: config, URL: urlString, Subject: subjectString, logger: CreateLogger(config)}, nil
},
})
}
// Id returns the identifier stored in this module's configuration.
func (nc *NATSClient) Id() string {
	return nc.config.Id
}
// Type returns the module type string stored in this module's configuration.
func (nc *NATSClient) Type() string {
	return nc.config.Type
}
// Start connects to the NATS server at nc.URL, subscribes to nc.Subject and
// forwards every received message to the router carried in ctx. It blocks
// until ctx is cancelled or Stop is called, then shuts the connection down.
//
// It returns an error when ctx carries no router, when nats.Connect fails, or
// when the subscription cannot be created; it returns nil after a normal
// shutdown.
func (nc *NATSClient) Start(ctx context.Context) error {
	nc.logger.Debug("running")

	router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO)
	if !ok {
		return errors.New("nats.client unable to get router from context")
	}
	nc.router = router

	moduleContext, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path, including the error
	// returns below. Calling cancel again from Stop is harmless.
	defer cancel()
	nc.ctx = moduleContext
	nc.cancel = cancel

	client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
	if err != nil {
		return err
	}
	nc.client = client
	defer func() {
		// Drain has to run on a connection that is still open: it drains the
		// subscriptions, flushes pending publishes and then closes the
		// connection itself. Deferring Close after Drain (so that Close ran
		// first) turned the drain into a no-op. Fall back to a hard Close
		// only when Drain is refused, e.g. while the client is reconnecting.
		if drainErr := client.Drain(); drainErr != nil {
			client.Close()
		}
	}()

	// The subscription is not unsubscribed explicitly; the deferred Drain
	// takes care of it without discarding messages that are already queued.
	_, err = client.Subscribe(nc.Subject, func(msg *nats.Msg) {
		if nc.router != nil {
			nc.router.HandleInput(nc.ctx, nc.Id(), msg)
		}
	})
	if err != nil {
		return err
	}

	<-nc.ctx.Done()
	nc.logger.Debug("done")
	return nil
}
// Output publishes payload through the module's NATS connection. The payload
// must be convertible to processor.NATSMessage; the message is published to
// the subject carried by the message itself, not to nc.Subject.
//
// It returns an error when the payload has the wrong type, when Start has not
// created a connection yet, when the connection is currently not connected,
// or when the publish call itself fails.
func (nc *NATSClient) Output(ctx context.Context, payload any) error {
	message, isNATSMessage := common.GetAnyAs[processor.NATSMessage](payload)

	// Cases are evaluated top to bottom, so the nil check always precedes the
	// IsConnected call.
	switch {
	case !isNATSMessage:
		return errors.New("nats.client is only able to output NATSMessage")
	case nc.client == nil:
		return errors.New("nats.client client is not setup")
	case !nc.client.IsConnected():
		return errors.New("nats.client is not connected")
	}

	return nc.client.Publish(message.Subject, message.Payload)
}
// Stop cancels the module context, which unblocks a running Start and lets it
// shut the connection down. It is a no-op when Start has not created that
// context yet, so calling Stop on a module that never started does not panic.
func (nc *NATSClient) Stop() {
	if nc.cancel == nil {
		return
	}
	nc.cancel()
}