10 Commits

Author SHA1 Message Date
Joel Wetzell
a7f889f6b5 Merge pull request #8 from jwetzell/feat/udp-multicast
add net.udp.multicast module
2025-12-06 10:36:47 -06:00
Joel Wetzell
5c5111a25e add net.udp.multicast module 2025-12-06 10:35:25 -06:00
Joel Wetzell
b31729fafe switch net.udp.client conn to pointer 2025-12-06 09:02:41 -06:00
Joel Wetzell
dff5430eb4 switch net.tcp.client conn to pointer 2025-12-06 09:02:30 -06:00
Joel Wetzell
eaca0dbf86 Merge pull request #7 from jwetzell/feat/tcp-server-output
support output from net.tcp.server
2025-12-06 08:59:21 -06:00
Joel Wetzell
aa3a1032f3 logging tweaks 2025-12-06 08:39:34 -06:00
Joel Wetzell
51a62f7fb2 cleanup 2025-12-06 08:23:40 -06:00
Joel Wetzell
2c8efcea4b add output to TCP server 2025-12-05 23:18:37 -06:00
Joel Wetzell
df1882b8f7 update psn-go 2025-12-04 23:11:30 -06:00
Joel Wetzell
1c8346cf65 cleanup error messages 2025-12-04 16:35:03 -06:00
11 changed files with 202 additions and 33 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/expr-lang/expr v1.17.6 github.com/expr-lang/expr v1.17.6
github.com/jwetzell/free-d-go v0.1.0 github.com/jwetzell/free-d-go v0.1.0
github.com/jwetzell/osc-go v0.1.0 github.com/jwetzell/osc-go v0.1.0
github.com/jwetzell/psn-go v0.2.1 github.com/jwetzell/psn-go v0.3.0
github.com/nats-io/nats.go v1.47.0 github.com/nats-io/nats.go v1.47.0
github.com/urfave/cli/v3 v3.6.1 github.com/urfave/cli/v3 v3.6.1
gitlab.com/gomidi/midi/v2 v2.3.16 gitlab.com/gomidi/midi/v2 v2.3.16

4
go.sum
View File

@@ -18,8 +18,8 @@ github.com/jwetzell/free-d-go v0.1.0 h1:xHt6dvyit98X+OC3jVzV0aLidxbyzi3vI9QiYkte
github.com/jwetzell/free-d-go v0.1.0/go.mod h1:KmrkooRARRaxJTBSPvwt/6IMAIaHH1R8bSA8cwbbELw= github.com/jwetzell/free-d-go v0.1.0/go.mod h1:KmrkooRARRaxJTBSPvwt/6IMAIaHH1R8bSA8cwbbELw=
github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I= github.com/jwetzell/osc-go v0.1.0 h1:EXxup5VWBErHot2Ri4MFToPf6KCzLDTbCt2x6GLfw8I=
github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A= github.com/jwetzell/osc-go v0.1.0/go.mod h1:xLz0jTwebSxtx1TkKN1YVdeRqvpFNweDhTut5TE393A=
github.com/jwetzell/psn-go v0.2.1 h1:pNG6XNfVRTb4qctH6pJjRJ1ReYGnGgNRA4H7tNbmzRU= github.com/jwetzell/psn-go v0.3.0 h1:WVpCEmExYE8a+I5hQak5jNJJp2x35VdGX/VuMUKPmhY=
github.com/jwetzell/psn-go v0.2.1/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o= github.com/jwetzell/psn-go v0.3.0/go.mod h1:bcEAeti4sQM375buujb3mIfmUstD4Aby18gq3ENb6+o=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=

View File

@@ -21,13 +21,13 @@ func init() {
duration, ok := params["duration"] duration, ok := params["duration"]
if !ok { if !ok {
return nil, fmt.Errorf("interval requires a duration parameter") return nil, fmt.Errorf("gen.interval requires a duration parameter")
} }
durationNum, ok := duration.(float64) durationNum, ok := duration.(float64)
if !ok { if !ok {
return nil, fmt.Errorf("interval duration must be number") return nil, fmt.Errorf("gen.interval duration must be number")
} }
return &Interval{Duration: uint32(durationNum), config: config}, nil return &Interval{Duration: uint32(durationNum), config: config}, nil
@@ -51,14 +51,15 @@ func (i *Interval) Run() error {
ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration)) ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
i.ticker = ticker i.ticker = ticker
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-i.router.Context.Done(): case <-i.router.Context.Done():
slog.Debug("router context done in module", "id", i.config.Id) slog.Debug("router context done in module", "id", i.config.Id)
return nil return nil
case t := <-ticker.C: case <-ticker.C:
if i.router != nil { if i.router != nil {
i.router.HandleInput(i.config.Id, t) i.router.HandleInput(i.config.Id, time.Now())
} }
} }
} }

View File

@@ -27,7 +27,7 @@ func init() {
input, ok := params["input"] input, ok := params["input"]
if !ok { if !ok {
return nil, fmt.Errorf("net.mqtt.client requires a input parameter") return nil, fmt.Errorf("misc.midi.client requires a input parameter")
} }
inputString, ok := input.(string) inputString, ok := input.(string)
@@ -39,7 +39,7 @@ func init() {
output, ok := params["output"] output, ok := params["output"]
if !ok { if !ok {
return nil, fmt.Errorf("net.mqtt.client requires a output parameter") return nil, fmt.Errorf("misc.midi.client requires a output parameter")
} }
outputString, ok := output.(string) outputString, ok := output.(string)

View File

@@ -31,7 +31,7 @@ func init() {
brokerString, ok := broker.(string) brokerString, ok := broker.(string)
if !ok { if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string") return nil, fmt.Errorf("net.mqtt.client broker must be string")
} }
topic, ok := params["topic"] topic, ok := params["topic"]
@@ -43,7 +43,7 @@ func init() {
topicString, ok := topic.(string) topicString, ok := topic.(string)
if !ok { if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string") return nil, fmt.Errorf("net.mqtt.client topic must be string")
} }
clientId, ok := params["clientId"] clientId, ok := params["clientId"]
@@ -55,7 +55,7 @@ func init() {
clientIdString, ok := clientId.(string) clientIdString, ok := clientId.(string)
if !ok { if !ok {
return nil, fmt.Errorf("net.mqtt.client host must be string") return nil, fmt.Errorf("net.mqtt.client clientId must be string")
} }
return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString}, nil return &MQTTClient{config: config, Broker: brokerString, Topic: topicString, ClientID: clientIdString}, nil
@@ -108,7 +108,7 @@ func (mc *MQTTClient) Output(payload any) error {
payloadMessage, ok := payload.(processing.MQTTMessage) payloadMessage, ok := payload.(processing.MQTTMessage)
if !ok { if !ok {
return fmt.Errorf("net.mqtt.client is only able to output MQTTMessage") return fmt.Errorf("net.mqtt.client is only able to output a MQTTMessage")
} }
if mc.client == nil { if mc.client == nil {

View File

@@ -12,7 +12,7 @@ import (
type TCPClient struct { type TCPClient struct {
config ModuleConfig config ModuleConfig
framer framing.Framer framer framing.Framer
conn net.Conn conn *net.TCPConn
router *Router router *Router
Addr *net.TCPAddr Addr *net.TCPAddr
} }
@@ -31,7 +31,7 @@ func init() {
hostString, ok := host.(string) hostString, ok := host.(string)
if !ok { if !ok {
return nil, fmt.Errorf("net.tcp.client host must be uint16") return nil, fmt.Errorf("net.tcp.client host must be string")
} }
port, ok := params["port"] port, ok := params["port"]
@@ -42,7 +42,7 @@ func init() {
portNum, ok := port.(float64) portNum, ok := port.(float64)
if !ok { if !ok {
return nil, fmt.Errorf("net.tcp.client port must be uint16") return nil, fmt.Errorf("net.tcp.client port must be a number")
} }
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", hostString, uint16(portNum))) addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", hostString, uint16(portNum)))
@@ -58,7 +58,7 @@ func init() {
framingMethodString, ok := framingMethod.(string) framingMethodString, ok := framingMethod.(string)
if !ok { if !ok {
return nil, fmt.Errorf("tcp framing method must be a string") return nil, fmt.Errorf("net.tcp.client framing method must be a string")
} }
var framer framing.Framer var framer framing.Framer

View File

@@ -1,10 +1,13 @@
package showbridge package showbridge
import ( import (
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"net" "net"
"slices"
"sync" "sync"
"syscall"
"time" "time"
"github.com/jwetzell/showbridge-go/internal/framing" "github.com/jwetzell/showbridge-go/internal/framing"
@@ -18,6 +21,8 @@ type TCPServer struct {
router *Router router *Router
quit chan interface{} quit chan interface{}
wg sync.WaitGroup wg sync.WaitGroup
connections []net.Conn
connectionsMu sync.RWMutex
} }
func init() { func init() {
@@ -33,7 +38,7 @@ func init() {
portNum, ok := port.(float64) portNum, ok := port.(float64)
if !ok { if !ok {
return nil, fmt.Errorf("net.tcp.server port must be uint16") return nil, fmt.Errorf("net.tcp.server port must be a number")
} }
framingMethod, ok := params["framing"] framingMethod, ok := params["framing"]
@@ -44,7 +49,7 @@ func init() {
framingMethodString, ok := framingMethod.(string) framingMethodString, ok := framingMethod.(string)
if !ok { if !ok {
return nil, fmt.Errorf("tcp framing method must be a string") return nil, fmt.Errorf("net.tcp.server framing method must be a string")
} }
ipString := "0.0.0.0" ipString := "0.0.0.0"
@@ -55,7 +60,7 @@ func init() {
specificIpString, ok := ip.(string) specificIpString, ok := ip.(string)
if !ok { if !ok {
return nil, fmt.Errorf("tcp ip method must be a string") return nil, fmt.Errorf("net.tcp.server ip must be a string")
} }
ipString = specificIpString ipString = specificIpString
} }
@@ -78,7 +83,10 @@ func (ts *TCPServer) RegisterRouter(router *Router) {
} }
func (ts *TCPServer) handleClient(client net.Conn) { func (ts *TCPServer) handleClient(client net.Conn) {
slog.Debug("connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String()) ts.connectionsMu.Lock()
ts.connections = append(ts.connections, client)
ts.connectionsMu.Unlock()
slog.Debug("net.tcp.server connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
defer client.Close() defer client.Close()
var framer framing.Framer var framer framing.Framer
@@ -104,12 +112,34 @@ ClientRead:
byteCount, err := client.Read(buffer) byteCount, err := client.Read(buffer)
if err != nil { if err != nil {
//NOTE(jwetzell) we hit deadline if opErr, ok := err.(*net.OpError); ok {
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { //NOTE(jwetzell) we hit deadline
continue ClientRead if opErr.Timeout() {
continue ClientRead
}
if errors.Is(opErr, syscall.ECONNRESET) {
ts.connectionsMu.Lock()
for i := 0; i < len(ts.connections); i++ {
if ts.connections[i] == client {
ts.connections = slices.Delete(ts.connections, i, i+1)
break
}
}
slog.Debug("net.tcp.server connection reset", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Unlock()
}
} }
if err.Error() == "EOF" { if err.Error() == "EOF" {
slog.Debug("connection closed", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String()) ts.connectionsMu.Lock()
for i := 0; i < len(ts.connections); i++ {
if ts.connections[i] == client {
ts.connections = slices.Delete(ts.connections, i, i+1)
break
}
}
slog.Debug("net.tcp.server stream ended", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String())
ts.connectionsMu.Unlock()
} }
return return
} }
@@ -120,7 +150,7 @@ ClientRead:
if ts.router != nil { if ts.router != nil {
ts.router.HandleInput(ts.config.Id, message) ts.router.HandleInput(ts.config.Id, message)
} else { } else {
slog.Error("tcp-server has no router", "id", ts.config.Id) slog.Error("net.tcp.server has no router", "id", ts.config.Id)
} }
} }
} }
@@ -130,6 +160,7 @@ ClientRead:
} }
func (ts *TCPServer) Run() error { func (ts *TCPServer) Run() error {
// TODO(jwetzell): switch to net.ListenTCP and move addr resolution to init
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ts.Ip, ts.Port)) listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ts.Ip, ts.Port))
if err != nil { if err != nil {
return err return err
@@ -167,5 +198,24 @@ AcceptLoop:
} }
func (ts *TCPServer) Output(payload any) error { func (ts *TCPServer) Output(payload any) error {
return fmt.Errorf("net.tcp.server output is not implemented") payloadBytes, ok := payload.([]byte)
if !ok {
return fmt.Errorf("net.tcp.server is only able to output bytes")
}
ts.connectionsMu.Lock()
errorString := ""
for _, connection := range ts.connections {
_, err := connection.Write(payloadBytes)
if err != nil {
errorString += fmt.Sprintf("%s\n", err.Error())
}
}
ts.connectionsMu.Unlock()
if errorString == "" {
return nil
}
return fmt.Errorf("%s", errorString)
} }

View File

@@ -21,13 +21,13 @@ func init() {
duration, ok := params["duration"] duration, ok := params["duration"]
if !ok { if !ok {
return nil, fmt.Errorf("timer requires a duration parameter") return nil, fmt.Errorf("gen.timer requires a duration parameter")
} }
durationNum, ok := duration.(float64) durationNum, ok := duration.(float64)
if !ok { if !ok {
return nil, fmt.Errorf("timer duration must be number") return nil, fmt.Errorf("gen.timer duration must be a number")
} }
return &Timer{Duration: uint32(durationNum), config: config}, nil return &Timer{Duration: uint32(durationNum), config: config}, nil

View File

@@ -10,7 +10,7 @@ type UDPClient struct {
config ModuleConfig config ModuleConfig
Host string Host string
Port uint16 Port uint16
conn net.Conn conn *net.UDPConn
router *Router router *Router
addr *net.UDPAddr addr *net.UDPAddr
} }
@@ -29,7 +29,7 @@ func init() {
hostString, ok := host.(string) hostString, ok := host.(string)
if !ok { if !ok {
return nil, fmt.Errorf("net.udp.client host must be uint16") return nil, fmt.Errorf("net.udp.client host must be a string")
} }
port, ok := params["port"] port, ok := params["port"]
@@ -71,6 +71,9 @@ func (uc *UDPClient) Run() error {
<-uc.router.Context.Done() <-uc.router.Context.Done()
slog.Debug("router context done in module", "id", uc.config.Id) slog.Debug("router context done in module", "id", uc.config.Id)
if uc.conn != nil {
uc.conn.Close()
}
return nil return nil
} }
@@ -81,6 +84,7 @@ func (uc *UDPClient) Output(payload any) error {
return fmt.Errorf("net.udp.client is only able to output bytes") return fmt.Errorf("net.udp.client is only able to output bytes")
} }
// TODO(jwetzell): reuse connection or setup new one when necessary
client, err := net.DialUDP("udp", nil, uc.addr) client, err := net.DialUDP("udp", nil, uc.addr)
if err != nil { if err != nil {
return err return err

113
udp-multicast.go Normal file
View File

@@ -0,0 +1,113 @@
package showbridge
import (
"fmt"
"log/slog"
"net"
"time"
)
type UDPMulticast struct {
config ModuleConfig
conn *net.UDPConn
router *Router
Addr *net.UDPAddr
}
func init() {
RegisterModule(ModuleRegistration{
Type: "net.udp.multicast",
New: func(config ModuleConfig) (Module, error) {
params := config.Params
ip, ok := params["ip"]
if !ok {
return nil, fmt.Errorf("net.udp.client requires am ip parameter")
}
ipString, ok := ip.(string)
if !ok {
return nil, fmt.Errorf("net.udp.client ip must be a string")
}
port, ok := params["port"]
if !ok {
return nil, fmt.Errorf("net.udp.client requires a port parameter")
}
portNum, ok := port.(float64)
if !ok {
return nil, fmt.Errorf("net.udp.client port must be a number")
}
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ipString, uint16(portNum)))
if err != nil {
return nil, err
}
return &UDPMulticast{config: config, Addr: addr}, nil
},
})
}
func (um *UDPMulticast) Id() string {
return um.config.Id
}
func (um *UDPMulticast) Type() string {
return um.config.Type
}
func (um *UDPMulticast) RegisterRouter(router *Router) {
um.router = router
}
func (um *UDPMulticast) Run() error {
client, err := net.ListenMulticastUDP("udp", nil, um.Addr)
if err != nil {
return err
}
defer client.Close()
um.conn = client
buffer := make([]byte, 2048)
for {
select {
case <-um.router.Context.Done():
// TODO(jwetzell): cleanup?
slog.Debug("router context done in module", "id", um.config.Id)
return nil
default:
um.conn.SetDeadline(time.Now().Add(time.Millisecond * 200))
numBytes, _, err := um.conn.ReadFromUDP(buffer)
if err != nil {
//NOTE(jwetzell) we hit deadline
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
continue
}
return err
}
if numBytes > 0 {
message := buffer[:numBytes]
if err != nil {
slog.Error("net.udp.multicast problem decoding psn traffic", "id", um.config.Id, "error", err)
}
if um.router != nil {
um.router.HandleInput(um.config.Id, message)
} else {
slog.Error("net.udp.multicast has no router", "id", um.config.Id)
}
}
}
}
}
func (um *UDPMulticast) Output(payload any) error {
return fmt.Errorf("net.udp.multicast output is not implemented")
}

View File

@@ -28,7 +28,7 @@ func init() {
portNum, ok := port.(float64) portNum, ok := port.(float64)
if !ok { if !ok {
return nil, fmt.Errorf("net.udp.server port must be uint16") return nil, fmt.Errorf("net.udp.server port must be a number")
} }
ipString := "0.0.0.0" ipString := "0.0.0.0"
@@ -39,7 +39,7 @@ func init() {
specificIpString, ok := ip.(string) specificIpString, ok := ip.(string)
if !ok { if !ok {
return nil, fmt.Errorf("tcp ip method must be a string") return nil, fmt.Errorf("net.udp.server ip must be a string")
} }
ipString = specificIpString ipString = specificIpString
} }
@@ -63,6 +63,7 @@ func (us *UDPServer) RegisterRouter(router *Router) {
func (us *UDPServer) Run() error { func (us *UDPServer) Run() error {
// TODO(jwetzell): move this to init
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", us.Ip, us.Port)) addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", us.Ip, us.Port))
if err != nil { if err != nil {
log.Fatalf("error resolving UDP address: %v", err) log.Fatalf("error resolving UDP address: %v", err)