mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 21:05:30 +00:00
add simple NATS client
This commit is contained in:
5
go.mod
5
go.mod
@@ -8,6 +8,7 @@ require (
|
|||||||
github.com/jwetzell/free-d-go v0.1.0
|
github.com/jwetzell/free-d-go v0.1.0
|
||||||
github.com/jwetzell/osc-go v0.1.0
|
github.com/jwetzell/osc-go v0.1.0
|
||||||
github.com/jwetzell/psn-go v0.2.1
|
github.com/jwetzell/psn-go v0.2.1
|
||||||
|
github.com/nats-io/nats.go v1.47.0
|
||||||
github.com/urfave/cli/v3 v3.6.1
|
github.com/urfave/cli/v3 v3.6.1
|
||||||
gitlab.com/gomidi/midi/v2 v2.3.16
|
gitlab.com/gomidi/midi/v2 v2.3.16
|
||||||
modernc.org/quickjs v0.17.0
|
modernc.org/quickjs v0.17.0
|
||||||
@@ -18,10 +19,14 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/gorilla/websocket v1.5.3 // indirect
|
github.com/gorilla/websocket v1.5.3 // indirect
|
||||||
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||||
|
golang.org/x/crypto v0.42.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
||||||
golang.org/x/net v0.44.0 // indirect
|
golang.org/x/net v0.44.0 // indirect
|
||||||
golang.org/x/sync v0.17.0 // indirect
|
golang.org/x/sync v0.17.0 // indirect
|
||||||
|
|||||||
10
go.sum
10
go.sum
@@ -20,8 +20,16 @@ github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I
|
|||||||
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
|
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
|
||||||
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU=
|
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU=
|
||||||
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
|
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
|
||||||
|
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||||
|
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
|
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
|
||||||
|
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||||
|
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||||
|
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
@@ -38,6 +46,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
|||||||
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||||
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
|
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
|
||||||
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
|
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
|
||||||
|
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
|
||||||
|
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
||||||
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
|
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
|
||||||
|
|||||||
83
internal/processing/nats-message-create.go
Normal file
83
internal/processing/nats-message-create.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"text/template"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSMessage struct {
|
||||||
|
Subject string
|
||||||
|
Payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type NATSMessageCreate struct {
|
||||||
|
config ProcessorConfig
|
||||||
|
Subject string
|
||||||
|
Payload *template.Template
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) {
|
||||||
|
|
||||||
|
var payloadBuffer bytes.Buffer
|
||||||
|
err := nmc.Payload.Execute(&payloadBuffer, payload)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadString := payloadBuffer.String()
|
||||||
|
|
||||||
|
message := NATSMessage{
|
||||||
|
Subject: nmc.Subject,
|
||||||
|
Payload: []byte(payloadString),
|
||||||
|
}
|
||||||
|
|
||||||
|
return message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nmc *NATSMessageCreate) Type() string {
|
||||||
|
return nmc.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterProcessor(ProcessorRegistration{
|
||||||
|
Type: "nats.message.create",
|
||||||
|
New: func(config ProcessorConfig) (Processor, error) {
|
||||||
|
params := config.Params
|
||||||
|
// TODO(jwetzell): support template for subject
|
||||||
|
subject, ok := params["subject"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.create requires a subject parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
subjectString, ok := subject.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.create subject must be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, ok := params["payload"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("osc.message.create requires a payload parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadString, ok := payload.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("osc.message.create payload must be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadTemplate, err := template.New("payload").Parse(payloadString)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
35
internal/processing/nats-message-encode.go
Normal file
35
internal/processing/nats-message-encode.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSMessageEncode struct {
|
||||||
|
config ProcessorConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) {
|
||||||
|
payloadMessage, ok := payload.(*nats.Msg)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg")
|
||||||
|
}
|
||||||
|
|
||||||
|
return payloadMessage.Data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nme *NATSMessageEncode) Type() string {
|
||||||
|
return nme.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterProcessor(ProcessorRegistration{
|
||||||
|
Type: "nats.message.encode",
|
||||||
|
New: func(config ProcessorConfig) (Processor, error) {
|
||||||
|
return &NATSMessageEncode{config: config}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
113
nats-client.go
Normal file
113
nats-client.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package showbridge
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/processing"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSClient struct {
|
||||||
|
config ModuleConfig
|
||||||
|
router *Router
|
||||||
|
URL string
|
||||||
|
Subject string
|
||||||
|
client *nats.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterModule(ModuleRegistration{
|
||||||
|
Type: "net.nats.client",
|
||||||
|
New: func(config ModuleConfig) (Module, error) {
|
||||||
|
params := config.Params
|
||||||
|
url, ok := params["url"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client requires a url parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
urlString, ok := url.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client url must be string")
|
||||||
|
}
|
||||||
|
|
||||||
|
subject, ok := params["subject"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client requires a subject parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
subjectString, ok := subject.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client subject must be string")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &NATSClient{config: config, URL: urlString, Subject: subjectString}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Id() string {
|
||||||
|
return nc.config.Id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Type() string {
|
||||||
|
return nc.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) RegisterRouter(router *Router) {
|
||||||
|
nc.router = router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Run() error {
|
||||||
|
client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
nc.client = client
|
||||||
|
|
||||||
|
defer client.Drain()
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) {
|
||||||
|
if nc.router != nil {
|
||||||
|
nc.router.HandleInput(nc.config.Id, msg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
<-nc.router.Context.Done()
|
||||||
|
slog.Debug("router context done in module", "id", nc.config.Id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Output(payload any) error {
|
||||||
|
|
||||||
|
payloadMessage, ok := payload.(processing.NATSMessage)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("net.nats.client is only able to output NATSMessage")
|
||||||
|
}
|
||||||
|
|
||||||
|
if nc.client == nil {
|
||||||
|
return fmt.Errorf("net.nats.client client is not setup")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nc.client.IsConnected() {
|
||||||
|
return fmt.Errorf("net.nats.client is not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user