mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 21:05:30 +00:00
Merge pull request #71 from jwetzell/feat/nats-server-module
add nats.server module
This commit is contained in:
125
internal/module/nats-server.go
Normal file
125
internal/module/nats-server.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package module
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/jwetzell/showbridge-go/internal/config"
|
||||
"github.com/jwetzell/showbridge-go/internal/route"
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
type NATSServer struct {
|
||||
config config.ModuleConfig
|
||||
ctx context.Context
|
||||
Ip string
|
||||
Port int
|
||||
router route.RouteIO
|
||||
server *server.Server
|
||||
logger *slog.Logger
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func init() {
|
||||
RegisterModule(ModuleRegistration{
|
||||
Type: "nats.server",
|
||||
New: func(config config.ModuleConfig) (Module, error) {
|
||||
params := config.Params
|
||||
portNum := 4222
|
||||
|
||||
port, ok := params["port"]
|
||||
if ok {
|
||||
specificportNum, ok := port.(int)
|
||||
if !ok {
|
||||
specificportNum, ok := port.(float64)
|
||||
if !ok {
|
||||
return nil, errors.New("nats.server port must be a number")
|
||||
}
|
||||
portNum = int(specificportNum)
|
||||
} else {
|
||||
portNum = int(specificportNum)
|
||||
}
|
||||
}
|
||||
|
||||
ipString := "0.0.0.0"
|
||||
|
||||
ip, ok := params["ip"]
|
||||
if ok {
|
||||
|
||||
specificIpString, ok := ip.(string)
|
||||
|
||||
if !ok {
|
||||
return nil, errors.New("nats.server ip must be a string")
|
||||
}
|
||||
ipString = specificIpString
|
||||
}
|
||||
|
||||
_, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &NATSServer{config: config, logger: CreateLogger(config), Ip: ipString, Port: portNum}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (ns *NATSServer) Id() string {
|
||||
return ns.config.Id
|
||||
}
|
||||
|
||||
func (ns *NATSServer) Type() string {
|
||||
return ns.config.Type
|
||||
}
|
||||
|
||||
func (ns *NATSServer) Start(ctx context.Context) error {
|
||||
ns.logger.Debug("running")
|
||||
router, ok := ctx.Value(route.RouterContextKey).(route.RouteIO)
|
||||
|
||||
if !ok {
|
||||
return errors.New("nats.server unable to get router from context")
|
||||
}
|
||||
|
||||
ns.router = router
|
||||
moduleContext, cancel := context.WithCancel(ctx)
|
||||
ns.ctx = moduleContext
|
||||
ns.cancel = cancel
|
||||
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: ns.Ip,
|
||||
Port: ns.Port,
|
||||
NoLog: true,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ns.server = natsServer
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
if !natsServer.ReadyForConnections(5 * time.Second) {
|
||||
return errors.New("nats.server failed to start")
|
||||
}
|
||||
ns.logger.Info("NATS server started", "client_url", natsServer.ClientURL())
|
||||
|
||||
<-ns.ctx.Done()
|
||||
|
||||
ns.logger.Debug("done")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ns *NATSServer) Output(ctx context.Context, payload any) error {
|
||||
return errors.ErrUnsupported
|
||||
}
|
||||
|
||||
func (ns *NATSServer) Stop() {
|
||||
ns.cancel()
|
||||
if ns.server != nil {
|
||||
ns.server.Shutdown()
|
||||
}
|
||||
}
|
||||
85
internal/module/test/nats-server_test.go
Normal file
85
internal/module/test/nats-server_test.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package module_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/jwetzell/showbridge-go/internal/config"
|
||||
"github.com/jwetzell/showbridge-go/internal/module"
|
||||
)
|
||||
|
||||
func TestNATSServerFromRegistry(t *testing.T) {
|
||||
registration, ok := module.ModuleRegistry["nats.server"]
|
||||
if !ok {
|
||||
t.Fatalf("nats.server module not registered")
|
||||
}
|
||||
|
||||
moduleInstance, err := registration.New(config.ModuleConfig{
|
||||
Id: "test",
|
||||
Type: "nats.server",
|
||||
Params: map[string]any{
|
||||
"ip": "127.0.0.1",
|
||||
"port": 4222,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create nats.server module: %s", err)
|
||||
}
|
||||
|
||||
if moduleInstance.Id() != "test" {
|
||||
t.Fatalf("nats.server module has wrong id: %s", moduleInstance.Id())
|
||||
}
|
||||
|
||||
if moduleInstance.Type() != "nats.server" {
|
||||
t.Fatalf("nats.server module has wrong type: %s", moduleInstance.Type())
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadNATSServer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
params map[string]any
|
||||
errorString string
|
||||
}{
|
||||
{
|
||||
name: "non-string ip",
|
||||
params: map[string]any{
|
||||
"ip": 123,
|
||||
},
|
||||
errorString: "nats.server ip must be a string",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
|
||||
registration, ok := module.ModuleRegistry["nats.server"]
|
||||
if !ok {
|
||||
t.Fatalf("nats.server module not registered")
|
||||
}
|
||||
|
||||
moduleInstance, err := registration.New(config.ModuleConfig{
|
||||
Id: "test",
|
||||
Type: "nats.server",
|
||||
Params: test.params,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if test.errorString != err.Error() {
|
||||
t.Fatalf("nats.server got error '%s', expected '%s'", err.Error(), test.errorString)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = moduleInstance.Start(t.Context())
|
||||
|
||||
if err == nil {
|
||||
t.Fatalf("nats.server expected to fail")
|
||||
}
|
||||
|
||||
if err.Error() != test.errorString {
|
||||
t.Fatalf("nats.server got error '%s', expected '%s'", err.Error(), test.errorString)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user