From 33ecc94097ed650e5e1599e8f40215cef17f72b7 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Sat, 7 Feb 2026 09:53:38 -0600 Subject: [PATCH] add Stop function to module --- internal/module/http-client.go | 9 ++++++++- internal/module/http-server.go | 9 ++++++++- internal/module/midi-input.go | 9 ++++++++- internal/module/midi-output.go | 9 ++++++++- internal/module/module.go | 1 + internal/module/mqtt-client.go | 10 +++++++++- internal/module/nats-client.go | 9 ++++++++- internal/module/psn-client.go | 9 ++++++++- internal/module/serial-client.go | 9 ++++++++- internal/module/sip-call-server.go | 9 ++++++++- internal/module/sip-dtmf-server.go | 9 ++++++++- internal/module/tcp-client.go | 9 ++++++++- internal/module/tcp-server.go | 19 ++++++++++++++++++- internal/module/time-interval.go | 9 ++++++++- internal/module/time-timer.go | 9 ++++++++- internal/module/udp-client.go | 9 ++++++++- internal/module/udp-multicast.go | 9 ++++++++- internal/module/udp-server.go | 9 ++++++++- router_test.go | 9 ++++++++- 19 files changed, 156 insertions(+), 18 deletions(-) diff --git a/internal/module/http-client.go b/internal/module/http-client.go index e2ba7b9..bf5c7a9 100644 --- a/internal/module/http-client.go +++ b/internal/module/http-client.go @@ -17,6 +17,7 @@ type HTTPClient struct { client *http.Client router route.RouteIO logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -44,7 +45,9 @@ func (hc *HTTPClient) Run(ctx context.Context) error { return errors.New("http.client unable to get router from context") } hc.router = router - hc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + hc.ctx = moduleContext + hc.cancel = cancel hc.client = &http.Client{ Timeout: 10 * time.Second, @@ -79,3 +82,7 @@ func (hc *HTTPClient) Output(ctx context.Context, payload any) error { return nil } + +func (hc *HTTPClient) Stop() { + hc.cancel() +} diff --git a/internal/module/http-server.go b/internal/module/http-server.go index dc56137..76ea8cb 100644 --- a/internal/module/http-server.go +++ b/internal/module/http-server.go @@ -19,6 +19,7 @@ type HTTPServer struct { ctx context.Context router route.RouteIO logger *slog.Logger + cancel context.CancelFunc } type ResponseIOError struct { @@ -153,7 +154,9 @@ func (hs *HTTPServer) Run(ctx context.Context) error { return errors.New("http.server unable to get router from context") } hs.router = router - hs.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + hs.ctx = moduleContext + hs.cancel = cancel httpServer := &http.Server{ Addr: fmt.Sprintf(":%d", hs.Port), @@ -199,3 +202,7 @@ func (hs *HTTPServer) Output(ctx context.Context, payload any) error { responseWriter.Write(payloadResponse.Body) return nil } + +func (hs *HTTPServer) Stop() { + hs.cancel() +} diff --git a/internal/module/midi-input.go b/internal/module/midi-input.go index f952dc5..24b2873 100644 --- a/internal/module/midi-input.go +++ b/internal/module/midi-input.go @@ -21,6 +21,7 @@ type MIDIInput struct { Port string SendFunc func(midi.Message) error logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -61,7 +62,9 @@ func (mi *MIDIInput) Run(ctx context.Context) error { return errors.New("midi.input unable to get router from context") } mi.router = router - mi.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + mi.ctx = moduleContext + mi.cancel = cancel in, err := midi.FindInPort(mi.Port) if err != nil { @@ -88,3 +91,7 @@ func (mi *MIDIInput) Run(ctx context.Context) error { func (mi *MIDIInput) Output(ctx context.Context, payload any) error { return errors.New("midi.input output is not implemented") } + +func (mi *MIDIInput) Stop() { + mi.cancel() +} diff --git a/internal/module/midi-output.go b/internal/module/midi-output.go index cb1207e..70a1972 100644 --- a/internal/module/midi-output.go +++ b/internal/module/midi-output.go @@ -21,6 +21,7 @@ type MIDIOutput struct { Port string SendFunc func(midi.Message) error logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -62,7 +63,9 @@ func (mo *MIDIOutput) Run(ctx context.Context) error { return errors.New("midi.output unable to get router from context") } mo.router = router - mo.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + mo.ctx = moduleContext + mo.cancel = cancel out, err := midi.FindOutPort(mo.Port) @@ -95,3 +98,7 @@ func (mo *MIDIOutput) Output(ctx context.Context, payload any) error { return mo.SendFunc(payloadMessage) } + +func (mo *MIDIOutput) Stop() { + mo.cancel() +} diff --git a/internal/module/module.go b/internal/module/module.go index 002b34a..8a1937d 100644 --- a/internal/module/module.go +++ b/internal/module/module.go @@ -19,6 +19,7 @@ type Module interface { Id() string Type() string Run(context.Context) error + Stop() Output(context.Context, any) error } diff --git a/internal/module/mqtt-client.go b/internal/module/mqtt-client.go index 65953eb..21caaa9 100644 --- a/internal/module/mqtt-client.go +++ b/internal/module/mqtt-client.go @@ -19,6 +19,7 @@ type MQTTClient struct { Topic string client mqtt.Client logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -82,7 +83,9 @@ func (mc *MQTTClient) Run(ctx context.Context) error { return errors.New("mqtt.client unable to get router from context") } mc.router = router - mc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + mc.ctx = moduleContext + mc.cancel = cancel opts := mqtt.NewClientOptions() opts.AddBroker(mc.Broker) @@ -98,6 +101,7 @@ func (mc *MQTTClient) Run(ctx context.Context) error { } mc.client = mqtt.NewClient(opts) + defer mc.client.Disconnect(250) token := mc.client.Connect() @@ -133,3 +137,7 @@ func (mc *MQTTClient) Output(ctx context.Context, payload any) error { return token.Error() } + +func (mc *MQTTClient) Stop() { + mc.cancel() +} diff --git a/internal/module/nats-client.go b/internal/module/nats-client.go index b9ee35a..3f5f100 100644 --- a/internal/module/nats-client.go +++ b/internal/module/nats-client.go @@ -19,6 +19,7 @@ type NATSClient struct { Subject string client *nats.Conn logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -71,7 +72,9 @@ func (nc *NATSClient) Run(ctx context.Context) error { } nc.router = router - nc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + nc.ctx = moduleContext + nc.cancel = cancel client, err := nats.Connect(nc.URL, nats.RetryOnFailedConnect(true)) @@ -121,3 +124,7 @@ func (nc *NATSClient) Output(ctx context.Context, payload any) error { return err } + +func (nc *NATSClient) Stop() { + nc.cancel() +} diff --git a/internal/module/psn-client.go b/internal/module/psn-client.go index 5664e75..2255daa 100644 --- a/internal/module/psn-client.go +++ b/internal/module/psn-client.go @@ -20,6 +20,7 @@ type PSNClient struct { router route.RouteIO decoder *psn.Decoder logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -47,7 +48,9 @@ func (pc *PSNClient) Run(ctx context.Context) error { return errors.New("psn.client unable to get router from context") } pc.router = router - pc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + pc.ctx = moduleContext + pc.cancel = cancel addr, err := net.ResolveUDPAddr("udp", "236.10.10.10:56565") if err != nil { @@ -104,3 +107,7 @@ func (pc *PSNClient) Run(ctx context.Context) error { func (pc *PSNClient) Output(ctx context.Context, payload any) error { return fmt.Errorf("psn.client output is not implemented") } + +func (pc *PSNClient) Stop() { + pc.cancel() +} diff --git a/internal/module/serial-client.go b/internal/module/serial-client.go index 64702e4..50f8a11 100644 --- a/internal/module/serial-client.go +++ b/internal/module/serial-client.go @@ -24,6 +24,7 @@ type SerialClient struct { Mode *serial.Mode port serial.Port logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -108,7 +109,9 @@ func (sc *SerialClient) Run(ctx context.Context) error { } sc.router = router - sc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + sc.ctx = moduleContext + sc.cancel = cancel // TODO(jwetzell): shutdown with router.Context properly go func() { @@ -180,3 +183,7 @@ func (sc *SerialClient) Output(ctx context.Context, payload any) error { _, err := sc.port.Write(sc.Framer.Encode(payloadBytes)) return err } + +func (sc *SerialClient) Stop() { + sc.cancel() +} diff --git a/internal/module/sip-call-server.go b/internal/module/sip-call-server.go index 9786a12..0852b7d 100644 --- a/internal/module/sip-call-server.go +++ b/internal/module/sip-call-server.go @@ -29,6 +29,7 @@ type SIPCallServer struct { UserAgent string dg *diago.Diago logger *slog.Logger + cancel context.CancelFunc } type SIPCallMessage struct { @@ -118,7 +119,9 @@ func (scs *SIPCallServer) Run(ctx context.Context) error { return errors.New("sip.call.server unable to get router from context") } scs.router = router - scs.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + scs.ctx = moduleContext + scs.cancel = cancel diagoLogger := slog.New(slog.NewJSONHandler(io.Discard, nil)) @@ -228,3 +231,7 @@ func (scs *SIPCallServer) Output(ctx context.Context, payload any) error { } return errors.New("sip.dtmf.server can only output SipDTMFResponse or SipAudioFileResponse") } + +func (scs *SIPCallServer) Stop() { + scs.cancel() +} diff --git a/internal/module/sip-dtmf-server.go b/internal/module/sip-dtmf-server.go index 4504e78..4c332b6 100644 --- a/internal/module/sip-dtmf-server.go +++ b/internal/module/sip-dtmf-server.go @@ -29,6 +29,7 @@ type SIPDTMFServer struct { Transport string Separator string logger *slog.Logger + cancel context.CancelFunc } type SIPDTMFMessage struct { @@ -120,7 +121,9 @@ func (sds *SIPDTMFServer) Run(ctx context.Context) error { return errors.New("sip.dtmf.server unable to get router from context") } sds.router = router - sds.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + sds.ctx = moduleContext + sds.cancel = cancel diagoLogger := slog.New(slog.NewJSONHandler(io.Discard, nil)) @@ -243,3 +246,7 @@ func (sds *SIPDTMFServer) Output(ctx context.Context, payload any) error { return errors.New("sip.dtmf.server can only output SipDTMFResponse or SipAudioFileResponse") } + +func (sds *SIPDTMFServer) Stop() { + sds.cancel() +} diff --git a/internal/module/tcp-client.go b/internal/module/tcp-client.go index 0022e47..8060d7a 100644 --- a/internal/module/tcp-client.go +++ b/internal/module/tcp-client.go @@ -21,6 +21,7 @@ type TCPClient struct { router route.RouteIO Addr *net.TCPAddr logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -94,7 +95,9 @@ func (tc *TCPClient) Run(ctx context.Context) error { return errors.New("net.tcp.client unable to get router from context") } tc.router = router - tc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + tc.ctx = moduleContext + tc.cancel = cancel // TODO(jwetzell): shutdown with router.Context properly go func() { @@ -176,3 +179,7 @@ func (tc *TCPClient) Output(ctx context.Context, payload any) error { _, err := tc.conn.Write(tc.framer.Encode(payloadBytes)) return err } + +func (tc *TCPClient) Stop() { + tc.cancel() +} diff --git a/internal/module/tcp-server.go b/internal/module/tcp-server.go index de2b073..fbeb90e 100644 --- a/internal/module/tcp-server.go +++ b/internal/module/tcp-server.go @@ -27,6 +27,7 @@ type TCPServer struct { connections []*net.TCPConn connectionsMu sync.RWMutex logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -105,6 +106,15 @@ ClientRead: for { select { case <-ts.quit: + client.Close() + ts.connectionsMu.Lock() + for i := 0; i < len(ts.connections); i++ { + if ts.connections[i] == client { + ts.connections = slices.Delete(ts.connections, i, i+1) + break + } + } + ts.connectionsMu.Unlock() return default: client.SetDeadline(time.Now().Add(time.Millisecond * 200)) @@ -166,7 +176,9 @@ func (ts *TCPServer) Run(ctx context.Context) error { return errors.New("net.tcp.server unable to get router from context") } ts.router = router - ts.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + ts.ctx = moduleContext + ts.cancel = cancel listener, err := net.ListenTCP("tcp", ts.Addr) if err != nil { @@ -226,3 +238,8 @@ func (ts *TCPServer) Output(ctx context.Context, payload any) error { } return fmt.Errorf("net.tcp.server error during output: %s", errorString) } + +func (ts *TCPServer) Stop() { + ts.cancel() + ts.wg.Wait() +} diff --git a/internal/module/time-interval.go b/internal/module/time-interval.go index af5dbfe..0c1c2a2 100644 --- a/internal/module/time-interval.go +++ b/internal/module/time-interval.go @@ -17,6 +17,7 @@ type TimeInterval struct { router route.RouteIO ticker *time.Ticker logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -56,7 +57,9 @@ func (i *TimeInterval) Run(ctx context.Context) error { return errors.New("time.interval unable to get router from context") } i.router = router - i.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + i.ctx = moduleContext + i.cancel = cancel ticker := time.NewTicker(time.Millisecond * time.Duration(i.Duration)) i.ticker = ticker @@ -80,3 +83,7 @@ func (i *TimeInterval) Output(ctx context.Context, payload any) error { i.ticker.Reset(time.Millisecond * time.Duration(i.Duration)) return nil } + +func (i *TimeInterval) Stop() { + i.cancel() +} diff --git a/internal/module/time-timer.go b/internal/module/time-timer.go index 06828f6..1cc9539 100644 --- a/internal/module/time-timer.go +++ b/internal/module/time-timer.go @@ -17,6 +17,7 @@ type TimeTimer struct { router route.RouteIO timer *time.Timer logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -57,7 +58,9 @@ func (t *TimeTimer) Run(ctx context.Context) error { return errors.New("net.tcp.client unable to get router from context") } t.router = router - t.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + t.ctx = moduleContext + t.cancel = cancel t.timer = time.NewTimer(time.Millisecond * time.Duration(t.Duration)) defer t.timer.Stop() @@ -79,3 +82,7 @@ func (t *TimeTimer) Output(ctx context.Context, payload any) error { t.timer.Reset(time.Millisecond * time.Duration(t.Duration)) return nil } + +func (t *TimeTimer) Stop() { + t.cancel() +} diff --git a/internal/module/udp-client.go b/internal/module/udp-client.go index b3b2511..b90a36d 100644 --- a/internal/module/udp-client.go +++ b/internal/module/udp-client.go @@ -19,6 +19,7 @@ type UDPClient struct { ctx context.Context router route.RouteIO logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -80,7 +81,9 @@ func (uc *UDPClient) Run(ctx context.Context) error { return errors.New("net.udp.client unable to get router from context") } uc.router = router - uc.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + uc.ctx = moduleContext + uc.cancel = cancel err := uc.SetupConn() if err != nil { @@ -112,3 +115,7 @@ func (uc *UDPClient) Output(ctx context.Context, payload any) error { } return nil } + +func (uc *UDPClient) Stop() { + uc.cancel() +} diff --git a/internal/module/udp-multicast.go b/internal/module/udp-multicast.go index 92c6d8b..0c76853 100644 --- a/internal/module/udp-multicast.go +++ b/internal/module/udp-multicast.go @@ -19,6 +19,7 @@ type UDPMulticast struct { router route.RouteIO Addr *net.UDPAddr logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -74,7 +75,9 @@ func (um *UDPMulticast) Run(ctx context.Context) error { return errors.New("net.udp.multicast unable to get router from context") } um.router = router - um.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + um.ctx = moduleContext + um.cancel = cancel client, err := net.ListenMulticastUDP("udp", nil, um.Addr) if err != nil { @@ -130,3 +133,7 @@ func (um *UDPMulticast) Output(ctx context.Context, payload any) error { _, err := um.conn.Write(payloadBytes) return err } + +func (um *UDPMulticast) Stop() { + um.cancel() +} diff --git a/internal/module/udp-server.go b/internal/module/udp-server.go index c749153..82aebbe 100644 --- a/internal/module/udp-server.go +++ b/internal/module/udp-server.go @@ -20,6 +20,7 @@ type UDPServer struct { ctx context.Context router route.RouteIO logger *slog.Logger + cancel context.CancelFunc } func init() { @@ -88,7 +89,9 @@ func (us *UDPServer) Run(ctx context.Context) error { return errors.New("net.udp.server unable to get router from context") } us.router = router - us.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + us.ctx = moduleContext + us.cancel = cancel listener, err := net.ListenUDP("udp", us.Addr) if err != nil { @@ -129,3 +132,7 @@ func (us *UDPServer) Run(ctx context.Context) error { func (us *UDPServer) Output(ctx context.Context, payload any) error { return errors.New("net.udp.server output is not implemented") } + +func (us *UDPServer) Stop() { + us.cancel() +} diff --git a/router_test.go b/router_test.go index 9af83b4..651e739 100644 --- a/router_test.go +++ b/router_test.go @@ -25,6 +25,7 @@ type MockCounterModule struct { outputCount int router route.RouteIO logger *slog.Logger + cancel context.CancelFunc } func (mcm *MockCounterModule) Id() string { @@ -43,7 +44,9 @@ func (mcm *MockCounterModule) Run(ctx context.Context) error { return fmt.Errorf("mock.counter could not get router from context") } mcm.router = router - mcm.ctx = ctx + moduleContext, cancel := context.WithCancel(ctx) + mcm.ctx = moduleContext + mcm.cancel = cancel <-mcm.ctx.Done() return nil } @@ -52,6 +55,10 @@ func (mcm *MockCounterModule) Type() string { return mcm.config.Type } +func (mcm *MockCounterModule) Stop() { + mcm.cancel() +} + func init() { module.RegisterModule(module.ModuleRegistration{ Type: "mock.counter",