do some decent context reworking

This commit is contained in:
Joel Wetzell
2025-11-22 11:31:09 -06:00
parent 4ae5261d25
commit 51e656313c
9 changed files with 64 additions and 27 deletions

View File

@@ -4,7 +4,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"os" "os"
"os/signal"
"github.com/jwetzell/showbridge-go" "github.com/jwetzell/showbridge-go"
"github.com/urfave/cli/v3" "github.com/urfave/cli/v3"
@@ -42,7 +44,9 @@ func main() {
}, },
} }
err := cmd.Run(context.Background(), os.Args) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Interrupt)
defer cancel()
err := cmd.Run(ctx, os.Args)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@@ -1,7 +1,6 @@
package showbridge package showbridge
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"time" "time"
@@ -44,17 +43,17 @@ func (i *Interval) Type() string {
} }
func (i *Interval) RegisterRouter(router *Router) { func (i *Interval) RegisterRouter(router *Router) {
slog.Debug("registering router", "id", i.config.Id)
i.router = router i.router = router
} }
func (i *Interval) Run(ctx context.Context) error { func (i *Interval) Run() error {
ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration)) ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration))
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-i.router.Context.Done():
ticker.Stop() ticker.Stop()
slog.Debug("router context done in module", "id", i.config.Id)
return nil return nil
case t := <-ticker.C: case t := <-ticker.C:
if i.router != nil { if i.router != nil {

View File

@@ -1,7 +1,6 @@
package showbridge package showbridge
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
) )
@@ -16,7 +15,7 @@ type Module interface {
Id() string Id() string
Type() string Type() string
RegisterRouter(*Router) RegisterRouter(*Router)
Run(context.Context) error Run() error
Output(any) error Output(any) error
} }

View File

@@ -5,12 +5,15 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"os" "os"
"sync"
) )
type Router struct { type Router struct {
contextCancel context.CancelFunc
Context context.Context Context context.Context
ModuleInstances []Module ModuleInstances []Module
RouteInstances []*Route RouteInstances []*Route
moduleWait sync.WaitGroup
} }
func NewRouter(ctx context.Context, config Config) (*Router, []ModuleError, []RouteError) { func NewRouter(ctx context.Context, config Config) (*Router, []ModuleError, []RouteError) {
@@ -23,8 +26,10 @@ func NewRouter(ctx context.Context, config Config) (*Router, []ModuleError, []Ro
slog.Debug("creating router") slog.Debug("creating router")
routerContext, cancel := context.WithCancel(ctx)
router := Router{ router := Router{
Context: ctx, Context: routerContext,
contextCancel: cancel,
ModuleInstances: []Module{}, ModuleInstances: []Module{},
RouteInstances: []*Route{}, RouteInstances: []*Route{},
} }
@@ -108,9 +113,20 @@ func NewRouter(ctx context.Context, config Config) (*Router, []ModuleError, []Ro
func (r *Router) Run() { func (r *Router) Run() {
for _, moduleInstance := range r.ModuleInstances { for _, moduleInstance := range r.ModuleInstances {
go moduleInstance.Run(r.Context) moduleInstance.RegisterRouter(r)
r.moduleWait.Add(1)
go func() {
moduleInstance.Run()
r.moduleWait.Done()
}()
} }
<-r.Context.Done() <-r.Context.Done()
r.moduleWait.Wait()
slog.Info("router context done")
}
func (r *Router) Stop() {
r.contextCancel()
} }
func (r *Router) HandleInput(sourceId string, payload any) { func (r *Router) HandleInput(sourceId string, payload any) {

View File

@@ -1,7 +1,6 @@
package showbridge package showbridge
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"net" "net"
@@ -87,18 +86,31 @@ func (tc *TCPClient) Type() string {
} }
func (tc *TCPClient) RegisterRouter(router *Router) { func (tc *TCPClient) RegisterRouter(router *Router) {
slog.Debug("registering router", "id", tc.config.Id)
tc.router = router tc.router = router
} }
func (tc *TCPClient) Run(ctx context.Context) error { func (tc *TCPClient) Run() error {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", tc.Host, tc.Port)) addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", tc.Host, tc.Port))
if err != nil { if err != nil {
return err return err
} }
// TODO(jwetzell): shutdown with router.Context properly
go func() {
<-tc.router.Context.Done()
slog.Debug("router context done in module", "id", tc.config.Id)
if tc.conn != nil {
tc.conn.Close()
}
}()
for { for {
client, err := net.DialTCP("tcp", nil, addr) client, err := net.DialTCP("tcp", nil, addr)
if err != nil { if err != nil {
if tc.router.Context.Err() != nil {
slog.Debug("router context done in module", "id", tc.config.Id)
return nil
}
slog.Error(err.Error()) slog.Error(err.Error())
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
continue continue
@@ -108,19 +120,20 @@ func (tc *TCPClient) Run(ctx context.Context) error {
buffer := make([]byte, 1024) buffer := make([]byte, 1024)
select { select {
case <-ctx.Done(): case <-tc.router.Context.Done():
slog.Debug("router context done in module", "id", tc.config.Id)
return nil return nil
default: default:
READ: READ:
for { for {
select { select {
case <-ctx.Done(): case <-tc.router.Context.Done():
slog.Debug("router context done in module", "id", tc.config.Id)
return nil return nil
default: default:
byteCount, err := client.Read(buffer) byteCount, err := client.Read(buffer)
if err != nil { if err != nil {
slog.Debug("connection closed")
tc.framer.Clear() tc.framer.Clear()
break READ break READ
} }

View File

@@ -57,7 +57,6 @@ func (ts *TCPServer) Type() string {
} }
func (ts *TCPServer) RegisterRouter(router *Router) { func (ts *TCPServer) RegisterRouter(router *Router) {
slog.Debug("registering router", "id", ts.config.Id)
ts.router = router ts.router = router
} }
@@ -108,22 +107,29 @@ func (ts *TCPServer) HandleClient(ctx context.Context, client net.Conn) {
} }
} }
func (ts TCPServer) Run(ctx context.Context) error { func (ts TCPServer) Run() error {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", ts.Port)) listener, err := net.Listen("tcp", fmt.Sprintf(":%d", ts.Port))
if err != nil { if err != nil {
return err return err
} }
// TODO(jwetzell): shutdown with router.Context properly
go func() {
<-ts.router.Context.Done()
slog.Debug("router context done in module", "id", ts.config.Id)
listener.Close()
}()
for { for {
select { select {
case <-ctx.Done(): case <-ts.router.Context.Done():
return nil return nil
default: default:
client, err := listener.Accept() client, err := listener.Accept()
if err != nil { if err != nil {
return err return err
} }
go ts.HandleClient(ctx, client) go ts.HandleClient(ts.router.Context, client)
} }
} }
} }

View File

@@ -45,7 +45,6 @@ func (t *Timer) Type() string {
} }
func (t *Timer) RegisterRouter(router *Router) { func (t *Timer) RegisterRouter(router *Router) {
slog.Debug("registering router", "id", t.config.Id)
t.router = router t.router = router
} }
@@ -56,6 +55,7 @@ func (t *Timer) Run(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.timer.Stop() t.timer.Stop()
slog.Debug("router context done in module", "id", t.config.Id)
return nil return nil
case time := <-t.timer.C: case time := <-t.timer.C:
if t.router != nil { if t.router != nil {

View File

@@ -1,7 +1,6 @@
package showbridge package showbridge
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"net" "net"
@@ -57,11 +56,10 @@ func (uc *UDPClient) Type() string {
} }
func (uc *UDPClient) RegisterRouter(router *Router) { func (uc *UDPClient) RegisterRouter(router *Router) {
slog.Debug("registering router", "id", uc.config.Id)
uc.router = router uc.router = router
} }
func (uc *UDPClient) Run(ctx context.Context) error { func (uc *UDPClient) Run() error {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", uc.Host, uc.Port)) addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", uc.Host, uc.Port))
if err != nil { if err != nil {
return err return err
@@ -72,7 +70,8 @@ func (uc *UDPClient) Run(ctx context.Context) error {
} }
uc.conn = client uc.conn = client
<-ctx.Done() <-uc.router.Context.Done()
slog.Debug("router context done in module", "id", uc.config.Id)
return nil return nil
} }

View File

@@ -1,7 +1,6 @@
package showbridge package showbridge
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"log/slog" "log/slog"
@@ -47,7 +46,7 @@ func (us *UDPServer) RegisterRouter(router *Router) {
us.router = router us.router = router
} }
func (us *UDPServer) Run(ctx context.Context) error { func (us *UDPServer) Run() error {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", us.Port)) addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", us.Port))
if err != nil { if err != nil {
@@ -64,7 +63,9 @@ func (us *UDPServer) Run(ctx context.Context) error {
buffer := make([]byte, 1024) buffer := make([]byte, 1024)
for { for {
select { select {
case <-ctx.Done(): case <-us.router.Context.Done():
// TODO(jwetzell): cleanup?
slog.Debug("router context done in module", "id", us.config.Id)
return nil return nil
default: default:
numBytes, _, err := listener.ReadFromUDP(buffer) numBytes, _, err := listener.ReadFromUDP(buffer)