From 86966132cad4ad7213785e9b5e4332cf97da25da Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Tue, 18 Nov 2025 21:02:48 -0600 Subject: [PATCH] add concept of framing to net.tcp --- config.json | 3 ++- internals/framing/framer.go | 6 +++++ internals/framing/separator.go | 33 +++++++++++++++++++++++++++ tcp-client.go | 39 +++++++++++++++++++++++++++++--- tcp-server.go | 41 +++++++++++++++++++++++++++++----- 5 files changed, 112 insertions(+), 10 deletions(-) create mode 100644 internals/framing/framer.go create mode 100644 internals/framing/separator.go diff --git a/config.json b/config.json index d30108a..0ad1321 100644 --- a/config.json +++ b/config.json @@ -3,7 +3,8 @@ { "type": "net.tcp.server", "params": { - "port": 8000 + "port": 8000, + "framing": "LF" } } ] diff --git a/internals/framing/framer.go b/internals/framing/framer.go new file mode 100644 index 0000000..3044f04 --- /dev/null +++ b/internals/framing/framer.go @@ -0,0 +1,6 @@ +package framing + +type Framer interface { + Frame([]byte) [][]byte + Clear() +} diff --git a/internals/framing/separator.go b/internals/framing/separator.go new file mode 100644 index 0000000..03f3ed8 --- /dev/null +++ b/internals/framing/separator.go @@ -0,0 +1,33 @@ +package framing + +import ( + "bytes" +) + +type ByteSeparatorFramer struct { + buffer []byte + separator []byte +} + +func NewByteSeparatorFramer(separator []byte) *ByteSeparatorFramer { + return &ByteSeparatorFramer{separator: separator, buffer: []byte{}} +} + +func (bsf *ByteSeparatorFramer) Frame(data []byte) [][]byte { + messages := [][]byte{} + + bsf.buffer = append(bsf.buffer, data...) + + parts := bytes.Split(bsf.buffer, bsf.separator) + + if len(parts) > 0 { + bsf.buffer = parts[len(parts)-1] + messages = parts[:len(parts)-1] + } + + return messages +} + +func (bsf *ByteSeparatorFramer) Clear() { + bsf.buffer = []byte{} +} diff --git a/tcp-client.go b/tcp-client.go index c7dde80..e1f18a6 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -6,12 +6,15 @@ import ( "log/slog" "net" "time" + + "github.com/jwetzell/showbridge-go/internals/framing" ) type TCPClient struct { config ModuleConfig Host string Port uint16 + framer framing.Framer } func init() { @@ -42,7 +45,31 @@ func init() { return nil, fmt.Errorf("tcp client port must be uint16") } - return TCPClient{Host: hostString, Port: uint16(portNum), config: config}, nil + framingMethod, ok := params["framing"] + if !ok { + return nil, fmt.Errorf("tcp client requires a framing method") + } + + framingMethodString, ok := framingMethod.(string) + + if !ok { + return nil, fmt.Errorf("tcp framing method must be a string") + } + + var framer framing.Framer + + switch framingMethodString { + case "CR": + framer = framing.NewByteSeparatorFramer([]byte{'\r'}) + case "LF": + framer = framing.NewByteSeparatorFramer([]byte{'\n'}) + case "CRLF": + framer = framing.NewByteSeparatorFramer([]byte{'\r', '\n'}) + default: + return nil, fmt.Errorf("unknown framing method: %s", framingMethodString) + } + + return TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil }, }) } @@ -79,11 +106,17 @@ func (tc TCPClient) Run(ctx context.Context) error { if err != nil { slog.Debug("connection closed") + tc.framer.Clear() break READ } - if byteCount > 0 { - slog.Info(string(buffer[0:byteCount])) + if tc.framer != nil { + if byteCount > 0 { + messages := tc.framer.Frame(buffer[0:byteCount]) + for _, message := range messages { + slog.Debug("tcp-client message", "bytes", message) + } + } } } diff --git a/tcp-server.go b/tcp-server.go index 0ee6575..f32d160 100644 --- a/tcp-server.go +++ b/tcp-server.go @@ -5,11 +5,14 @@ import ( "fmt" "log/slog" "net" + + "github.com/jwetzell/showbridge-go/internals/framing" ) type TCPServer struct { - config ModuleConfig - Port uint16 + config ModuleConfig + Port uint16 + framingMethod string } func init() { @@ -28,7 +31,18 @@ func init() { return nil, fmt.Errorf("tcp server port must be uint16") } - return TCPServer{Port: uint16(portNum), config: config}, nil + framingMethod, ok := params["framing"] + if !ok { + return nil, fmt.Errorf("tcp server requires a framing method") + } + + framingMethodString, ok := framingMethod.(string) + + if !ok { + return nil, fmt.Errorf("tcp framing method must be a string") + } + + return TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config}, nil }, }) } @@ -44,6 +58,17 @@ func (ts TCPServer) Type() string { func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) { slog.Info("handling connection", "remoteAddr", client.RemoteAddr().String()) + var framer framing.Framer + + switch ts.framingMethod { + case "LF": + framer = framing.NewByteSeparatorFramer([]byte{'\n'}) + case "CR": + framer = framing.NewByteSeparatorFramer([]byte{'\r'}) + case "CRLF": + framer = framing.NewByteSeparatorFramer([]byte{'\r', '\n'}) + } + buffer := make([]byte, 1024) for { select { @@ -58,9 +83,13 @@ func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) { } return } - - if byteCount > 0 { - slog.Info(string(buffer[0:byteCount])) + if framer != nil { + if byteCount > 0 { + messages := framer.Frame(buffer[0:byteCount]) + for _, message := range messages { + slog.Debug("tcp-server message", "bytes", message) + } + } } }