Merge pull request #111 from jwetzell/feat/kv-processors

more generic key value module things
This commit is contained in:
Joel Wetzell
2026-03-18 14:20:12 -05:00
committed by GitHub
22 changed files with 187 additions and 165 deletions

View File

@@ -11,3 +11,8 @@ type Module interface {
Stop() Stop()
Output(context.Context, any) error Output(context.Context, any) error
} }
type KeyValueModule interface {
Get(key string) (any, error)
Set(key string, value any) error
}

View File

@@ -205,7 +205,3 @@ func (hs *HTTPServer) Output(ctx context.Context, payload any) error {
func (hs *HTTPServer) Stop() { func (hs *HTTPServer) Stop() {
hs.cancel() hs.cancel()
} }
func (hs *HTTPServer) Get(key string) (any, error) {
return nil, errors.New("http.server does not support Get")
}

View File

@@ -89,12 +89,3 @@ func (mi *MIDIInput) Output(ctx context.Context, payload any) error {
func (mi *MIDIInput) Stop() { func (mi *MIDIInput) Stop() {
mi.cancel() mi.cancel()
} }
func (mi *MIDIInput) Get(key string) (any, error) {
switch key {
case "port":
return mi.Port, nil
default:
return nil, errors.New("midi.input key not found")
}
}

View File

@@ -96,12 +96,3 @@ func (mo *MIDIOutput) Output(ctx context.Context, payload any) error {
func (mo *MIDIOutput) Stop() { func (mo *MIDIOutput) Stop() {
mo.cancel() mo.cancel()
} }
func (mo *MIDIOutput) Get(key string) (any, error) {
switch key {
case "port":
return mo.Port, nil
default:
return nil, errors.New("midi.output key not found")
}
}

View File

@@ -125,7 +125,3 @@ func (mc *MQTTClient) Output(ctx context.Context, payload any) error {
func (mc *MQTTClient) Stop() { func (mc *MQTTClient) Stop() {
mc.cancel() mc.cancel()
} }
func (mc *MQTTClient) Get(key string) (any, error) {
return nil, errors.New("mqtt.client does not support Get")
}

View File

@@ -116,7 +116,3 @@ func (nc *NATSClient) Output(ctx context.Context, payload any) error {
func (nc *NATSClient) Stop() { func (nc *NATSClient) Stop() {
nc.cancel() nc.cancel()
} }
func (nc *NATSClient) Get(key string) (any, error) {
return nil, errors.New("nats.client does not support Get")
}

View File

@@ -112,7 +112,3 @@ func (ns *NATSServer) Stop() {
ns.server.Shutdown() ns.server.Shutdown()
} }
} }
func (ns *NATSServer) Get(key string) (any, error) {
return nil, errors.New("nats.server does not support Get")
}

View File

@@ -112,14 +112,3 @@ func (pc *PSNClient) Output(ctx context.Context, payload any) error {
func (pc *PSNClient) Stop() { func (pc *PSNClient) Stop() {
pc.cancel() pc.cancel()
} }
func (pc *PSNClient) Get(key string) (any, error) {
switch key {
case "trackers":
return pc.decoder.Trackers, nil
case "name":
return pc.decoder.SystemName, nil
default:
return nil, errors.New("psn.client key not found")
}
}

View File

@@ -89,13 +89,6 @@ func (rc *RedisClient) Stop() {
} }
func (rc *RedisClient) Get(key string) (any, error) { func (rc *RedisClient) Get(key string) (any, error) {
switch key {
case "host":
return rc.Host, nil
case "port":
return rc.Port, nil
default:
if rc.client != nil { if rc.client != nil {
val, err := rc.client.Get(rc.ctx, key).Result() val, err := rc.client.Get(rc.ctx, key).Result()
if err != nil { if err != nil {
@@ -103,6 +96,13 @@ func (rc *RedisClient) Get(key string) (any, error) {
} }
return val, nil return val, nil
} }
return nil, errors.New("redis.client key not found") return nil, errors.New("redis.client not setup")
} }
func (rc *RedisClient) Set(key string, value any) error {
if rc.client != nil {
status := rc.client.Set(rc.ctx, key, value, 0)
return status.Err()
}
return errors.New("redis.client not setup")
} }

View File

@@ -169,12 +169,3 @@ func (sc *SerialClient) Output(ctx context.Context, payload any) error {
func (sc *SerialClient) Stop() { func (sc *SerialClient) Stop() {
sc.cancel() sc.cancel()
} }
func (sc *SerialClient) Get(key string) (any, error) {
switch key {
case "port":
return sc.Port, nil
default:
return nil, errors.New("serial.client key not found")
}
}

View File

@@ -221,7 +221,3 @@ func (scs *SIPCallServer) Output(ctx context.Context, payload any) error {
func (scs *SIPCallServer) Stop() { func (scs *SIPCallServer) Stop() {
scs.cancel() scs.cancel()
} }
func (scs *SIPCallServer) Get(key string) (any, error) {
return nil, errors.New("sip.call.server does not support Get")
}

View File

@@ -249,7 +249,3 @@ func (sds *SIPDTMFServer) Output(ctx context.Context, payload any) error {
func (sds *SIPDTMFServer) Stop() { func (sds *SIPDTMFServer) Stop() {
sds.cancel() sds.cancel()
} }
func (sds *SIPDTMFServer) Get(key string) (any, error) {
return nil, errors.New("sip.dtmf.server does not support Get")
}

View File

@@ -163,20 +163,3 @@ func (tc *TCPClient) Output(ctx context.Context, payload any) error {
func (tc *TCPClient) Stop() { func (tc *TCPClient) Stop() {
tc.cancel() tc.cancel()
} }
func (tc *TCPClient) Get(key string) (any, error) {
switch key {
case "host":
host, err := tc.config.Params.GetString("host")
if err != nil {
return nil, fmt.Errorf("net.tcp.client host error: %w", err)
}
return host, nil
case "ip":
return tc.Addr.IP.String(), nil
case "port":
return tc.Addr.Port, nil
default:
return nil, errors.New("net.tcp.client key not found")
}
}

View File

@@ -232,14 +232,3 @@ func (ts *TCPServer) Stop() {
ts.cancel() ts.cancel()
ts.wg.Wait() ts.wg.Wait()
} }
func (ts *TCPServer) Get(key string) (any, error) {
switch key {
case "ip":
return ts.Addr.IP.String(), nil
case "port":
return ts.Addr.Port, nil
default:
return nil, errors.New("net.tcp.server key not found")
}
}

View File

@@ -31,10 +31,6 @@ func (m *TestModule) Id() string {
return "test" return "test"
} }
func (m *TestModule) Get(key string) (any, error) {
return nil, nil
}
func TestModuleBadRegistrationNoType(t *testing.T) { func TestModuleBadRegistrationNoType(t *testing.T) {
defer func() { defer func() {
if r := recover(); r == nil { if r := recover(); r == nil {

View File

@@ -82,12 +82,3 @@ func (i *TimeInterval) Output(ctx context.Context, payload any) error {
func (i *TimeInterval) Stop() { func (i *TimeInterval) Stop() {
i.cancel() i.cancel()
} }
func (i *TimeInterval) Get(key string) (any, error) {
switch key {
case "duration":
return i.Duration, nil
default:
return nil, errors.New("time.interval key not found")
}
}

View File

@@ -81,12 +81,3 @@ func (t *TimeTimer) Output(ctx context.Context, payload any) error {
func (t *TimeTimer) Stop() { func (t *TimeTimer) Stop() {
t.cancel() t.cancel()
} }
func (t *TimeTimer) Get(key string) (any, error) {
switch key {
case "duration":
return t.Duration, nil
default:
return nil, errors.New("time.timer key not found")
}
}

View File

@@ -106,20 +106,3 @@ func (uc *UDPClient) Output(ctx context.Context, payload any) error {
func (uc *UDPClient) Stop() { func (uc *UDPClient) Stop() {
uc.cancel() uc.cancel()
} }
func (uc *UDPClient) Get(key string) (any, error) {
switch key {
case "host":
host, err := uc.config.Params.GetString("host")
if err != nil {
return nil, fmt.Errorf("net.udp.client host error: %w", err)
}
return host, nil
case "ip":
return uc.Addr.IP.String(), nil
case "port":
return uc.Addr.Port, nil
default:
return nil, errors.New("net.udp.client key not found")
}
}

View File

@@ -124,14 +124,3 @@ func (um *UDPMulticast) Output(ctx context.Context, payload any) error {
func (um *UDPMulticast) Stop() { func (um *UDPMulticast) Stop() {
um.cancel() um.cancel()
} }
func (um *UDPMulticast) Get(key string) (any, error) {
switch key {
case "ip":
return um.Addr.IP.String(), nil
case "port":
return um.Addr.Port, nil
default:
return nil, errors.New("net.udp.multicast key not found")
}
}

View File

@@ -123,14 +123,3 @@ func (us *UDPServer) Output(ctx context.Context, payload any) error {
func (us *UDPServer) Stop() { func (us *UDPServer) Stop() {
us.cancel() us.cancel()
} }
func (us *UDPServer) Get(key string) (any, error) {
switch key {
case "ip":
return us.Addr.IP.String(), nil
case "port":
return us.Addr.Port, nil
default:
return nil, errors.New("net.udp.server key not found")
}
}

View File

@@ -0,0 +1,78 @@
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
type KVGet struct {
config config.ProcessorConfig
ModuleId string
Key string
logger *slog.Logger
}
func (kvg *KVGet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
ctxModules := ctx.Value(common.ModulesContextKey)
if ctxModules == nil {
wrappedPayload.End = true
return wrappedPayload, errors.New("kv.get unable to get modules from context")
}
moduleMap, ok := ctxModules.(map[string]common.Module)
if !ok {
wrappedPayload.End = true
return wrappedPayload, errors.New("kv.get modules from context has wrong type")
}
module, ok := moduleMap[kvg.ModuleId]
if !ok {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.get unable to find module with id: %s", kvg.ModuleId)
}
kvModule, ok := module.(common.KeyValueModule)
if !ok {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.get module with id %s is not a KeyValueModule", kvg.ModuleId)
}
value, err := kvModule.Get(kvg.Key)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.get error getting key: %w", err)
}
wrappedPayload.Payload = value
return wrappedPayload, nil
}
func (kvg *KVGet) Type() string {
return kvg.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "kv.get",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("kv.get module error: %w", err)
}
keyString, err := params.GetString("key")
if err != nil {
return nil, fmt.Errorf("kv.get key error: %w", err)
}
return &KVGet{config: config, ModuleId: moduleIdString, Key: keyString, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}

View File

@@ -0,0 +1,90 @@
package processor
import (
"context"
"errors"
"fmt"
"html/template"
"log/slog"
"github.com/jwetzell/showbridge-go/internal/common"
"github.com/jwetzell/showbridge-go/internal/config"
)
type KVSet struct {
config config.ProcessorConfig
ModuleId string
Key string
Value *template.Template
logger *slog.Logger
}
func (kvs *KVSet) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
ctxModules := ctx.Value(common.ModulesContextKey)
if ctxModules == nil {
wrappedPayload.End = true
return wrappedPayload, errors.New("kv.set unable to get modules from context")
}
moduleMap, ok := ctxModules.(map[string]common.Module)
if !ok {
wrappedPayload.End = true
return wrappedPayload, errors.New("kv.set modules from context has wrong type")
}
module, ok := moduleMap[kvs.ModuleId]
if !ok {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.set unable to find module with id: %s", kvs.ModuleId)
}
kvModule, ok := module.(common.KeyValueModule)
if !ok {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.set module with id %s is not a KeyValueModule", kvs.ModuleId)
}
err := kvModule.Set(kvs.Key, wrappedPayload.Payload)
if err != nil {
wrappedPayload.End = true
return wrappedPayload, fmt.Errorf("kv.set error setting key: %w", err)
}
return wrappedPayload, nil
}
func (kvs *KVSet) Type() string {
return kvs.config.Type
}
func init() {
RegisterProcessor(ProcessorRegistration{
Type: "kv.set",
New: func(config config.ProcessorConfig) (Processor, error) {
params := config.Params
moduleIdString, err := params.GetString("module")
if err != nil {
return nil, fmt.Errorf("kv.set module error: %w", err)
}
keyString, err := params.GetString("key")
if err != nil {
return nil, fmt.Errorf("kv.set key error: %w", err)
}
valueString, err := params.GetString("value")
if err != nil {
return nil, fmt.Errorf("kv.set value error: %w", err)
}
valueTemplate, err := template.New("template").Parse(valueString)
if err != nil {
return nil, err
}
return &KVSet{config: config, ModuleId: moduleIdString, Key: keyString, Value: valueTemplate, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil
},
})
}