mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-27 05:15:47 +00:00
add concept of framing to net.tcp
This commit is contained in:
@@ -3,7 +3,8 @@
|
||||
{
|
||||
"type": "net.tcp.server",
|
||||
"params": {
|
||||
"port": 8000
|
||||
"port": 8000,
|
||||
"framing": "LF"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
6
internals/framing/framer.go
Normal file
6
internals/framing/framer.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package framing
|
||||
|
||||
type Framer interface {
|
||||
Frame([]byte) [][]byte
|
||||
Clear()
|
||||
}
|
||||
33
internals/framing/separator.go
Normal file
33
internals/framing/separator.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package framing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
)
|
||||
|
||||
type ByteSeparatorFramer struct {
|
||||
buffer []byte
|
||||
separator []byte
|
||||
}
|
||||
|
||||
func NewByteSeparatorFramer(separator []byte) *ByteSeparatorFramer {
|
||||
return &ByteSeparatorFramer{separator: separator, buffer: []byte{}}
|
||||
}
|
||||
|
||||
func (bsf *ByteSeparatorFramer) Frame(data []byte) [][]byte {
|
||||
messages := [][]byte{}
|
||||
|
||||
bsf.buffer = append(bsf.buffer, data...)
|
||||
|
||||
parts := bytes.Split(bsf.buffer, bsf.separator)
|
||||
|
||||
if len(parts) > 0 {
|
||||
bsf.buffer = parts[len(parts)-1]
|
||||
messages = parts[:len(parts)-1]
|
||||
}
|
||||
|
||||
return messages
|
||||
}
|
||||
|
||||
func (bsf *ByteSeparatorFramer) Clear() {
|
||||
bsf.buffer = []byte{}
|
||||
}
|
||||
@@ -6,12 +6,15 @@ import (
|
||||
"log/slog"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/jwetzell/showbridge-go/internals/framing"
|
||||
)
|
||||
|
||||
type TCPClient struct {
|
||||
config ModuleConfig
|
||||
Host string
|
||||
Port uint16
|
||||
framer framing.Framer
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -42,7 +45,31 @@ func init() {
|
||||
return nil, fmt.Errorf("tcp client port must be uint16")
|
||||
}
|
||||
|
||||
return TCPClient{Host: hostString, Port: uint16(portNum), config: config}, nil
|
||||
framingMethod, ok := params["framing"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tcp client requires a framing method")
|
||||
}
|
||||
|
||||
framingMethodString, ok := framingMethod.(string)
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tcp framing method must be a string")
|
||||
}
|
||||
|
||||
var framer framing.Framer
|
||||
|
||||
switch framingMethodString {
|
||||
case "CR":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\r'})
|
||||
case "LF":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\n'})
|
||||
case "CRLF":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\r', '\n'})
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown framing method: %s", framingMethodString)
|
||||
}
|
||||
|
||||
return TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -79,11 +106,17 @@ func (tc TCPClient) Run(ctx context.Context) error {
|
||||
|
||||
if err != nil {
|
||||
slog.Debug("connection closed")
|
||||
tc.framer.Clear()
|
||||
break READ
|
||||
}
|
||||
|
||||
if byteCount > 0 {
|
||||
slog.Info(string(buffer[0:byteCount]))
|
||||
if tc.framer != nil {
|
||||
if byteCount > 0 {
|
||||
messages := tc.framer.Frame(buffer[0:byteCount])
|
||||
for _, message := range messages {
|
||||
slog.Debug("tcp-client message", "bytes", message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,11 +5,14 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
|
||||
"github.com/jwetzell/showbridge-go/internals/framing"
|
||||
)
|
||||
|
||||
type TCPServer struct {
|
||||
config ModuleConfig
|
||||
Port uint16
|
||||
config ModuleConfig
|
||||
Port uint16
|
||||
framingMethod string
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -28,7 +31,18 @@ func init() {
|
||||
return nil, fmt.Errorf("tcp server port must be uint16")
|
||||
}
|
||||
|
||||
return TCPServer{Port: uint16(portNum), config: config}, nil
|
||||
framingMethod, ok := params["framing"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tcp server requires a framing method")
|
||||
}
|
||||
|
||||
framingMethodString, ok := framingMethod.(string)
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tcp framing method must be a string")
|
||||
}
|
||||
|
||||
return TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -44,6 +58,17 @@ func (ts TCPServer) Type() string {
|
||||
func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) {
|
||||
slog.Info("handling connection", "remoteAddr", client.RemoteAddr().String())
|
||||
|
||||
var framer framing.Framer
|
||||
|
||||
switch ts.framingMethod {
|
||||
case "LF":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\n'})
|
||||
case "CR":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\r'})
|
||||
case "CRLF":
|
||||
framer = framing.NewByteSeparatorFramer([]byte{'\r', '\n'})
|
||||
}
|
||||
|
||||
buffer := make([]byte, 1024)
|
||||
for {
|
||||
select {
|
||||
@@ -58,9 +83,13 @@ func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if byteCount > 0 {
|
||||
slog.Info(string(buffer[0:byteCount]))
|
||||
if framer != nil {
|
||||
if byteCount > 0 {
|
||||
messages := framer.Frame(buffer[0:byteCount])
|
||||
for _, message := range messages {
|
||||
slog.Debug("tcp-server message", "bytes", message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user