diff --git a/internal/processing/string-decode.go b/internal/processing/string-decode.go new file mode 100644 index 0000000..22d984c --- /dev/null +++ b/internal/processing/string-decode.go @@ -0,0 +1,35 @@ +package processing + +import ( + "context" + "fmt" +) + +type StringDecode struct { + config ProcessorConfig +} + +func (sd *StringDecode) Process(ctx context.Context, payload any) (any, error) { + payloadBytes, ok := payload.([]byte) + + if !ok { + return nil, fmt.Errorf("string.decode processor only accepts a []byte") + } + + payloadMessage := string(payloadBytes) + + return payloadMessage, nil +} + +func (sd *StringDecode) Type() string { + return sd.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "string.decode", + New: func(config ProcessorConfig) (Processor, error) { + return &StringDecode{config: config}, nil + }, + }) +} diff --git a/internal/processing/string-encode.go b/internal/processing/string-encode.go new file mode 100644 index 0000000..8966d7f --- /dev/null +++ b/internal/processing/string-encode.go @@ -0,0 +1,35 @@ +package processing + +import ( + "context" + "fmt" +) + +type StringEncode struct { + config ProcessorConfig +} + +func (se *StringEncode) Process(ctx context.Context, payload any) (any, error) { + payloadString, ok := payload.(string) + + if !ok { + return nil, fmt.Errorf("string.encode processor only accepts a string") + } + + payloadBytes := []byte(payloadString) + + return payloadBytes, nil +} + +func (se *StringEncode) Type() string { + return se.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "string.encode", + New: func(config ProcessorConfig) (Processor, error) { + return &StringEncode{config: config}, nil + }, + }) +} diff --git a/internal/processing/string-filter.go b/internal/processing/string-filter.go new file mode 100644 index 0000000..35a2eeb --- /dev/null +++ b/internal/processing/string-filter.go @@ -0,0 +1,59 @@ +package processing + +import ( + "context" + "fmt" + "regexp" +) + +type StringFilter struct { + config ProcessorConfig + Pattern *regexp.Regexp +} + +func (se *StringFilter) Process(ctx context.Context, payload any) (any, error) { + payloadString, ok := payload.(string) + + if !ok { + return nil, fmt.Errorf("string.filter processor only accepts a string") + } + + if !se.Pattern.MatchString(payloadString) { + return nil, nil + } + + return payloadString, nil +} + +func (se *StringFilter) Type() string { + return se.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "string.filter", + New: func(config ProcessorConfig) (Processor, error) { + params := config.Params + + pattern, ok := params["pattern"] + + if !ok { + return nil, fmt.Errorf("http.request.filter requires an pattern parameter") + } + + patternString, ok := pattern.(string) + + if !ok { + return nil, fmt.Errorf("http.request.filter pattern must be a string") + } + + patternRegexp, err := regexp.Compile(fmt.Sprintf("^%s$", patternString)) + + if err != nil { + return nil, err + } + + return &StringFilter{config: config, Pattern: patternRegexp}, nil + }, + }) +}