From d629146592548d8c3eabcb76c49a6eb99cde55d6 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Tue, 2 Dec 2025 12:57:05 -0600 Subject: [PATCH] add simple NATS client --- go.mod | 5 + go.sum | 10 ++ internal/processing/nats-message-create.go | 83 +++++++++++++++ internal/processing/nats-message-encode.go | 35 +++++++ nats-client.go | 113 +++++++++++++++++++++ 5 files changed, 246 insertions(+) create mode 100644 internal/processing/nats-message-create.go create mode 100644 internal/processing/nats-message-encode.go create mode 100644 nats-client.go diff --git a/go.mod b/go.mod index 063a793..62764f8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/jwetzell/free-d-go v0.1.0 github.com/jwetzell/osc-go v0.1.0 github.com/jwetzell/psn-go v0.2.1 + github.com/nats-io/nats.go v1.47.0 github.com/urfave/cli/v3 v3.6.1 gitlab.com/gomidi/midi/v2 v2.3.16 modernc.org/quickjs v0.17.0 @@ -18,10 +19,14 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/crypto v0.42.0 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/net v0.44.0 // indirect golang.org/x/sync v0.17.0 // indirect diff --git a/go.sum b/go.sum index a62735c..dfcf349 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,16 @@ github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A= github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU= github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= +github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -38,6 +46,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE= go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= diff --git a/internal/processing/nats-message-create.go b/internal/processing/nats-message-create.go new file mode 100644 index 0000000..54bbb0a --- /dev/null +++ b/internal/processing/nats-message-create.go @@ -0,0 +1,83 @@ +package processing + +import ( + "bytes" + "context" + "fmt" + "text/template" +) + +type NATSMessage struct { + Subject string + Payload []byte +} + +type NATSMessageCreate struct { + config ProcessorConfig + Subject string + Payload *template.Template +} + +func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) { + + var payloadBuffer bytes.Buffer + err := nmc.Payload.Execute(&payloadBuffer, payload) + + if err != nil { + return nil, err + } + + payloadString := payloadBuffer.String() + + message := NATSMessage{ + Subject: nmc.Subject, + Payload: []byte(payloadString), + } + + return message, nil +} + +func (nmc *NATSMessageCreate) Type() string { + return nmc.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "nats.message.create", + New: func(config ProcessorConfig) (Processor, error) { + params := config.Params + // TODO(jwetzell): support template for subject + subject, ok := params["subject"] + + if !ok { + return nil, fmt.Errorf("nats.message.create requires a subject parameter") + } + + subjectString, ok := subject.(string) + + if !ok { + return nil, fmt.Errorf("nats.message.create subject must be a string") + } + + payload, ok := params["payload"] + + if !ok { + return nil, fmt.Errorf("osc.message.create requires a payload parameter") + } + + payloadString, ok := payload.(string) + + if !ok { + return nil, fmt.Errorf("osc.message.create payload must be a string") + } + + payloadTemplate, err := template.New("payload").Parse(payloadString) + + if err != nil { + return nil, err + } + + return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil + }, + }) +} diff --git a/internal/processing/nats-message-encode.go b/internal/processing/nats-message-encode.go new file mode 100644 index 0000000..f36aa2c --- /dev/null +++ b/internal/processing/nats-message-encode.go @@ -0,0 +1,35 @@ +package processing + +import ( + "context" + "fmt" + + "github.com/nats-io/nats.go" +) + +type NATSMessageEncode struct { + config ProcessorConfig +} + +func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) { + payloadMessage, ok := payload.(*nats.Msg) + + if !ok { + return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg") + } + + return payloadMessage.Data, nil +} + +func (nme *NATSMessageEncode) Type() string { + return nme.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "nats.message.encode", + New: func(config ProcessorConfig) (Processor, error) { + return &NATSMessageEncode{config: config}, nil + }, + }) +} diff --git a/nats-client.go b/nats-client.go new file mode 100644 index 0000000..c53b42c --- /dev/null +++ b/nats-client.go @@ -0,0 +1,113 @@ +package showbridge + +import ( + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internal/processing" + "github.com/nats-io/nats.go" +) + +type NATSClient struct { + config ModuleConfig + router *Router + URL string + Subject string + client *nats.Conn +} + +func init() { + RegisterModule(ModuleRegistration{ + Type: "net.nats.client", + New: func(config ModuleConfig) (Module, error) { + params := config.Params + url, ok := params["url"] + + if !ok { + return nil, fmt.Errorf("net.nats.client requires a url parameter") + } + + urlString, ok := url.(string) + + if !ok { + return nil, fmt.Errorf("net.nats.client url must be string") + } + + subject, ok := params["subject"] + + if !ok { + return nil, fmt.Errorf("net.nats.client requires a subject parameter") + } + + subjectString, ok := subject.(string) + + if !ok { + return nil, fmt.Errorf("net.nats.client subject must be string") + } + + return &NATSClient{config: config, URL: urlString, Subject: subjectString}, nil + }, + }) +} + +func (nc *NATSClient) Id() string { + return nc.config.Id +} + +func (nc *NATSClient) Type() string { + return nc.config.Type +} + +func (nc *NATSClient) RegisterRouter(router *Router) { + nc.router = router +} + +func (nc *NATSClient) Run() error { + client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true)) + + if err != nil { + return err + } + + nc.client = client + + defer client.Drain() + defer client.Close() + + sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) { + if nc.router != nil { + nc.router.HandleInput(nc.config.Id, msg) + } + }) + + if err != nil { + return err + } + + defer sub.Unsubscribe() + + <-nc.router.Context.Done() + slog.Debug("router context done in module", "id", nc.config.Id) + return nil +} + +func (nc *NATSClient) Output(payload any) error { + + payloadMessage, ok := payload.(processing.NATSMessage) + + if !ok { + return fmt.Errorf("net.nats.client is only able to output NATSMessage") + } + + if nc.client == nil { + return fmt.Errorf("net.nats.client client is not setup") + } + + if !nc.client.IsConnected() { + return fmt.Errorf("net.nats.client is not connected") + } + + err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload) + + return err +}