diff --git a/config.go b/config.go index 4680026..c8f47cf 100644 --- a/config.go +++ b/config.go @@ -2,4 +2,5 @@ package showbridge type Config struct { Modules []ModuleConfig `json:"modules"` + Routes []RouteConfig `json:"routes"` } diff --git a/config.json b/config.json index 0ad1321..b3da26f 100644 --- a/config.json +++ b/config.json @@ -1,11 +1,26 @@ { - "modules": [ - { - "type": "net.tcp.server", - "params": { - "port": 8000, - "framing": "LF" - } - } - ] -} \ No newline at end of file + "modules": [ + { + "id": "tcp1", + "type": "net.tcp.server", + "params": { + "port": 8000, + "framing": "LF" + } + }, + { + "id": "tcp2", + "type": "net.tcp.server", + "params": { + "port": 8001, + "framing": "LF" + } + } + ], + "routes": [ + { + "input": "tcp1", + "output": "tcp2" + } + ] +} diff --git a/internals/framing/framer.go b/internals/framing/framer.go index 3044f04..36500a5 100644 --- a/internals/framing/framer.go +++ b/internals/framing/framer.go @@ -1,6 +1,7 @@ package framing type Framer interface { - Frame([]byte) [][]byte + Decode([]byte) [][]byte + Encode([]byte) []byte Clear() } diff --git a/internals/framing/separator.go b/internals/framing/separator.go index 03f3ed8..d3a5ab9 100644 --- a/internals/framing/separator.go +++ b/internals/framing/separator.go @@ -13,7 +13,7 @@ func NewByteSeparatorFramer(separator []byte) *ByteSeparatorFramer { return &ByteSeparatorFramer{separator: separator, buffer: []byte{}} } -func (bsf *ByteSeparatorFramer) Frame(data []byte) [][]byte { +func (bsf *ByteSeparatorFramer) Decode(data []byte) [][]byte { messages := [][]byte{} bsf.buffer = append(bsf.buffer, data...) @@ -28,6 +28,10 @@ func (bsf *ByteSeparatorFramer) Frame(data []byte) [][]byte { return messages } +func (bsf *ByteSeparatorFramer) Encode(data []byte) []byte { + return append(data, bsf.separator...) +} + func (bsf *ByteSeparatorFramer) Clear() { bsf.buffer = []byte{} } diff --git a/internals/framing/slip.go b/internals/framing/slip.go index 607d94f..d322d6b 100644 --- a/internals/framing/slip.go +++ b/internals/framing/slip.go @@ -8,8 +8,9 @@ func NewSlipFramer() *SlipFramer { return &SlipFramer{buffer: []byte{}} } -func (sf *SlipFramer) Frame(data []byte) [][]byte { +func (sf *SlipFramer) Decode(data []byte) [][]byte { messages := [][]byte{} + END := byte(0xc0) ESC := byte(0xdb) ESC_END := byte(0xdc) @@ -47,6 +48,29 @@ func (sf *SlipFramer) Frame(data []byte) [][]byte { return messages } +func (sf *SlipFramer) Encode(data []byte) []byte { + END := byte(0xc0) + ESC := byte(0xdb) + ESC_END := byte(0xdc) + ESC_ESC := byte(0xdd) + + var encodedBytes = []byte{END} + + for _, byteToEncode := range data { + switch byteToEncode { + case END: + encodedBytes = append(encodedBytes, ESC_END) + case ESC: + encodedBytes = append(encodedBytes, ESC_ESC) + default: + encodedBytes = append(encodedBytes, byteToEncode) + } + } + + encodedBytes = append(encodedBytes, END) + return encodedBytes +} + func (sf *SlipFramer) Clear() { sf.buffer = []byte{} } diff --git a/module.go b/module.go index 491813e..3c90ac3 100644 --- a/module.go +++ b/module.go @@ -9,7 +9,9 @@ import ( type Module interface { Id() string Type() string + RegisterRouter(*Router) Run(context.Context) error + Output(any) error } type ModuleConfig struct { diff --git a/route.go b/route.go new file mode 100644 index 0000000..b0427d5 --- /dev/null +++ b/route.go @@ -0,0 +1,32 @@ +package showbridge + +import "log/slog" + +type Route struct { + index int + Input string + Output string + router *Router +} + +type RouteConfig struct { + Input string `json:"input"` + Output string `json:"output"` +} + +func NewRoute(index int, config RouteConfig, router *Router) *Route { + return &Route{Input: config.Input, Output: config.Output, router: router, index: index} +} + +func (r *Route) HandleInput(sourceId string, payload any) { + slog.Debug("route input", "index", r.index, "source", sourceId, "payload", payload) + r.HandleOutput(payload) +} + +func (r *Route) HandleOutput(payload any) { + slog.Debug("route output", "index", r.index, "destination", r.Output, "payload", payload) + err := r.router.HandleOutput(r.Output, payload) + if err != nil { + slog.Error("problem with route output", "error", err.Error()) + } +} diff --git a/router.go b/router.go index 585db1d..c03f71c 100644 --- a/router.go +++ b/router.go @@ -10,21 +10,23 @@ import ( type Router struct { Context context.Context ModuleInstances []Module + RouteInstances []*Route } func NewRouter(ctx context.Context, config Config) (*Router, error) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelDebug, + Level: slog.LevelInfo, })) slog.SetDefault(logger) - slog.Debug("creating router", "config", config) + slog.Debug("creating router") router := Router{ Context: ctx, ModuleInstances: []Module{}, + RouteInstances: []*Route{}, } for _, moduleDecl := range config.Modules { @@ -54,6 +56,14 @@ func NewRouter(ctx context.Context, config Config) (*Router, error) { } + for routeIndex, routeDecl := range config.Routes { + router.RouteInstances = append(router.RouteInstances, NewRoute(routeIndex, routeDecl, &router)) + } + + for _, moduleInstance := range router.ModuleInstances { + moduleInstance.RegisterRouter(&router) + } + return &router, nil } @@ -63,3 +73,20 @@ func (r *Router) Run() { } <-r.Context.Done() } + +func (r *Router) HandleInput(sourceId string, payload any) { + for _, route := range r.RouteInstances { + if route.Input == sourceId { + route.HandleInput(sourceId, payload) + } + } +} + +func (r *Router) HandleOutput(destinationId string, payload any) error { + for _, moduleInstance := range r.ModuleInstances { + if moduleInstance.Id() == destinationId { + return moduleInstance.Output(payload) + } + } + return fmt.Errorf("no module instance found for destination %s", destinationId) +} diff --git a/tcp-client.go b/tcp-client.go index 7bd4720..d67c797 100644 --- a/tcp-client.go +++ b/tcp-client.go @@ -15,6 +15,8 @@ type TCPClient struct { Host string Port uint16 framer framing.Framer + conn net.Conn + router *Router } func init() { @@ -71,20 +73,25 @@ func init() { return nil, fmt.Errorf("unknown framing method: %s", framingMethodString) } - return TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil + return &TCPClient{framer: framer, Host: hostString, Port: uint16(portNum), config: config}, nil }, }) } -func (tc TCPClient) Id() string { +func (tc *TCPClient) Id() string { return tc.config.Id } -func (tc TCPClient) Type() string { +func (tc *TCPClient) Type() string { return tc.config.Type } -func (tc TCPClient) Run(ctx context.Context) error { +func (tc *TCPClient) RegisterRouter(router *Router) { + slog.Debug("registering router", "id", tc.config.Id) + tc.router = router +} + +func (tc *TCPClient) Run(ctx context.Context) error { for { client, err := net.Dial("tcp", fmt.Sprintf(":%d", tc.Port)) if err != nil { @@ -93,6 +100,8 @@ func (tc TCPClient) Run(ctx context.Context) error { continue } + tc.conn = client + buffer := make([]byte, 1024) select { case <-ctx.Done(): @@ -114,9 +123,13 @@ func (tc TCPClient) Run(ctx context.Context) error { if tc.framer != nil { if byteCount > 0 { - messages := tc.framer.Frame(buffer[0:byteCount]) + messages := tc.framer.Decode(buffer[0:byteCount]) for _, message := range messages { - slog.Debug("tcp-client message", "bytes", message) + if tc.router != nil { + tc.router.HandleInput(tc.config.Id, message) + } else { + slog.Error("tcp-client has not router", "id", tc.config.Id) + } } } } @@ -128,3 +141,15 @@ func (tc TCPClient) Run(ctx context.Context) error { } } + +func (tc *TCPClient) Output(payload any) error { + if tc.conn != nil { + payloadBytes, ok := payload.([]byte) + if !ok { + return fmt.Errorf("tcp-client is only able to output bytes") + } + _, err := tc.conn.Write(tc.framer.Encode(payloadBytes)) + return err + } + return nil +} diff --git a/tcp-server.go b/tcp-server.go index 3fc6385..04d5352 100644 --- a/tcp-server.go +++ b/tcp-server.go @@ -13,6 +13,7 @@ type TCPServer struct { config ModuleConfig Port uint16 framingMethod string + router *Router } func init() { @@ -42,21 +43,26 @@ func init() { return nil, fmt.Errorf("tcp framing method must be a string") } - return TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config}, nil + return &TCPServer{framingMethod: framingMethodString, Port: uint16(portNum), config: config}, nil }, }) } -func (ts TCPServer) Id() string { +func (ts *TCPServer) Id() string { return ts.config.Id } -func (ts TCPServer) Type() string { +func (ts *TCPServer) Type() string { return ts.config.Type } -func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) { - slog.Info("handling connection", "remoteAddr", client.RemoteAddr().String()) +func (ts *TCPServer) RegisterRouter(router *Router) { + slog.Debug("registering router", "id", ts.config.Id) + ts.router = router +} + +func (ts *TCPServer) HandleClient(ctx context.Context, client net.Conn) { + slog.Debug("connection accepted", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String()) var framer framing.Framer @@ -81,15 +87,19 @@ func (ts TCPServer) HandleClient(ctx context.Context, client net.Conn) { if err != nil { if err.Error() == "EOF" { - slog.Debug("connection closed") + slog.Debug("connection closed", "id", ts.config.Id, "remoteAddr", client.RemoteAddr().String()) } return } if framer != nil { if byteCount > 0 { - messages := framer.Frame(buffer[0:byteCount]) + messages := framer.Decode(buffer[0:byteCount]) for _, message := range messages { - slog.Debug("tcp-server message", "bytes", message) + if ts.router != nil { + ts.router.HandleInput(ts.config.Id, message) + } else { + slog.Error("tcp-server has not router", "id", ts.config.Id) + } } } } @@ -116,5 +126,8 @@ func (ts TCPServer) Run(ctx context.Context) error { go ts.HandleClient(ctx, client) } } - +} + +func (ts *TCPServer) Output(payload any) error { + return fmt.Errorf("tcp-server output is not implemented") } diff --git a/udp-server.go b/udp-server.go index fc2a15c..95c3cba 100644 --- a/udp-server.go +++ b/udp-server.go @@ -11,6 +11,7 @@ import ( type UDPServer struct { Port uint16 config ModuleConfig + router *Router } func init() { @@ -29,20 +30,24 @@ func init() { return nil, fmt.Errorf("udp server port must be uint16") } - return UDPServer{Port: uint16(portNum), config: config}, nil + return &UDPServer{Port: uint16(portNum), config: config}, nil }, }) } -func (us UDPServer) Id() string { +func (us *UDPServer) Id() string { return us.config.Id } -func (us UDPServer) Type() string { +func (us *UDPServer) Type() string { return us.config.Id } -func (us UDPServer) Run(ctx context.Context) error { +func (us *UDPServer) RegisterRouter(router *Router) { + us.router = router +} + +func (us *UDPServer) Run(ctx context.Context) error { addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", us.Port)) if err != nil { @@ -66,8 +71,17 @@ func (us UDPServer) Run(ctx context.Context) error { if err != nil { return err } - slog.Info(string(buffer[:numBytes])) + message := buffer[:numBytes] + if us.router != nil { + us.router.HandleInput(us.config.Id, message) + } else { + slog.Error("tcp-server has not router", "id", us.config.Id) + } } } } + +func (us *UDPServer) Output(payload any) error { + return fmt.Errorf("udp-server output is not implemented") +}