mirror of
https://github.com/jwetzell/showbridge-go.git
synced 2026-04-26 21:05:30 +00:00
add processor to filter out unique values
This commit is contained in:
39
internal/processor/filter-unique.go
Normal file
39
internal/processor/filter-unique.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package processor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/common"
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FilterUnique struct {
|
||||||
|
config config.ProcessorConfig
|
||||||
|
previous any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fr *FilterUnique) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) {
|
||||||
|
payload := wrappedPayload.Payload
|
||||||
|
|
||||||
|
if reflect.DeepEqual(payload, fr.previous) {
|
||||||
|
wrappedPayload.End = true
|
||||||
|
return wrappedPayload, nil
|
||||||
|
}
|
||||||
|
fr.previous = payload
|
||||||
|
|
||||||
|
return wrappedPayload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fr *FilterUnique) Type() string {
|
||||||
|
return fr.config.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterProcessor(ProcessorRegistration{
|
||||||
|
Type: "filter.unique",
|
||||||
|
New: func(config config.ProcessorConfig) (Processor, error) {
|
||||||
|
return &FilterUnique{config: config}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
124
internal/processor/test/filter-unique_test.go
Normal file
124
internal/processor/test/filter-unique_test.go
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
package processor_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/common"
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/config"
|
||||||
|
"github.com/jwetzell/showbridge-go/internal/processor"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFilterUniqueFromRegistry(t *testing.T) {
|
||||||
|
registration, ok := processor.ProcessorRegistry["filter.unique"]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("filter.unique processor not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
processorInstance, err := registration.New(config.ProcessorConfig{
|
||||||
|
Type: "filter.unique",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create filter.unique processor: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if processorInstance.Type() != "filter.unique" {
|
||||||
|
t.Fatalf("filter.unique processor has wrong type: %s", processorInstance.Type())
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := "hello"
|
||||||
|
expected := "hello"
|
||||||
|
|
||||||
|
got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), payload))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("filter.unique processing failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(got.Payload, expected) {
|
||||||
|
t.Fatalf("filter.unique got %+v, expected %+v", got.Payload, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGoodFilterUnique(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
params map[string]any
|
||||||
|
payload string
|
||||||
|
match bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "basic",
|
||||||
|
payload: "hello",
|
||||||
|
params: nil,
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
registration, ok := processor.ProcessorRegistry["filter.unique"]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("filter.unique processor not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
processorInstance, err := registration.New(config.ProcessorConfig{
|
||||||
|
Type: "filter.unique",
|
||||||
|
Params: test.params,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("filter.unique failed to create processor: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("filter.unique processing failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got.End != !test.match {
|
||||||
|
t.Fatalf("filter.unique did not filter properly %+v, expected %+v", got, test.match)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadFilterUnique(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
params map[string]any
|
||||||
|
payload any
|
||||||
|
errorString string
|
||||||
|
}{}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
registration, ok := processor.ProcessorRegistry["filter.unique"]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("filter.unique processor not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
processorInstance, err := registration.New(config.ProcessorConfig{
|
||||||
|
Type: "filter.unique",
|
||||||
|
Params: test.params,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if test.errorString != err.Error() {
|
||||||
|
t.Fatalf("filter.unique got error '%s', expected '%s'", err.Error(), test.errorString)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := processorInstance.Process(t.Context(), common.GetWrappedPayload(t.Context(), test.payload))
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("filter.unique expected to fail but got payload: %+v", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.Error() != test.errorString {
|
||||||
|
t.Fatalf("filter.unique got error '%s', expected '%s'", err.Error(), test.errorString)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -88,6 +88,18 @@
|
|||||||
"required": ["type", "params"],
|
"required": ["type", "params"],
|
||||||
"additionalProperties": false
|
"additionalProperties": false
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"title": "Filter Unique Values",
|
||||||
|
"properties": {
|
||||||
|
"type": {
|
||||||
|
"type": "string",
|
||||||
|
"const": "filter.unique"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["type"],
|
||||||
|
"additionalProperties": false
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"title": "Parse Float",
|
"title": "Parse Float",
|
||||||
|
|||||||
Reference in New Issue
Block a user