Merge pull request #5 from jwetzell/module/nats-client

add NATS client module
This commit is contained in:
Joel Wetzell
2025-12-02 12:59:09 -06:00
committed by GitHub
6 changed files with 248 additions and 0 deletions

View File

@@ -18,6 +18,8 @@ Simple protocol router _/s_
- server
- [MQTT](https://mqtt.org/)
- client
- [NATS](https://nats.io/)
- client
- [PosiStageNet](https://posistage.net/)
- client
- MIDI

5
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/jwetzell/free-d-go v0.1.0
github.com/jwetzell/osc-go v0.1.0
github.com/jwetzell/psn-go v0.2.1
github.com/nats-io/nats.go v1.47.0
github.com/urfave/cli/v3 v3.6.1
gitlab.com/gomidi/midi/v2 v2.3.16
modernc.org/quickjs v0.17.0
@@ -18,10 +19,14 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect

10
go.sum
View File

@@ -20,8 +20,16 @@ github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU=
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -38,6 +46,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=

View File

@@ -0,0 +1,83 @@
package processing
import (
"bytes"
"context"
"fmt"
"text/template"
)
type NATSMessage struct {
Subject string
Payload []byte
}
type NATSMessageCreate struct {
config ProcessorConfig
Subject string
Payload *template.Template
}
func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) {
var payloadBuffer bytes.Buffer
err := nmc.Payload.Execute(&payloadBuffer, payload)
if err != nil {
return nil, err
}
payloadString := payloadBuffer.String()
message := NATSMessage{
Subject: nmc.Subject,
Payload: []byte(payloadString),
}
return message, nil
}
func (nmc *NATSMessageCreate) Type() string {
return nmc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.create",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
// TODO(jwetzell): support template for subject
subject, ok := params["subject"]
if !ok {
return nil, fmt.Errorf("nats.message.create requires a subject parameter")
}
subjectString, ok := subject.(string)
if !ok {
return nil, fmt.Errorf("nats.message.create subject must be a string")
}
payload, ok := params["payload"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires a payload parameter")
}
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create payload must be a string")
}
payloadTemplate, err := template.New("payload").Parse(payloadString)
if err != nil {
return nil, err
}
return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil
},
})
}

View File

@@ -0,0 +1,35 @@
package processing
import (
"context"
"fmt"
"github.com/nats-io/nats.go"
)
type NATSMessageEncode struct {
config ProcessorConfig
}
func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(*nats.Msg)
if !ok {
return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg")
}
return payloadMessage.Data, nil
}
func (nme *NATSMessageEncode) Type() string {
return nme.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.encode",
New: func(config ProcessorConfig) (Processor, error) {
return &NATSMessageEncode{config: config}, nil
},
})
}

113
nats-client.go Normal file
View File

@@ -0,0 +1,113 @@
package showbridge
import (
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/processing"
"github.com/nats-io/nats.go"
)
type NATSClient struct {
config ModuleConfig
router *Router
URL string
Subject string
client *nats.Conn
}
func init() {
RegisterModule(ModuleRegistration{
Type: "net.nats.client",
New: func(config ModuleConfig) (Module, error) {
params := config.Params
url, ok := params["url"]
if !ok {
return nil, fmt.Errorf("net.nats.client requires a url parameter")
}
urlString, ok := url.(string)
if !ok {
return nil, fmt.Errorf("net.nats.client url must be string")
}
subject, ok := params["subject"]
if !ok {
return nil, fmt.Errorf("net.nats.client requires a subject parameter")
}
subjectString, ok := subject.(string)
if !ok {
return nil, fmt.Errorf("net.nats.client subject must be string")
}
return &NATSClient{config: config, URL: urlString, Subject: subjectString}, nil
},
})
}
func (nc *NATSClient) Id() string {
return nc.config.Id
}
func (nc *NATSClient) Type() string {
return nc.config.Type
}
func (nc *NATSClient) RegisterRouter(router *Router) {
nc.router = router
}
func (nc *NATSClient) Run() error {
client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
if err != nil {
return err
}
nc.client = client
defer client.Drain()
defer client.Close()
sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) {
if nc.router != nil {
nc.router.HandleInput(nc.config.Id, msg)
}
})
if err != nil {
return err
}
defer sub.Unsubscribe()
<-nc.router.Context.Done()
slog.Debug("router context done in module", "id", nc.config.Id)
return nil
}
func (nc *NATSClient) Output(payload any) error {
payloadMessage, ok := payload.(processing.NATSMessage)
if !ok {
return fmt.Errorf("net.nats.client is only able to output NATSMessage")
}
if nc.client == nil {
return fmt.Errorf("net.nats.client client is not setup")
}
if !nc.client.IsConnected() {
return fmt.Errorf("net.nats.client is not connected")
}
err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload)
return err
}