From 4f7820af5ee6c7d853175397631691ff032d4e14 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Sun, 1 Mar 2026 13:02:55 -0600 Subject: [PATCH 1/2] add basic nats server module --- go.mod | 8 +- go.sum | 17 +++- internal/module/nats-server.go | 122 +++++++++++++++++++++++ internal/module/test/nats-server_test.go | 85 ++++++++++++++++ schema/modules.schema.json | 32 ++++++ 5 files changed, 261 insertions(+), 3 deletions(-) create mode 100644 internal/module/nats-server.go create mode 100644 internal/module/test/nats-server_test.go diff --git a/go.mod b/go.mod index 56ee30a..9d9c733 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/jwetzell/free-d-go v0.1.0 github.com/jwetzell/osc-go v0.1.0 github.com/jwetzell/psn-go v0.3.0 + github.com/nats-io/nats-server/v2 v2.12.4 github.com/nats-io/nats.go v1.49.0 github.com/urfave/cli/v3 v3.6.2 gitlab.com/gomidi/midi/v2 v2.3.22 @@ -25,6 +26,7 @@ require ( ) require ( + github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/creack/goselect v0.1.2 // indirect @@ -38,13 +40,16 @@ require ( github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.4.0 // indirect + github.com/google/go-tpm v0.9.8 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/ianlancetaylor/demangle v0.0.0-20240805132620-81f5be970eca // indirect github.com/icholy/digest v1.1.0 // indirect - github.com/klauspost/compress v1.18.2 // indirect + github.com/klauspost/compress v1.18.3 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect + github.com/nats-io/jwt/v2 v2.8.0 // indirect github.com/nats-io/nkeys v0.4.12 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect @@ -70,6 +75,7 @@ require ( golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.33.0 // indirect + golang.org/x/time v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/grpc v1.78.0 // indirect diff --git a/go.sum b/go.sum index c721d33..3bfbb5a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM= +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -41,6 +43,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= +github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= @@ -61,14 +65,20 @@ github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A= github.com/jwetzell/psn-go v0.3.0 h1:WVpCEmExYE8a+I5hQak5jNJJp2x35VdGX/VuMUKPmhY= github.com/jwetzell/psn-go v0.3.0/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o= -github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= -github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= +github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts= +github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg= github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE= github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw= github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= @@ -146,10 +156,13 @@ golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= diff --git a/internal/module/nats-server.go b/internal/module/nats-server.go new file mode 100644 index 0000000..f417535 --- /dev/null +++ b/internal/module/nats-server.go @@ -0,0 +1,122 @@ +package module + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "time" + + "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/route" + "github.com/nats-io/nats-server/v2/server" +) + +type NATSServer struct { + config config.ModuleConfig + ctx context.Context + Ip string + Port int + router route.RouteIO + server *server.Server + logger *slog.Logger + cancel context.CancelFunc +} + +func init() { + RegisterModule(ModuleRegistration{ + Type: "nats.server", + New: func(config config.ModuleConfig) (Module, error) { + params := config.Params + portNum := 4222 + + port, ok := params["port"] + if ok { + + specificportNum, ok := port.(float64) + + if !ok { + return nil, errors.New("net.udp.server port must be a number") + } + portNum = int(specificportNum) + } + + ipString := "0.0.0.0" + + ip, ok := params["ip"] + if ok { + + specificIpString, ok := ip.(string) + + if !ok { + return nil, errors.New("net.udp.server ip must be a string") + } + ipString = specificIpString + } + + _, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ipString, uint16(portNum))) + if err != nil { + return nil, err + } + return &NATSServer{config: config, logger: CreateLogger(config), Ip: ipString, Port: portNum}, nil + }, + }) +} + +func (ns *NATSServer) Id() string { + return ns.config.Id +} + +func (ns *NATSServer) Type() string { + return ns.config.Type +} + +func (ns *NATSServer) Start(ctx context.Context) error { + ns.logger.Debug("running") + router, ok := ctx.Value(route.RouterContextKey).(route.RouteIO) + + if !ok { + return errors.New("nats.server unable to get router from context") + } + + ns.router = router + moduleContext, cancel := context.WithCancel(ctx) + ns.ctx = moduleContext + ns.cancel = cancel + + natsServer, err := server.NewServer(&server.Options{ + Host: ns.Ip, + Port: ns.Port, + NoLog: true, + }) + + if err != nil { + return err + } + + ns.server = natsServer + natsServer.Start() + defer natsServer.Shutdown() + + if !natsServer.ReadyForConnections(5 * time.Second) { + return errors.New("nats.server failed to start") + } + ns.logger.Info("NATS server started", "client_url", natsServer.ClientURL()) + + <-ns.ctx.Done() + + ns.logger.Debug("done") + return nil +} + +func (ns *NATSServer) Output(ctx context.Context, payload any) error { + return errors.ErrUnsupported +} + +func (ns *NATSServer) Stop() { + ns.cancel() + if ns.server != nil { + ns.server.Shutdown() + } +} diff --git a/internal/module/test/nats-server_test.go b/internal/module/test/nats-server_test.go new file mode 100644 index 0000000..377862a --- /dev/null +++ b/internal/module/test/nats-server_test.go @@ -0,0 +1,85 @@ +package module_test + +import ( + "testing" + + "github.com/jwetzell/showbridge-go/internal/config" + "github.com/jwetzell/showbridge-go/internal/module" +) + +func TestNATSServerFromRegistry(t *testing.T) { + registration, ok := module.ModuleRegistry["nats.server"] + if !ok { + t.Fatalf("nats.server module not registered") + } + + moduleInstance, err := registration.New(config.ModuleConfig{ + Id: "test", + Type: "nats.server", + Params: map[string]any{ + "ip": "127.0.0.1", + "port": 4222, + }, + }) + + if err != nil { + t.Fatalf("failed to create nats.server module: %s", err) + } + + if moduleInstance.Id() != "test" { + t.Fatalf("nats.server module has wrong id: %s", moduleInstance.Id()) + } + + if moduleInstance.Type() != "nats.server" { + t.Fatalf("nats.server module has wrong type: %s", moduleInstance.Type()) + } +} + +func TestBadNATSServer(t *testing.T) { + tests := []struct { + name string + params map[string]any + errorString string + }{ + { + name: "non-string ip", + params: map[string]any{ + "ip": 123, + }, + errorString: "nats.server ip must be a string", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + registration, ok := module.ModuleRegistry["nats.server"] + if !ok { + t.Fatalf("nats.server module not registered") + } + + moduleInstance, err := registration.New(config.ModuleConfig{ + Id: "test", + Type: "nats.server", + Params: test.params, + }) + + if err != nil { + if test.errorString != err.Error() { + t.Fatalf("nats.server got error '%s', expected '%s'", err.Error(), test.errorString) + } + return + } + + err = moduleInstance.Start(t.Context()) + + if err == nil { + t.Fatalf("nats.server expected to fail") + } + + if err.Error() != test.errorString { + t.Fatalf("nats.server got error '%s', expected '%s'", err.Error(), test.errorString) + } + }) + } +} diff --git a/schema/modules.schema.json b/schema/modules.schema.json index bc39564..6a2c4d9 100644 --- a/schema/modules.schema.json +++ b/schema/modules.schema.json @@ -207,6 +207,38 @@ "required": ["id", "type", "params"], "additionalProperties": false }, + { + "type": "object", + "title": "NATSServerModule", + "properties": { + "id": { + "type": "string", + "minLength": 1 + }, + "type": { + "const": "nats.server" + }, + "params": { + "type": "object", + "properties": { + "ip": { + "type": "string", + "default": "0.0.0.0" + }, + "port": { + "type": "integer", + "minimum": 1, + "maximum": 65535, + "default": 4222 + } + }, + "required": [], + "additionalProperties": false + } + }, + "required": ["id", "type", "params"], + "additionalProperties": false + }, { "type": "object", "title": "PSNClientModule", From 20fd4170ed8b23aa437ace4022ccd13c860d131e Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Sun, 1 Mar 2026 13:16:13 -0600 Subject: [PATCH 2/2] handle param values that aren't from loading in JSON --- internal/module/nats-server.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/module/nats-server.go b/internal/module/nats-server.go index f417535..04c0f48 100644 --- a/internal/module/nats-server.go +++ b/internal/module/nats-server.go @@ -33,13 +33,16 @@ func init() { port, ok := params["port"] if ok { - - specificportNum, ok := port.(float64) - + specificportNum, ok := port.(int) if !ok { - return nil, errors.New("net.udp.server port must be a number") + specificportNum, ok := port.(float64) + if !ok { + return nil, errors.New("nats.server port must be a number") + } + portNum = int(specificportNum) + } else { + portNum = int(specificportNum) } - portNum = int(specificportNum) } ipString := "0.0.0.0" @@ -50,7 +53,7 @@ func init() { specificIpString, ok := ip.(string) if !ok { - return nil, errors.New("net.udp.server ip must be a string") + return nil, errors.New("nats.server ip must be a string") } ipString = specificIpString }