26 Commits

Author SHA1 Message Date
Joel Wetzell
a7f889f6b5 Merge pull request #8 from jwetzell/feat/udp-multicast
add net.udp.multicast module
2025-12-06 10:36:47 -06:00
Joel Wetzell
5c5111a25e add net.udp.multicast module 2025-12-06 10:35:25 -06:00
Joel Wetzell
b31729fafe switch net.udp.client conn to pointer 2025-12-06 09:02:41 -06:00
Joel Wetzell
dff5430eb4 switch net.tcp.client conn to pointer 2025-12-06 09:02:30 -06:00
Joel Wetzell
eaca0dbf86 Merge pull request #7 from jwetzell/feat/tcp-server-output
support output from net.tcp.server
2025-12-06 08:59:21 -06:00
Joel Wetzell
aa3a1032f3 logging tweaks 2025-12-06 08:39:34 -06:00
Joel Wetzell
51a62f7fb2 cleanup 2025-12-06 08:23:40 -06:00
Joel Wetzell
2c8efcea4b add output to TCP server 2025-12-05 23:18:37 -06:00
Joel Wetzell
df1882b8f7 update psn-go 2025-12-04 23:11:30 -06:00
Joel Wetzell
1c8346cf65 cleanup error messages 2025-12-04 16:35:03 -06:00
Joel Wetzell
ba2fead834 cleanup TCP client connection handling 2025-12-03 23:03:02 -06:00
Joel Wetzell
cb7504922e add github funding file 2025-12-03 18:13:41 -06:00
Joel Wetzell
59d9405781 Merge pull request #6 from jwetzell/feat/tcp-udp-server-ip-binding
add ability to bind to specific IP address for TCP and UDP servers
2025-12-03 00:19:45 -06:00
Joel Wetzell
fbda348b58 add optional ip setting for net.udp.server 2025-12-03 00:18:30 -06:00
Joel Wetzell
c1a98483a4 add optional ip setting for net.tcp.server 2025-12-03 00:18:17 -06:00
Joel Wetzell
cd567e5b97 add simple test for string.encode 2025-12-02 12:59:28 -06:00
Joel Wetzell
d8d53f01d2 Merge pull request #5 from jwetzell/module/nats-client
add NATS client module
2025-12-02 12:59:09 -06:00
Joel Wetzell
f363fbf0a6 add NATS to readme 2025-12-02 12:57:45 -06:00
Joel Wetzell
d629146592 add simple NATS client 2025-12-02 12:57:05 -06:00
Joel Wetzell
b06ced2631 Add initial README with supported protocols 2025-12-02 08:21:49 -06:00
Joel Wetzell
a33fe88757 Merge pull request #4 from jwetzell/dependabot/github_actions/actions/setup-go-6
Bump actions/setup-go from 5 to 6
2025-12-02 08:16:44 -06:00
dependabot[bot]
ce673e31db Bump actions/setup-go from 5 to 6
Bumps [actions/setup-go](https://github.com/actions/setup-go) from 5 to 6.
- [Release notes](https://github.com/actions/setup-go/releases)
- [Commits](https://github.com/actions/setup-go/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/setup-go
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-02 04:56:11 +00:00
Joel Wetzell
38857f7a29 Merge pull request #3 from jwetzell/fix/js-processor-err-handling
fix missing error handling in script.js
2025-12-01 22:13:25 -06:00
Joel Wetzell
45965a4eac fix missing error handling in script.js 2025-12-01 22:12:10 -06:00
Joel Wetzell
b372b53422 Merge pull request #2 from jwetzell/processing/string-create
add string.create processor
2025-12-01 22:10:48 -06:00
Joel Wetzell
97cf721abc add string.create processor 2025-12-01 22:05:53 -06:00
20 changed files with 638 additions and 56 deletions

2
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,2 @@
# These are supported funding model platforms
github: [jwetzell]

View File

@@ -17,7 +17,7 @@ jobs:
with:
fetch-depth: 0
- name: setup go
uses: actions/setup-go@v5
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: release

29
README.md Normal file
View File

@@ -0,0 +1,29 @@
<div align="center">
# showbridge (go edition)
Simple protocol router _/s_
</div>
### Supported Protocols
- HTTP
- client
- server
- UDP
- client
- server
- TCP
- client
- server
- [MQTT](https://mqtt.org/)
- client
- [NATS](https://nats.io/)
- client
- [PosiStageNet](https://posistage.net/)
- client
- MIDI
- client (not included in pre-built binaries yet)

7
go.mod
View File

@@ -7,7 +7,8 @@ require (
github.com/expr-lang/expr v1.17.6
github.com/jwetzell/free-d-go v0.1.0
github.com/jwetzell/osc-go v0.1.0
github.com/jwetzell/psn-go v0.2.1
github.com/jwetzell/psn-go v0.3.0
github.com/nats-io/nats.go v1.47.0
github.com/urfave/cli/v3 v3.6.1
gitlab.com/gomidi/midi/v2 v2.3.16
modernc.org/quickjs v0.17.0
@@ -18,10 +19,14 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect

14
go.sum
View File

@@ -18,10 +18,18 @@ github.com/jwetzell/free-d-go v0.1.0 h1:xHt6dvyit98X+OC3jVzV0aLidxbyzi3vI9QiYkte
github.com/jwetzell/free-d-go v0.1.0/go.mod h1:KmrkooRARRaxJTBSPvwt/6IMAIaHH1R8bSA8cwbbELw=
github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I=
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU=
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
github.com/jwetzell/psn-go v0.3.0 h1:WVpCEmExYE8a+I5hQak5jNJJp2x35VdGX/VuMUKPmhY=
github.com/jwetzell/psn-go v0.3.0/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -38,6 +46,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=

View File

@@ -0,0 +1,83 @@
package processing
import (
"bytes"
"context"
"fmt"
"text/template"
)
type NATSMessage struct {
Subject string
Payload []byte
}
type NATSMessageCreate struct {
config ProcessorConfig
Subject string
Payload *template.Template
}
func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) {
var payloadBuffer bytes.Buffer
err := nmc.Payload.Execute(&payloadBuffer, payload)
if err != nil {
return nil, err
}
payloadString := payloadBuffer.String()
message := NATSMessage{
Subject: nmc.Subject,
Payload: []byte(payloadString),
}
return message, nil
}
func (nmc *NATSMessageCreate) Type() string {
return nmc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.create",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
// TODO(jwetzell): support template for subject
subject, ok := params["subject"]
if !ok {
return nil, fmt.Errorf("nats.message.create requires a subject parameter")
}
subjectString, ok := subject.(string)
if !ok {
return nil, fmt.Errorf("nats.message.create subject must be a string")
}
payload, ok := params["payload"]
if !ok {
return nil, fmt.Errorf("osc.message.create requires a payload parameter")
}
payloadString, ok := payload.(string)
if !ok {
return nil, fmt.Errorf("osc.message.create payload must be a string")
}
payloadTemplate, err := template.New("payload").Parse(payloadString)
if err != nil {
return nil, err
}
return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil
},
})
}

View File

@@ -0,0 +1,35 @@
package processing
import (
"context"
"fmt"
"github.com/nats-io/nats.go"
)
type NATSMessageEncode struct {
config ProcessorConfig
}
func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) {
payloadMessage, ok := payload.(*nats.Msg)
if !ok {
return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg")
}
return payloadMessage.Data, nil
}
func (nme *NATSMessageEncode) Type() string {
return nme.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "nats.message.encode",
New: func(config ProcessorConfig) (Processor, error) {
return &NATSMessageEncode{config: config}, nil
},
})
}

View File

@@ -31,6 +31,10 @@ func (sj *ScriptJS) Process(ctx context.Context, payload any) (any, error) {
_, err = vm.Eval(sj.Program, quickjs.EvalGlobal)
if err != nil {
return nil, err
}
output, err := vm.GetProperty(vm.GlobalObject(), payloadAtom)
if err != nil {

View File

@@ -0,0 +1,57 @@
package processing
import (
"bytes"
"context"
"fmt"
"text/template"
)
type StringCreate struct {
config ProcessorConfig
Template *template.Template
}
func (sc *StringCreate) Process(ctx context.Context, payload any) (any, error) {
var templateBuffer bytes.Buffer
err := sc.Template.Execute(&templateBuffer, payload)
if err != nil {
return nil, err
}
payloadString := templateBuffer.String()
return payloadString, nil
}
func (sc *StringCreate) Type() string {
return sc.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "string.create",
New: func(config ProcessorConfig) (Processor, error) {
params := config.Params
tmpl, ok := params["template"]
if !ok {
return nil, fmt.Errorf("string.create requires a template parameter")
}
templateString, ok := tmpl.(string)
if !ok {
return nil, fmt.Errorf("string.create template must be a string")
}
templateTemplate, err := template.New("template").Parse(templateString)
if err != nil {
return nil, err
}
return &StringCreate{config: config, Template: templateTemplate}, nil
},
})
}

View File

@@ -0,0 +1,42 @@
package processing_test
import (
"slices"
"testing"
"github.com/jwetzell/showbridge-go/internal/processing"
)
func TestGoodStringEncode(t *testing.T) {
stringEncoder := processing.StringEncode{}
tests := []struct {
processor processing.Processor
name string
payload any
expected []byte
}{
{
processor: &stringEncoder,
name: "hello",
payload: "hello",
expected: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.processor.Process(t.Context(), test.payload)
gotBytes, ok := got.([]byte)
if !ok {
t.Errorf("string.encode returned a %T payload: %s", got, got)
}
if err != nil {
t.Errorf("string.encode failed: %s", err)
}
if !slices.Equal(gotBytes, test.expected) {
t.Errorf("string.encode got %s, expected %s", got, test.expected)
}
})
}
}

View File

@@ -21,13 +21,13 @@ func init() {
duration, ok := params["duration"]
if !ok {
return nil, fmt.Errorf("interval requires a duration parameter")
return nil, fmt.Errorf("gen.interval requires a duration parameter")
}
durationNum, ok := duration.(float64)
if !ok {
return nil, fmt.Errorf("interval duration must be number")
return nil, fmt.Errorf("gen.interval duration must be number")
}
return &Interval{Duration: uint32(durationNum), config: config}, nil
@@ -51,14 +51,15 @@ func (i *Interval) Run() error {
ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
i.ticker = ticker
defer ticker.Stop()
for {
select {
case <-i.router.Context.Done():
slog.Debug("router context done in module", "id", i.config.Id)
return nil
case t := <-ticker.C:
case <-ticker.C:
if i.router != nil {
i.router.HandleInput(i.config.Id, t)
i.router.HandleInput(i.config.Id, time.Now())
}
}
}

View File

@@ -27,7 +27,7 @@ func init() {
input, ok := params["input"]
if !ok {
return nil, fmt.Errorf("net.mqtt.client requires a input parameter")
return nil, fmt.Errorf("misc.midi.client requires a input parameter")
}
inputString, ok := input.(string)
@@ -39,7 +39,7 @@ func init() {
output, ok := params["output"]
if !ok {
return nil, fmt.Errorf("net.mqtt.client requires a output parameter")
return nil, fmt.Errorf("misc.midi.client requires a output parameter")
}
outputString, ok := output.(string)

View File

@@ -31,7 +31,7 @@ func init() {
brokerString, ok := broker.(string)
if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string")
return nil, fmt.Errorf("net.mqtt.client broker must be string")
}
topic, ok := params["topic"]
@@ -43,7 +43,7 @@ func init() {
topicString, ok := topic.(string)
if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string")
return nil, fmt.Errorf("net.mqtt.client topic must be string")
}
clientId, ok := params["clientId"]
@@ -55,7 +55,7 @@ func init() {
clientIdString, ok := clientId.(string)
if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string")
return nil, fmt.Errorf("net.mqtt.client clientId must be string")
}
return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString}, nil
@@ -108,7 +108,7 @@ func (mc *MQTTClient) Output(payload any) error {
payloadMessage, ok := payload.(processing.MQTTMessage)
if !ok {
return fmt.Errorf("net.mqtt.client is only able to output MQTTMessage")
return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage")
}
if mc.client == nil {

113
nats-client.go Normal file
View File

@@ -0,0 +1,113 @@
package showbridge
import (
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/processing"
"github.com/nats-io/nats.go"
)
type NATSClient struct {
config ModuleConfig
router *Router
URL string
Subject string
client *nats.Conn
}
func init() {
RegisterModule(ModuleRegistration{
Type: "net.nats.client",
New: func(config ModuleConfig) (Module, error) {
params := config.Params
url, ok := params["url"]
if !ok {
return nil, fmt.Errorf("net.nats.client requires a url parameter")
}
urlString, ok := url.(string)
if !ok {
return nil, fmt.Errorf("net.nats.client url must be string")
}
subject, ok := params["subject"]
if !ok {
return nil, fmt.Errorf("net.nats.client requires a subject parameter")
}
subjectString, ok := subject.(string)
if !ok {
return nil, fmt.Errorf("net.nats.client subject must be string")
}
return &NATSClient{config: config, URL: urlString, Subject: subjectString}, nil
},
})
}
func (nc *NATSClient) Id() string {
return nc.config.Id
}
func (nc *NATSClient) Type() string {
return nc.config.Type
}
func (nc *NATSClient) RegisterRouter(router *Router) {
nc.router = router
}
func (nc *NATSClient) Run() error {
client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
if err != nil {
return err
}
nc.client = client
defer client.Drain()
defer client.Close()
sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) {
if nc.router != nil {
nc.router.HandleInput(nc.config.Id, msg)
}
})
if err != nil {
return err
}
defer sub.Unsubscribe()
<-nc.router.Context.Done()
slog.Debug("router context done in module", "id", nc.config.Id)
return nil
}
func (nc *NATSClient) Output(payload any) error {
payloadMessage, ok := payload.(processing.NATSMessage)
if !ok {
return fmt.Errorf("net.nats.client is only able to output NATSMessage")
}
if nc.client == nil {
return fmt.Errorf("net.nats.client client is not setup")
}
if !nc.client.IsConnected() {
return fmt.Errorf("net.nats.client is not connected")
}
err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload)
return err
}

View File

@@ -11,11 +11,10 @@ import (
type TCPClient struct {
config ModuleConfig
Host string
Port uint16
framer framing.Framer
conn net.Conn
conn *net.TCPConn
router *Router
Addr *net.TCPAddr
}
func init() {
@@ -32,7 +31,7 @@ func init() {
hostString, ok := host.(string)
if !ok {
return nil, fmt.Errorf("net.tcp.client host must be uint16")
return nil, fmt.Errorf("net.tcp.client host must be string")
}
port, ok := params["port"]
@@ -43,7 +42,12 @@ func init() {
portNum, ok := port.(float64)
if !ok {
return nil, fmt.Errorf("net.tcp.client port must be uint16")
return nil, fmt.Errorf("net.tcp.client port must be a number")
}
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", hostString, uint16(portNum)))
if err != nil {
return nil, err
}
framingMethod, ok := params["framing"]
@@ -54,7 +58,7 @@ func init() {
framingMethodString, ok := framingMethod.(string)
if !ok {
return nil, fmt.Errorf("tcp framing method must be a string")
return nil, fmt.Errorf("net.tcp.client framing method must be a string")
}
var framer framing.Framer
@@ -72,7 +76,7 @@ func init() {
return nil, fmt.Errorf("unknown framing method: %s", framingMethodString)
}
return &TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil
return &TCPClient{framer: framer, Addr: addr, config: config}, nil
},
})
}
@@ -90,10 +94,6 @@ func (tc *TCPClient) RegisterRouter(router *Router) {
}
func (tc *TCPClient) Run() error {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", tc.Host, tc.Port))
if err != nil {
return err
}
// TODO(jwetzell): shutdown with router.Context properly
go func() {
@@ -105,7 +105,7 @@ func (tc *TCPClient) Run() error {
}()
for {
client, err := net.DialTCP("tcp", nil, addr)
err := tc.SetupConn()
if err != nil {
if tc.router.Context.Err() != nil {
slog.Debug("router context done in module", "id", tc.config.Id)
@@ -116,8 +116,6 @@ func (tc *TCPClient) Run() error {
continue
}
tc.conn = client
buffer := make([]byte, 1024)
select {
case <-tc.router.Context.Done():
@@ -131,7 +129,7 @@ func (tc *TCPClient) Run() error {
slog.Debug("router context done in module", "id", tc.config.Id)
return nil
default:
byteCount, err := client.Read(buffer)
byteCount, err := tc.conn.Read(buffer)
if err != nil {
tc.framer.Clear()
@@ -151,22 +149,29 @@ func (tc *TCPClient) Run() error {
}
}
}
}
}
}
}
func (tc *TCPClient) SetupConn() error {
client, err := net.DialTCP("tcp", nil, tc.Addr)
tc.conn = client
return err
}
func (tc *TCPClient) Output(payload any) error {
if tc.conn != nil {
payloadBytes, ok := payload.([]byte)
if !ok {
return fmt.Errorf("net.tcp.client is only able to output bytes")
// NOTE(jwetzell): not sure how this would occur but
if tc.conn == nil {
err := tc.SetupConn()
if err != nil {
return err
}
_, err := tc.conn.Write(tc.framer.Encode(payloadBytes))
return err
}
return nil
payloadBytes, ok := payload.([]byte)
if !ok {
return fmt.Errorf("net.tcp.client is only able to output bytes")
}
_, err := tc.conn.Write(tc.framer.Encode(payloadBytes))
return err
}

View File

@@ -1,10 +1,13 @@
package showbridge
import (
"errors"
"fmt"
"log/slog"
"net"
"slices"
"sync"
"syscall"
"time"
"github.com/jwetzell/showbridge-go/internal/framing"
@@ -12,11 +15,14 @@ import (
type TCPServer struct {
config ModuleConfig
Ip string
Port uint16
framingMethod string
router *Router
quit chan interface{}
wg sync.WaitGroup
connections []net.Conn
connectionsMu sync.RWMutex
}
func init() {
@@ -32,7 +38,7 @@ func init() {
portNum, ok := port.(float64)
if !ok {
return nil, fmt.Errorf("net.tcp.server port must be uint16")
return nil, fmt.Errorf("net.tcp.server port must be a number")
}
framingMethod, ok := params["framing"]
@@ -43,10 +49,23 @@ func init() {
framingMethodString, ok := framingMethod.(string)
if !ok {
return nil, fmt.Errorf("tcp framing method must be a string")
return nil, fmt.Errorf("net.tcp.server framing method must be a string")
}
return &TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config, quit: make(chan interface{})}, nil
ipString := "0.0.0.0"
ip, ok := params["ip"]
if ok {
specificIpString, ok := ip.(string)
if !ok {
return nil, fmt.Errorf("net.tcp.server ip must be a string")
}
ipString = specificIpString
}
return &TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), Ip: ipString, config: config, quit: make(chan interface{})}, nil
},
})
}
@@ -64,7 +83,10 @@ func (ts *TCPServer) RegisterRouter(router *Router) {
}
func (ts *TCPServer) handleClient(client net.Conn) {
slog.Debug("connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Lock()
ts.connections = append(ts.connections, client)
ts.connectionsMu.Unlock()
slog.Debug("net.tcp.server connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
defer client.Close()
var framer framing.Framer
@@ -90,12 +112,34 @@ ClientRead:
byteCount, err := client.Read(buffer)
if err != nil {
//NOTE(jwetzell) we hit deadline
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
continue ClientRead
if opErr, ok := err.(*net.OpError); ok {
//NOTE(jwetzell) we hit deadline
if opErr.Timeout() {
continue ClientRead
}
if errors.Is(opErr, syscall.ECONNRESET) {
ts.connectionsMu.Lock()
for i := 0; i < len(ts.connections); i++ {
if ts.connections[i] == client {
ts.connections = slices.Delete(ts.connections, i, i+1)
break
}
}
slog.Debug("net.tcp.server connection reset", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Unlock()
}
}
if err.Error() == "EOF" {
slog.Debug("connection closed", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Lock()
for i := 0; i < len(ts.connections); i++ {
if ts.connections[i] == client {
ts.connections = slices.Delete(ts.connections, i, i+1)
break
}
}
slog.Debug("net.tcp.server stream ended", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Unlock()
}
return
}
@@ -106,7 +150,7 @@ ClientRead:
if ts.router != nil {
ts.router.HandleInput(ts.config.Id, message)
} else {
slog.Error("tcp-server has no router", "id", ts.config.Id)
slog.Error("net.tcp.server has no router", "id", ts.config.Id)
}
}
}
@@ -116,7 +160,8 @@ ClientRead:
}
func (ts *TCPServer) Run() error {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", ts.Port))
// TODO(jwetzell): switch to net.ListenTCP and move addr resolution to init
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ts.Ip, ts.Port))
if err != nil {
return err
}
@@ -153,5 +198,24 @@ AcceptLoop:
}
func (ts *TCPServer) Output(payload any) error {
return fmt.Errorf("net.tcp.server output is not implemented")
payloadBytes, ok := payload.([]byte)
if !ok {
return fmt.Errorf("net.tcp.server is only able to output bytes")
}
ts.connectionsMu.Lock()
errorString := ""
for _, connection := range ts.connections {
_, err := connection.Write(payloadBytes)
if err != nil {
errorString += fmt.Sprintf("%s\n", err.Error())
}
}
ts.connectionsMu.Unlock()
if errorString == "" {
return nil
}
return fmt.Errorf("%s", errorString)
}

View File

@@ -21,13 +21,13 @@ func init() {
duration, ok := params["duration"]
if !ok {
return nil, fmt.Errorf("timer requires a duration parameter")
return nil, fmt.Errorf("gen.timer requires a duration parameter")
}
durationNum, ok := duration.(float64)
if !ok {
return nil, fmt.Errorf("timer duration must be number")
return nil, fmt.Errorf("gen.timer duration must be a number")
}
return &Timer{Duration: uint32(durationNum), config: config}, nil

View File

@@ -10,7 +10,7 @@ type UDPClient struct {
config ModuleConfig
Host string
Port uint16
conn net.Conn
conn *net.UDPConn
router *Router
addr *net.UDPAddr
}
@@ -29,7 +29,7 @@ func init() {
hostString, ok := host.(string)
if !ok {
return nil, fmt.Errorf("net.udp.client host must be uint16")
return nil, fmt.Errorf("net.udp.client host must be a string")
}
port, ok := params["port"]
@@ -71,6 +71,9 @@ func (uc *UDPClient) Run() error {
<-uc.router.Context.Done()
slog.Debug("router context done in module", "id", uc.config.Id)
if uc.conn != nil {
uc.conn.Close()
}
return nil
}
@@ -81,6 +84,7 @@ func (uc *UDPClient) Output(payload any) error {
return fmt.Errorf("net.udp.client is only able to output bytes")
}
// TODO(jwetzell): reuse connection or setup new one when necessary
client, err := net.DialUDP("udp", nil, uc.addr)
if err != nil {
return err

113
udp-multicast.go Normal file
View File

@@ -0,0 +1,113 @@
package showbridge
import (
"fmt"
"log/slog"
"net"
"time"
)
type UDPMulticast struct {
config ModuleConfig
conn *net.UDPConn
router *Router
Addr *net.UDPAddr
}
func init() {
RegisterModule(ModuleRegistration{
Type: "net.udp.multicast",
New: func(config ModuleConfig) (Module, error) {
params := config.Params
ip, ok := params["ip"]
if !ok {
return nil, fmt.Errorf("net.udp.client requires am ip parameter")
}
ipString, ok := ip.(string)
if !ok {
return nil, fmt.Errorf("net.udp.client ip must be a string")
}
port, ok := params["port"]
if !ok {
return nil, fmt.Errorf("net.udp.client requires a port parameter")
}
portNum, ok := port.(float64)
if !ok {
return nil, fmt.Errorf("net.udp.client port must be a number")
}
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
if err != nil {
return nil, err
}
return &UDPMulticast{config: config, Addr: addr}, nil
},
})
}
func (um *UDPMulticast) Id() string {
return um.config.Id
}
func (um *UDPMulticast) Type() string {
return um.config.Type
}
func (um *UDPMulticast) RegisterRouter(router *Router) {
um.router = router
}
func (um *UDPMulticast) Run() error {
client, err := net.ListenMulticastUDP("udp", nil, um.Addr)
if err != nil {
return err
}
defer client.Close()
um.conn = client
buffer := make([]byte, 2048)
for {
select {
case <-um.router.Context.Done():
// TODO(jwetzell): cleanup?
slog.Debug("router context done in module", "id", um.config.Id)
return nil
default:
um.conn.SetDeadline(time.Now().Add(time.Millisecond * 200))
numBytes, _, err := um.conn.ReadFromUDP(buffer)
if err != nil {
//NOTE(jwetzell) we hit deadline
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
continue
}
return err
}
if numBytes > 0 {
message := buffer[:numBytes]
if err != nil {
slog.Error("net.udp.multicast problem decoding psn traffic", "id", um.config.Id, "error", err)
}
if um.router != nil {
um.router.HandleInput(um.config.Id, message)
} else {
slog.Error("net.udp.multicast has no router", "id", um.config.Id)
}
}
}
}
}
func (um *UDPMulticast) Output(payload any) error {
return fmt.Errorf("net.udp.multicast output is not implemented")
}

View File

@@ -9,6 +9,7 @@ import (
)
type UDPServer struct {
Ip string
Port uint16
config ModuleConfig
router *Router
@@ -27,10 +28,23 @@ func init() {
portNum, ok := port.(float64)
if !ok {
return nil, fmt.Errorf("net.udp.server port must be uint16")
return nil, fmt.Errorf("net.udp.server port must be a number")
}
return &UDPServer{Port: uint16(portNum), config: config}, nil
ipString := "0.0.0.0"
ip, ok := params["ip"]
if ok {
specificIpString, ok := ip.(string)
if !ok {
return nil, fmt.Errorf("net.udp.server ip must be a string")
}
ipString = specificIpString
}
return &UDPServer{Ip: ipString, Port: uint16(portNum), config: config}, nil
},
})
}
@@ -49,7 +63,8 @@ func (us *UDPServer) RegisterRouter(router *Router) {
func (us *UDPServer) Run() error {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", us.Port))
// TODO(jwetzell): move this to init
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", us.Ip, us.Port))
if err != nil {
log.Fatalf("error resolving UDP address: %v", err)
}