mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-27 13:25:40 +00:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a7f889f6b5 | ||
|
|
5c5111a25e | ||
|
|
b31729fafe | ||
|
|
dff5430eb4 | ||
|
|
eaca0dbf86 | ||
|
|
aa3a1032f3 | ||
|
|
51a62f7fb2 | ||
|
|
2c8efcea4b | ||
|
|
df1882b8f7 | ||
|
|
1c8346cf65 | ||
|
|
ba2fead834 | ||
|
|
cb7504922e | ||
|
|
59d9405781 | ||
|
|
fbda348b58 | ||
|
|
c1a98483a4 | ||
|
|
cd567e5b97 | ||
|
|
d8d53f01d2 | ||
|
|
f363fbf0a6 | ||
|
|
d629146592 | ||
|
|
b06ced2631 | ||
|
|
a33fe88757 | ||
|
|
ce673e31db |
2
.github/FUNDING.yml
vendored
Normal file
2
.github/FUNDING.yml
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
# These are supported funding model platforms
|
||||||
|
github: [jwetzell]
|
||||||
2
.github/workflows/release-showbridge.yaml
vendored
2
.github/workflows/release-showbridge.yaml
vendored
@@ -17,7 +17,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
- name: setup go
|
- name: setup go
|
||||||
uses: actions/setup-go@v5
|
uses: actions/setup-go@v6
|
||||||
with:
|
with:
|
||||||
go-version-file: 'go.mod'
|
go-version-file: 'go.mod'
|
||||||
- name: release
|
- name: release
|
||||||
|
|||||||
29
README.md
Normal file
29
README.md
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
<div align="center">
|
||||||
|
|
||||||
|
# showbridge (go edition)
|
||||||
|
|
||||||
|
Simple protocol router _/s_
|
||||||
|
|
||||||
|
</div>
|
||||||
|
|
||||||
|
### Supported Protocols
|
||||||
|
- HTTP
|
||||||
|
- client
|
||||||
|
- server
|
||||||
|
- UDP
|
||||||
|
- client
|
||||||
|
- server
|
||||||
|
- TCP
|
||||||
|
- client
|
||||||
|
- server
|
||||||
|
- [MQTT](https://mqtt.org/)
|
||||||
|
- client
|
||||||
|
- [NATS](https://nats.io/)
|
||||||
|
- client
|
||||||
|
- [PosiStageNet](https://posistage.net/)
|
||||||
|
- client
|
||||||
|
- MIDI
|
||||||
|
- client (not included in pre-built binaries yet)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
7
go.mod
7
go.mod
@@ -7,7 +7,8 @@ require (
|
|||||||
github.com/expr-lang/expr v1.17.6
|
github.com/expr-lang/expr v1.17.6
|
||||||
github.com/jwetzell/free-d-go v0.1.0
|
github.com/jwetzell/free-d-go v0.1.0
|
||||||
github.com/jwetzell/osc-go v0.1.0
|
github.com/jwetzell/osc-go v0.1.0
|
||||||
github.com/jwetzell/psn-go v0.2.1
|
github.com/jwetzell/psn-go v0.3.0
|
||||||
|
github.com/nats-io/nats.go v1.47.0
|
||||||
github.com/urfave/cli/v3 v3.6.1
|
github.com/urfave/cli/v3 v3.6.1
|
||||||
gitlab.com/gomidi/midi/v2 v2.3.16
|
gitlab.com/gomidi/midi/v2 v2.3.16
|
||||||
modernc.org/quickjs v0.17.0
|
modernc.org/quickjs v0.17.0
|
||||||
@@ -18,10 +19,14 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/gorilla/websocket v1.5.3 // indirect
|
github.com/gorilla/websocket v1.5.3 // indirect
|
||||||
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||||
|
golang.org/x/crypto v0.42.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
||||||
golang.org/x/net v0.44.0 // indirect
|
golang.org/x/net v0.44.0 // indirect
|
||||||
golang.org/x/sync v0.17.0 // indirect
|
golang.org/x/sync v0.17.0 // indirect
|
||||||
|
|||||||
14
go.sum
14
go.sum
@@ -18,10 +18,18 @@ github.com/jwetzell/free-d-go v0.1.0 h1:xHt6dvyit98X+OC3jVzV0aLidxbyzi3vI9QiYkte
|
|||||||
github.com/jwetzell/free-d-go v0.1.0/go.mod h1:KmrkooRARRaxJTBSPvwt/6IMAIaHH1R8bSA8cwbbELw=
|
github.com/jwetzell/free-d-go v0.1.0/go.mod h1:KmrkooRARRaxJTBSPvwt/6IMAIaHH1R8bSA8cwbbELw=
|
||||||
github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I=
|
github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I=
|
||||||
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
|
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
|
||||||
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU=
|
github.com/jwetzell/psn-go v0.3.0 h1:WVpCEmExYE8a+I5hQak5jNJJp2x35VdGX/VuMUKPmhY=
|
||||||
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
|
github.com/jwetzell/psn-go v0.3.0/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
|
||||||
|
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||||
|
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
|
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
|
||||||
|
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||||
|
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||||
|
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
@@ -38,6 +46,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
|||||||
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||||
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
|
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
|
||||||
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
|
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
|
||||||
|
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
|
||||||
|
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
||||||
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
|
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
|
||||||
|
|||||||
83
internal/processing/nats-message-create.go
Normal file
83
internal/processing/nats-message-create.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"text/template"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSMessage struct {
|
||||||
|
Subject string
|
||||||
|
Payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type NATSMessageCreate struct {
|
||||||
|
config ProcessorConfig
|
||||||
|
Subject string
|
||||||
|
Payload *template.Template
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nmc *NATSMessageCreate) Process(ctx context.Context, payload any) (any, error) {
|
||||||
|
|
||||||
|
var payloadBuffer bytes.Buffer
|
||||||
|
err := nmc.Payload.Execute(&payloadBuffer, payload)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadString := payloadBuffer.String()
|
||||||
|
|
||||||
|
message := NATSMessage{
|
||||||
|
Subject: nmc.Subject,
|
||||||
|
Payload: []byte(payloadString),
|
||||||
|
}
|
||||||
|
|
||||||
|
return message, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nmc *NATSMessageCreate) Type() string {
|
||||||
|
return nmc.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterProcessor(ProcessorRegistration{
|
||||||
|
Type: "nats.message.create",
|
||||||
|
New: func(config ProcessorConfig) (Processor, error) {
|
||||||
|
params := config.Params
|
||||||
|
// TODO(jwetzell): support template for subject
|
||||||
|
subject, ok := params["subject"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.create requires a subject parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
subjectString, ok := subject.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.create subject must be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, ok := params["payload"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("osc.message.create requires a payload parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadString, ok := payload.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("osc.message.create payload must be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadTemplate, err := template.New("payload").Parse(payloadString)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &NATSMessageCreate{config: config, Subject: subjectString, Payload: payloadTemplate}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
35
internal/processing/nats-message-encode.go
Normal file
35
internal/processing/nats-message-encode.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSMessageEncode struct {
|
||||||
|
config ProcessorConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nme *NATSMessageEncode) Process(ctx context.Context, payload any) (any, error) {
|
||||||
|
payloadMessage, ok := payload.(*nats.Msg)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("nats.message.encode processor only accepts an nats.Msg")
|
||||||
|
}
|
||||||
|
|
||||||
|
return payloadMessage.Data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nme *NATSMessageEncode) Type() string {
|
||||||
|
return nme.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterProcessor(ProcessorRegistration{
|
||||||
|
Type: "nats.message.encode",
|
||||||
|
New: func(config ProcessorConfig) (Processor, error) {
|
||||||
|
return &NATSMessageEncode{config: config}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
42
internal/processing/string-encode_test.go
Normal file
42
internal/processing/string-encode_test.go
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package processing_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"slices"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/processing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGoodStringEncode(t *testing.T) {
|
||||||
|
stringEncoder := processing.StringEncode{}
|
||||||
|
tests := []struct {
|
||||||
|
processor processing.Processor
|
||||||
|
name string
|
||||||
|
payload any
|
||||||
|
expected []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
processor: &stringEncoder,
|
||||||
|
name: "hello",
|
||||||
|
payload: "hello",
|
||||||
|
expected: []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
got, err := test.processor.Process(t.Context(), test.payload)
|
||||||
|
|
||||||
|
gotBytes, ok := got.([]byte)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("string.encode returned a %T payload: %s", got, got)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("string.encode failed: %s", err)
|
||||||
|
}
|
||||||
|
if !slices.Equal(gotBytes, test.expected) {
|
||||||
|
t.Errorf("string.encode got %s, expected %s", got, test.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -21,13 +21,13 @@ func init() {
|
|||||||
|
|
||||||
duration, ok := params["duration"]
|
duration, ok := params["duration"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("interval requires a duration parameter")
|
return nil, fmt.Errorf("gen.interval requires a duration parameter")
|
||||||
}
|
}
|
||||||
|
|
||||||
durationNum, ok := duration.(float64)
|
durationNum, ok := duration.(float64)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("interval duration must be number")
|
return nil, fmt.Errorf("gen.interval duration must be number")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Interval{Duration: uint32(durationNum), config: config}, nil
|
return &Interval{Duration: uint32(durationNum), config: config}, nil
|
||||||
@@ -51,14 +51,15 @@ func (i *Interval) Run() error {
|
|||||||
ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
|
ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
|
||||||
i.ticker = ticker
|
i.ticker = ticker
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-i.router.Context.Done():
|
case <-i.router.Context.Done():
|
||||||
slog.Debug("router context done in module", "id", i.config.Id)
|
slog.Debug("router context done in module", "id", i.config.Id)
|
||||||
return nil
|
return nil
|
||||||
case t := <-ticker.C:
|
case <-ticker.C:
|
||||||
if i.router != nil {
|
if i.router != nil {
|
||||||
i.router.HandleInput(i.config.Id, t)
|
i.router.HandleInput(i.config.Id, time.Now())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func init() {
|
|||||||
input, ok := params["input"]
|
input, ok := params["input"]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.mqtt.client requires a input parameter")
|
return nil, fmt.Errorf("misc.midi.client requires a input parameter")
|
||||||
}
|
}
|
||||||
|
|
||||||
inputString, ok := input.(string)
|
inputString, ok := input.(string)
|
||||||
@@ -39,7 +39,7 @@ func init() {
|
|||||||
output, ok := params["output"]
|
output, ok := params["output"]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.mqtt.client requires a output parameter")
|
return nil, fmt.Errorf("misc.midi.client requires a output parameter")
|
||||||
}
|
}
|
||||||
|
|
||||||
outputString, ok := output.(string)
|
outputString, ok := output.(string)
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func init() {
|
|||||||
brokerString, ok := broker.(string)
|
brokerString, ok := broker.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.mqtt.client host must be string")
|
return nil, fmt.Errorf("net.mqtt.client broker must be string")
|
||||||
}
|
}
|
||||||
|
|
||||||
topic, ok := params["topic"]
|
topic, ok := params["topic"]
|
||||||
@@ -43,7 +43,7 @@ func init() {
|
|||||||
topicString, ok := topic.(string)
|
topicString, ok := topic.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.mqtt.client host must be string")
|
return nil, fmt.Errorf("net.mqtt.client topic must be string")
|
||||||
}
|
}
|
||||||
|
|
||||||
clientId, ok := params["clientId"]
|
clientId, ok := params["clientId"]
|
||||||
@@ -55,7 +55,7 @@ func init() {
|
|||||||
clientIdString, ok := clientId.(string)
|
clientIdString, ok := clientId.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.mqtt.client host must be string")
|
return nil, fmt.Errorf("net.mqtt.client clientId must be string")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString}, nil
|
return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString}, nil
|
||||||
@@ -108,7 +108,7 @@ func (mc *MQTTClient) Output(payload any) error {
|
|||||||
payloadMessage, ok := payload.(processing.MQTTMessage)
|
payloadMessage, ok := payload.(processing.MQTTMessage)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("net.mqtt.client is only able to output MQTTMessage")
|
return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage")
|
||||||
}
|
}
|
||||||
|
|
||||||
if mc.client == nil {
|
if mc.client == nil {
|
||||||
|
|||||||
113
nats-client.go
Normal file
113
nats-client.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package showbridge
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/processing"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSClient struct {
|
||||||
|
config ModuleConfig
|
||||||
|
router *Router
|
||||||
|
URL string
|
||||||
|
Subject string
|
||||||
|
client *nats.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterModule(ModuleRegistration{
|
||||||
|
Type: "net.nats.client",
|
||||||
|
New: func(config ModuleConfig) (Module, error) {
|
||||||
|
params := config.Params
|
||||||
|
url, ok := params["url"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client requires a url parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
urlString, ok := url.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client url must be string")
|
||||||
|
}
|
||||||
|
|
||||||
|
subject, ok := params["subject"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client requires a subject parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
subjectString, ok := subject.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.nats.client subject must be string")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &NATSClient{config: config, URL: urlString, Subject: subjectString}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Id() string {
|
||||||
|
return nc.config.Id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Type() string {
|
||||||
|
return nc.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) RegisterRouter(router *Router) {
|
||||||
|
nc.router = router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Run() error {
|
||||||
|
client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
nc.client = client
|
||||||
|
|
||||||
|
defer client.Drain()
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
sub, err := nc.client.Subscribe(nc.Subject, func(msg *nats.Msg) {
|
||||||
|
if nc.router != nil {
|
||||||
|
nc.router.HandleInput(nc.config.Id, msg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
<-nc.router.Context.Done()
|
||||||
|
slog.Debug("router context done in module", "id", nc.config.Id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NATSClient) Output(payload any) error {
|
||||||
|
|
||||||
|
payloadMessage, ok := payload.(processing.NATSMessage)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("net.nats.client is only able to output NATSMessage")
|
||||||
|
}
|
||||||
|
|
||||||
|
if nc.client == nil {
|
||||||
|
return fmt.Errorf("net.nats.client client is not setup")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nc.client.IsConnected() {
|
||||||
|
return fmt.Errorf("net.nats.client is not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := nc.client.Publish(payloadMessage.Subject, payloadMessage.Payload)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
@@ -11,11 +11,10 @@ import (
|
|||||||
|
|
||||||
type TCPClient struct {
|
type TCPClient struct {
|
||||||
config ModuleConfig
|
config ModuleConfig
|
||||||
Host string
|
|
||||||
Port uint16
|
|
||||||
framer framing.Framer
|
framer framing.Framer
|
||||||
conn net.Conn
|
conn *net.TCPConn
|
||||||
router *Router
|
router *Router
|
||||||
|
Addr *net.TCPAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -32,7 +31,7 @@ func init() {
|
|||||||
hostString, ok := host.(string)
|
hostString, ok := host.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.tcp.client host must be uint16")
|
return nil, fmt.Errorf("net.tcp.client host must be string")
|
||||||
}
|
}
|
||||||
|
|
||||||
port, ok := params["port"]
|
port, ok := params["port"]
|
||||||
@@ -43,7 +42,12 @@ func init() {
|
|||||||
portNum, ok := port.(float64)
|
portNum, ok := port.(float64)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.tcp.client port must be uint16")
|
return nil, fmt.Errorf("net.tcp.client port must be a number")
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", hostString, uint16(portNum)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
framingMethod, ok := params["framing"]
|
framingMethod, ok := params["framing"]
|
||||||
@@ -54,7 +58,7 @@ func init() {
|
|||||||
framingMethodString, ok := framingMethod.(string)
|
framingMethodString, ok := framingMethod.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("tcp framing method must be a string")
|
return nil, fmt.Errorf("net.tcp.client framing method must be a string")
|
||||||
}
|
}
|
||||||
|
|
||||||
var framer framing.Framer
|
var framer framing.Framer
|
||||||
@@ -72,7 +76,7 @@ func init() {
|
|||||||
return nil, fmt.Errorf("unknown framing method: %s", framingMethodString)
|
return nil, fmt.Errorf("unknown framing method: %s", framingMethodString)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil
|
return &TCPClient{framer: framer, Addr: addr, config: config}, nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -90,10 +94,6 @@ func (tc *TCPClient) RegisterRouter(router *Router) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tc *TCPClient) Run() error {
|
func (tc *TCPClient) Run() error {
|
||||||
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", tc.Host, tc.Port))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(jwetzell): shutdown with router.Context properly
|
// TODO(jwetzell): shutdown with router.Context properly
|
||||||
go func() {
|
go func() {
|
||||||
@@ -105,7 +105,7 @@ func (tc *TCPClient) Run() error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
client, err := net.DialTCP("tcp", nil, addr)
|
err := tc.SetupConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if tc.router.Context.Err() != nil {
|
if tc.router.Context.Err() != nil {
|
||||||
slog.Debug("router context done in module", "id", tc.config.Id)
|
slog.Debug("router context done in module", "id", tc.config.Id)
|
||||||
@@ -116,8 +116,6 @@ func (tc *TCPClient) Run() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
tc.conn = client
|
|
||||||
|
|
||||||
buffer := make([]byte, 1024)
|
buffer := make([]byte, 1024)
|
||||||
select {
|
select {
|
||||||
case <-tc.router.Context.Done():
|
case <-tc.router.Context.Done():
|
||||||
@@ -131,7 +129,7 @@ func (tc *TCPClient) Run() error {
|
|||||||
slog.Debug("router context done in module", "id", tc.config.Id)
|
slog.Debug("router context done in module", "id", tc.config.Id)
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
byteCount, err := client.Read(buffer)
|
byteCount, err := tc.conn.Read(buffer)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tc.framer.Clear()
|
tc.framer.Clear()
|
||||||
@@ -151,22 +149,29 @@ func (tc *TCPClient) Run() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *TCPClient) SetupConn() error {
|
||||||
|
client, err := net.DialTCP("tcp", nil, tc.Addr)
|
||||||
|
tc.conn = client
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *TCPClient) Output(payload any) error {
|
func (tc *TCPClient) Output(payload any) error {
|
||||||
if tc.conn != nil {
|
// NOTE(jwetzell): not sure how this would occur but
|
||||||
payloadBytes, ok := payload.([]byte)
|
if tc.conn == nil {
|
||||||
if !ok {
|
err := tc.SetupConn()
|
||||||
return fmt.Errorf("net.tcp.client is only able to output bytes")
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
_, err := tc.conn.Write(tc.framer.Encode(payloadBytes))
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return nil
|
payloadBytes, ok := payload.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("net.tcp.client is only able to output bytes")
|
||||||
|
}
|
||||||
|
_, err := tc.conn.Write(tc.framer.Encode(payloadBytes))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
package showbridge
|
package showbridge
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jwetzell/showbridge-go/internal/framing"
|
"github.com/jwetzell/showbridge-go/internal/framing"
|
||||||
@@ -12,11 +15,14 @@ import (
|
|||||||
|
|
||||||
type TCPServer struct {
|
type TCPServer struct {
|
||||||
config ModuleConfig
|
config ModuleConfig
|
||||||
|
Ip string
|
||||||
Port uint16
|
Port uint16
|
||||||
framingMethod string
|
framingMethod string
|
||||||
router *Router
|
router *Router
|
||||||
quit chan interface{}
|
quit chan interface{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
connections []net.Conn
|
||||||
|
connectionsMu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -32,7 +38,7 @@ func init() {
|
|||||||
portNum, ok := port.(float64)
|
portNum, ok := port.(float64)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.tcp.server port must be uint16")
|
return nil, fmt.Errorf("net.tcp.server port must be a number")
|
||||||
}
|
}
|
||||||
|
|
||||||
framingMethod, ok := params["framing"]
|
framingMethod, ok := params["framing"]
|
||||||
@@ -43,10 +49,23 @@ func init() {
|
|||||||
framingMethodString, ok := framingMethod.(string)
|
framingMethodString, ok := framingMethod.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("tcp framing method must be a string")
|
return nil, fmt.Errorf("net.tcp.server framing method must be a string")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config, quit: make(chan interface{})}, nil
|
ipString := "0.0.0.0"
|
||||||
|
|
||||||
|
ip, ok := params["ip"]
|
||||||
|
if ok {
|
||||||
|
|
||||||
|
specificIpString, ok := ip.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.tcp.server ip must be a string")
|
||||||
|
}
|
||||||
|
ipString = specificIpString
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), Ip: ipString, config: config, quit: make(chan interface{})}, nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -64,7 +83,10 @@ func (ts *TCPServer) RegisterRouter(router *Router) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TCPServer) handleClient(client net.Conn) {
|
func (ts *TCPServer) handleClient(client net.Conn) {
|
||||||
slog.Debug("connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
|
ts.connectionsMu.Lock()
|
||||||
|
ts.connections = append(ts.connections, client)
|
||||||
|
ts.connectionsMu.Unlock()
|
||||||
|
slog.Debug("net.tcp.server connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
var framer framing.Framer
|
var framer framing.Framer
|
||||||
|
|
||||||
@@ -90,12 +112,34 @@ ClientRead:
|
|||||||
byteCount, err := client.Read(buffer)
|
byteCount, err := client.Read(buffer)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//NOTE(jwetzell) we hit deadline
|
if opErr, ok := err.(*net.OpError); ok {
|
||||||
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
|
//NOTE(jwetzell) we hit deadline
|
||||||
continue ClientRead
|
if opErr.Timeout() {
|
||||||
|
continue ClientRead
|
||||||
|
}
|
||||||
|
if errors.Is(opErr, syscall.ECONNRESET) {
|
||||||
|
ts.connectionsMu.Lock()
|
||||||
|
for i := 0; i < len(ts.connections); i++ {
|
||||||
|
if ts.connections[i] == client {
|
||||||
|
ts.connections = slices.Delete(ts.connections, i, i+1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog.Debug("net.tcp.server connection reset", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
|
||||||
|
ts.connectionsMu.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err.Error() == "EOF" {
|
if err.Error() == "EOF" {
|
||||||
slog.Debug("connection closed", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
|
ts.connectionsMu.Lock()
|
||||||
|
for i := 0; i < len(ts.connections); i++ {
|
||||||
|
if ts.connections[i] == client {
|
||||||
|
ts.connections = slices.Delete(ts.connections, i, i+1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog.Debug("net.tcp.server stream ended", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
|
||||||
|
ts.connectionsMu.Unlock()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -106,7 +150,7 @@ ClientRead:
|
|||||||
if ts.router != nil {
|
if ts.router != nil {
|
||||||
ts.router.HandleInput(ts.config.Id, message)
|
ts.router.HandleInput(ts.config.Id, message)
|
||||||
} else {
|
} else {
|
||||||
slog.Error("tcp-server has no router", "id", ts.config.Id)
|
slog.Error("net.tcp.server has no router", "id", ts.config.Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -116,7 +160,8 @@ ClientRead:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TCPServer) Run() error {
|
func (ts *TCPServer) Run() error {
|
||||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", ts.Port))
|
// TODO(jwetzell): switch to net.ListenTCP and move addr resolution to init
|
||||||
|
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ts.Ip, ts.Port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -153,5 +198,24 @@ AcceptLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TCPServer) Output(payload any) error {
|
func (ts *TCPServer) Output(payload any) error {
|
||||||
return fmt.Errorf("net.tcp.server output is not implemented")
|
payloadBytes, ok := payload.([]byte)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("net.tcp.server is only able to output bytes")
|
||||||
|
}
|
||||||
|
ts.connectionsMu.Lock()
|
||||||
|
errorString := ""
|
||||||
|
|
||||||
|
for _, connection := range ts.connections {
|
||||||
|
_, err := connection.Write(payloadBytes)
|
||||||
|
if err != nil {
|
||||||
|
errorString += fmt.Sprintf("%s\n", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ts.connectionsMu.Unlock()
|
||||||
|
|
||||||
|
if errorString == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%s", errorString)
|
||||||
}
|
}
|
||||||
|
|||||||
4
timer.go
4
timer.go
@@ -21,13 +21,13 @@ func init() {
|
|||||||
|
|
||||||
duration, ok := params["duration"]
|
duration, ok := params["duration"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("timer requires a duration parameter")
|
return nil, fmt.Errorf("gen.timer requires a duration parameter")
|
||||||
}
|
}
|
||||||
|
|
||||||
durationNum, ok := duration.(float64)
|
durationNum, ok := duration.(float64)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("timer duration must be number")
|
return nil, fmt.Errorf("gen.timer duration must be a number")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Timer{Duration: uint32(durationNum), config: config}, nil
|
return &Timer{Duration: uint32(durationNum), config: config}, nil
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ type UDPClient struct {
|
|||||||
config ModuleConfig
|
config ModuleConfig
|
||||||
Host string
|
Host string
|
||||||
Port uint16
|
Port uint16
|
||||||
conn net.Conn
|
conn *net.UDPConn
|
||||||
router *Router
|
router *Router
|
||||||
addr *net.UDPAddr
|
addr *net.UDPAddr
|
||||||
}
|
}
|
||||||
@@ -29,7 +29,7 @@ func init() {
|
|||||||
hostString, ok := host.(string)
|
hostString, ok := host.(string)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.udp.client host must be uint16")
|
return nil, fmt.Errorf("net.udp.client host must be a string")
|
||||||
}
|
}
|
||||||
|
|
||||||
port, ok := params["port"]
|
port, ok := params["port"]
|
||||||
@@ -71,6 +71,9 @@ func (uc *UDPClient) Run() error {
|
|||||||
|
|
||||||
<-uc.router.Context.Done()
|
<-uc.router.Context.Done()
|
||||||
slog.Debug("router context done in module", "id", uc.config.Id)
|
slog.Debug("router context done in module", "id", uc.config.Id)
|
||||||
|
if uc.conn != nil {
|
||||||
|
uc.conn.Close()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,6 +84,7 @@ func (uc *UDPClient) Output(payload any) error {
|
|||||||
return fmt.Errorf("net.udp.client is only able to output bytes")
|
return fmt.Errorf("net.udp.client is only able to output bytes")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(jwetzell): reuse connection or setup new one when necessary
|
||||||
client, err := net.DialUDP("udp", nil, uc.addr)
|
client, err := net.DialUDP("udp", nil, uc.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
113
udp-multicast.go
Normal file
113
udp-multicast.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package showbridge
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type UDPMulticast struct {
|
||||||
|
config ModuleConfig
|
||||||
|
conn *net.UDPConn
|
||||||
|
router *Router
|
||||||
|
Addr *net.UDPAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterModule(ModuleRegistration{
|
||||||
|
Type: "net.udp.multicast",
|
||||||
|
New: func(config ModuleConfig) (Module, error) {
|
||||||
|
params := config.Params
|
||||||
|
ip, ok := params["ip"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.udp.client requires am ip parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
ipString, ok := ip.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.udp.client ip must be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
port, ok := params["port"]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.udp.client requires a port parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
portNum, ok := port.(float64)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.udp.client port must be a number")
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &UDPMulticast{config: config, Addr: addr}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (um *UDPMulticast) Id() string {
|
||||||
|
return um.config.Id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (um *UDPMulticast) Type() string {
|
||||||
|
return um.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func (um *UDPMulticast) RegisterRouter(router *Router) {
|
||||||
|
um.router = router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (um *UDPMulticast) Run() error {
|
||||||
|
|
||||||
|
client, err := net.ListenMulticastUDP("udp", nil, um.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
um.conn = client
|
||||||
|
|
||||||
|
buffer := make([]byte, 2048)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-um.router.Context.Done():
|
||||||
|
// TODO(jwetzell): cleanup?
|
||||||
|
slog.Debug("router context done in module", "id", um.config.Id)
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
um.conn.SetDeadline(time.Now().Add(time.Millisecond * 200))
|
||||||
|
|
||||||
|
numBytes, _, err := um.conn.ReadFromUDP(buffer)
|
||||||
|
if err != nil {
|
||||||
|
//NOTE(jwetzell) we hit deadline
|
||||||
|
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if numBytes > 0 {
|
||||||
|
message := buffer[:numBytes]
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("net.udp.multicast problem decoding psn traffic", "id", um.config.Id, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if um.router != nil {
|
||||||
|
um.router.HandleInput(um.config.Id, message)
|
||||||
|
} else {
|
||||||
|
slog.Error("net.udp.multicast has no router", "id", um.config.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (um *UDPMulticast) Output(payload any) error {
|
||||||
|
return fmt.Errorf("net.udp.multicast output is not implemented")
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type UDPServer struct {
|
type UDPServer struct {
|
||||||
|
Ip string
|
||||||
Port uint16
|
Port uint16
|
||||||
config ModuleConfig
|
config ModuleConfig
|
||||||
router *Router
|
router *Router
|
||||||
@@ -27,10 +28,23 @@ func init() {
|
|||||||
portNum, ok := port.(float64)
|
portNum, ok := port.(float64)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("net.udp.server port must be uint16")
|
return nil, fmt.Errorf("net.udp.server port must be a number")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &UDPServer{Port: uint16(portNum), config: config}, nil
|
ipString := "0.0.0.0"
|
||||||
|
|
||||||
|
ip, ok := params["ip"]
|
||||||
|
if ok {
|
||||||
|
|
||||||
|
specificIpString, ok := ip.(string)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("net.udp.server ip must be a string")
|
||||||
|
}
|
||||||
|
ipString = specificIpString
|
||||||
|
}
|
||||||
|
|
||||||
|
return &UDPServer{Ip: ipString, Port: uint16(portNum), config: config}, nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -49,7 +63,8 @@ func (us *UDPServer) RegisterRouter(router *Router) {
|
|||||||
|
|
||||||
func (us *UDPServer) Run() error {
|
func (us *UDPServer) Run() error {
|
||||||
|
|
||||||
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", us.Port))
|
// TODO(jwetzell): move this to init
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", us.Ip, us.Port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error resolving UDP address: %v", err)
|
log.Fatalf("error resolving UDP address: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user