From 3f4271b5efee37ddef5bdafe7d857cc00891db11 Mon Sep 17 00:00:00 2001 From: Joel Wetzell Date: Wed, 18 Mar 2026 14:21:36 -0500 Subject: [PATCH] add module and processor for interacting with SQLite DB --- go.mod | 4 +- go.sum | 30 +++++---- internal/common/module.go | 5 ++ internal/module/db-sqlite.go | 81 ++++++++++++++++++++++ internal/processor/db-query.go | 119 +++++++++++++++++++++++++++++++++ 5 files changed, 223 insertions(+), 16 deletions(-) create mode 100644 internal/module/db-sqlite.go create mode 100644 internal/processor/db-query.go diff --git a/go.mod b/go.mod index 93dd3ae..66ab049 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( go.opentelemetry.io/otel/sdk v1.42.0 go.opentelemetry.io/otel/trace v1.42.0 modernc.org/quickjs v0.17.1 + modernc.org/sqlite v1.47.0 sigs.k8s.io/yaml v1.6.0 ) @@ -73,7 +74,6 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/crypto v0.48.0 // indirect - golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.42.0 // indirect @@ -84,7 +84,7 @@ require ( google.golang.org/grpc v1.79.2 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect - modernc.org/libc v1.67.1 // indirect + modernc.org/libc v1.70.0 // indirect modernc.org/libquickjs v0.12.3 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index ecfc311..362334e 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= @@ -161,10 +163,8 @@ go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= -golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= -golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= -golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= -golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= @@ -177,8 +177,8 @@ golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= -golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= -golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= @@ -200,18 +200,18 @@ gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= -modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= -modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= -modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw= +modernc.org/ccgo/v4 v4.32.0/go.mod h1:6F08EBCx5uQc38kMGl+0Nm0oWczoo1c7cgpzEry7Uc0= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= -modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= -modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= -modernc.org/libc v1.67.1 h1:bFaqOaa5/zbWYJo8aW0tXPX21hXsngG2M7mckCnFSVk= -modernc.org/libc v1.67.1/go.mod h1:QvvnnJ5P7aitu0ReNpVIEyesuhmDLQ8kaEoyMjIFZJA= +modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw= +modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo= modernc.org/libquickjs v0.12.3 h1:2IU9B6njBmce2PuYttJDkXeoLRV9WnvgP+eU5HAC8YI= modernc.org/libquickjs v0.12.3/go.mod h1:iCsgVxnHTX3i0YPxxHBmJk0GLA5sVUHXWI/090UXgeE= modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= @@ -224,6 +224,8 @@ modernc.org/quickjs v0.17.1 h1:CbYnbTf7ksZk9YZ1rRM2Ab1Zfi+X6s50kXiOhpd2NIg= modernc.org/quickjs v0.17.1/go.mod h1:hATT7DIJc33I5Q/Fjffhm0tpUHNSqdKHma/ossibTA0= modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.47.0 h1:R1XyaNpoW4Et9yly+I2EeX7pBza/w+pmYee/0HJDyKk= +modernc.org/sqlite v1.47.0/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig= modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= diff --git a/internal/common/module.go b/internal/common/module.go index bc65811..0d9477b 100644 --- a/internal/common/module.go +++ b/internal/common/module.go @@ -2,6 +2,7 @@ package common import ( "context" + "database/sql" ) type Module interface { @@ -16,3 +17,7 @@ type KeyValueModule interface { Get(key string) (any, error) Set(key string, value any) error } + +type DatabaseModule interface { + Database() *sql.DB +} diff --git a/internal/module/db-sqlite.go b/internal/module/db-sqlite.go new file mode 100644 index 0000000..924789d --- /dev/null +++ b/internal/module/db-sqlite.go @@ -0,0 +1,81 @@ +package module + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internal/common" + "github.com/jwetzell/showbridge-go/internal/config" + + _ "modernc.org/sqlite" +) + +type DbSqlite struct { + config config.ModuleConfig + Dsn string + ctx context.Context + router common.RouteIO + db *sql.DB + logger *slog.Logger +} + +func init() { + RegisterModule(ModuleRegistration{ + Type: "db.sqlite", + New: func(config config.ModuleConfig) (common.Module, error) { + params := config.Params + + dsnString, err := params.GetString("dsn") + if err != nil { + return nil, fmt.Errorf("db.sqlite dsn error: %w", err) + } + + return &DbSqlite{Dsn: dsnString, config: config, logger: CreateLogger(config)}, nil + }, + }) +} + +func (t *DbSqlite) Id() string { + return t.config.Id +} + +func (t *DbSqlite) Type() string { + return t.config.Type +} + +func (t *DbSqlite) Start(ctx context.Context) error { + t.logger.Debug("running") + router, ok := ctx.Value(common.RouterContextKey).(common.RouteIO) + + if !ok { + return errors.New("db.sqlite unable to get router from context") + } + t.router = router + t.ctx = ctx + + db, err := sql.Open("sqlite", t.Dsn) + if err != nil { + return fmt.Errorf("db.sqlite error opening database: %w", err) + } + t.db = db + defer t.db.Close() + <-ctx.Done() + return nil +} + +func (t *DbSqlite) Output(ctx context.Context, payload any) error { + return nil +} + +func (t *DbSqlite) Stop() { + if t.db != nil { + t.db.Close() + } +} + +func (t *DbSqlite) Database() *sql.DB { + return t.db +} diff --git a/internal/processor/db-query.go b/internal/processor/db-query.go new file mode 100644 index 0000000..08a17cb --- /dev/null +++ b/internal/processor/db-query.go @@ -0,0 +1,119 @@ +package processor + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "github.com/jwetzell/showbridge-go/internal/common" + "github.com/jwetzell/showbridge-go/internal/config" +) + +type DbQuery struct { + config config.ProcessorConfig + ModuleId string + Query string + logger *slog.Logger +} + +func (dq *DbQuery) Process(ctx context.Context, wrappedPayload common.WrappedPayload) (common.WrappedPayload, error) { + ctxModules := ctx.Value(common.ModulesContextKey) + if ctxModules == nil { + wrappedPayload.End = true + return wrappedPayload, errors.New("db.query unable to get modules from context") + } + + moduleMap, ok := ctxModules.(map[string]common.Module) + if !ok { + wrappedPayload.End = true + return wrappedPayload, errors.New("db.query modules from context has wrong type") + } + + module, ok := moduleMap[dq.ModuleId] + if !ok { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query unable to find module with id: %s", dq.ModuleId) + } + + dbModule, ok := module.(common.DatabaseModule) + if !ok { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query module with id %s is not a DatabaseModule", dq.ModuleId) + } + + db := dbModule.Database() + if db == nil { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query module with id %s returned nil database", dq.ModuleId) + } + + rows, err := db.QueryContext(ctx, dq.Query) + if err != nil { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query error executing query: %w", err) + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query error getting columns: %w", err) + } + + results := make([]map[string]any, 0) + + for rows.Next() { + columnValues := make([]interface{}, len(columns)) + + for i := range columnValues { + columnValues[i] = new(interface{}) + } + + if err := rows.Scan(columnValues...); err != nil { + wrappedPayload.End = true + return wrappedPayload, fmt.Errorf("db.query error scanning row: %w", err) + } + + rowMap := make(map[string]any) + for i, colName := range columns { + rowMap[colName] = columnValues[i] + } + results = append(results, rowMap) + } + + if len(results) == 0 { + wrappedPayload.Payload = nil + return wrappedPayload, nil + } else if len(results) == 1 { + wrappedPayload.Payload = results[0] + return wrappedPayload, nil + } + wrappedPayload.Payload = results + return wrappedPayload, nil +} + +func (dq *DbQuery) Type() string { + return dq.config.Type +} + +func init() { + RegisterProcessor(ProcessorRegistration{ + Type: "db.query", + New: func(config config.ProcessorConfig) (Processor, error) { + + params := config.Params + + moduleIdString, err := params.GetString("module") + if err != nil { + return nil, fmt.Errorf("db.query module error: %w", err) + } + + queryString, err := params.GetString("query") + if err != nil { + return nil, fmt.Errorf("db.query query error: %w", err) + } + return &DbQuery{config: config, ModuleId: moduleIdString, Query: queryString, logger: slog.Default().With("component", "processor", "type", config.Type)}, nil + }, + }) +}