diff --git a/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc b/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc new file mode 100644 index 0000000000..43a69aa036 --- /dev/null +++ b/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc @@ -0,0 +1,148 @@ += gcp_spanner_change_stream +:type: input +:status: beta +:categories: ["Services","GCP"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Creates an input that consumes from a spanner change stream. + +Introduced in version 3.43.0. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +input: + label: "" + gcp_spanner_change_stream: + stream_dsn: projects//instances//databases/ # No default (required) + stream_id: "" + partition_dsn: projects//instances//databases/ # No default (optional) + partition_table: "" # No default (optional) + use_in_mememory_partition: false +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +input: + label: "" + gcp_spanner_change_stream: + stream_dsn: projects//instances//databases/ # No default (required) + stream_id: "" + start_time_epoch: 0 + partition_dsn: projects//instances//databases/ # No default (optional) + partition_table: "" # No default (optional) + use_in_mememory_partition: false + allowed_mod_types: + - INSERT + - UPDATE + - DELETE +``` + +-- +====== + +== Fields + +=== `stream_dsn` + +Required field to use to connect to spanner for the change stream. + + +*Type*: `string` + + +```yml +# Examples + +stream_dsn: projects//instances//databases/ +``` + +=== `stream_id` + +Required name of the change stream to track. + + +*Type*: `string` + +*Default*: `""` + +=== `start_time_epoch` + +Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used. + + +*Type*: `int` + +*Default*: `0` + +=== `partition_dsn` + +Field used to set the DSN for the metadata partition table, can be the same as stream_dsn. + + +*Type*: `string` + + +```yml +# Examples + +partition_dsn: projects//instances//databases/ +``` + +=== `partition_table` + +Name of the table to create/use in spanner to track change stream partition metadata. + + +*Type*: `string` + + +=== `use_in_mememory_partition` + +use an in memory partition table for tracking the partitions. + + +*Type*: `bool` + +*Default*: `false` + +=== `allowed_mod_types` + +Mod types to allow through when reading the change stream, default all. + + +*Type*: `array` + +*Default*: `["INSERT","UPDATE","DELETE"]` + + diff --git a/go.mod b/go.mod index 9b6858ee41..10fbbf2880 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/aiplatform v1.68.0 cloud.google.com/go/bigquery v1.64.0 cloud.google.com/go/pubsub v1.45.1 + cloud.google.com/go/spanner v1.73.0 cloud.google.com/go/storage v1.43.0 cloud.google.com/go/vertexai v0.12.0 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 @@ -25,6 +26,7 @@ require ( github.com/Masterminds/squirrel v1.5.4 github.com/PaesslerAG/gval v1.2.2 github.com/PaesslerAG/jsonpath v0.1.1 + github.com/anicoll/screamer v0.0.0-20241206035431-9ba919b54dfd github.com/apache/pulsar-client-go v0.13.1 github.com/authzed/authzed-go v1.0.0 github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b @@ -61,7 +63,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/generikvault/gvalstrings v0.0.0-20180926130504-471f38f0112a github.com/getsentry/sentry-go v0.28.1 - github.com/go-faker/faker/v4 v4.4.2 + github.com/go-faker/faker/v4 v4.5.0 github.com/go-jose/go-jose/v3 v3.0.3 github.com/go-resty/resty/v2 v2.15.3 github.com/go-sql-driver/mysql v1.8.1 @@ -144,7 +146,7 @@ require ( golang.org/x/crypto v0.28.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/net v0.30.0 - golang.org/x/sync v0.8.0 + golang.org/x/sync v0.9.0 golang.org/x/text v0.19.0 google.golang.org/api v0.205.0 google.golang.org/protobuf v1.35.1 @@ -156,7 +158,6 @@ require ( cloud.google.com/go/longrunning v0.6.2 // indirect cloud.google.com/go/monitoring v1.21.2 // indirect cloud.google.com/go/secretmanager v1.14.2 // indirect - cloud.google.com/go/spanner v1.73.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect @@ -252,8 +253,8 @@ require ( github.com/couchbase/gocbcoreps v0.1.3 // indirect github.com/couchbase/goprotostellar v1.0.2 // indirect github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 // indirect - github.com/cpuguy83/dockercfg v0.3.1 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/danieljoos/wincred v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -385,11 +386,11 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - github.com/testcontainers/testcontainers-go v0.33.0 + github.com/testcontainers/testcontainers-go v0.34.0 github.com/tilinna/z85 v1.0.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect - github.com/urfave/cli/v2 v2.27.4 + github.com/urfave/cli/v2 v2.27.5 github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect @@ -432,6 +433,4 @@ require ( modernc.org/token v1.1.0 // indirect ) -go 1.22.7 - -toolchain go1.23.0 +go 1.23.2 diff --git a/go.sum b/go.sum index f59df136f6..0e95499902 100644 --- a/go.sum +++ b/go.sum @@ -762,6 +762,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/anicoll/screamer v0.0.0-20241206035431-9ba919b54dfd h1:Bz8Rksav0QRwRNzwdVeZAkMsm0PwNkJ7H4amsYt1xfw= +github.com/anicoll/screamer v0.0.0-20241206035431-9ba919b54dfd/go.mod h1:ZP3Cph589sR4ohmkdCBRVNcaR0YNQXw7E0d3a0YfQjc= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= @@ -985,10 +987,10 @@ github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259 h1:2T github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 h1:lhGOw8rNG6RAadmmaJAF3PJ7MNt7rFuWG7BHCYMgnGE= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= -github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= -github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= -github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= -github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= +github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0= @@ -1085,8 +1087,8 @@ github.com/getsentry/sentry-go v0.28.1/go.mod h1:1fQZ+7l7eeJ3wYi82q5Hg8GqAPgefRq github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= -github.com/go-faker/faker/v4 v4.4.2 h1:96WeU9QKEqRUVYdjHquY2/5bAqmVM0IfGKHV5mbfqmQ= -github.com/go-faker/faker/v4 v4.4.2/go.mod h1:4K3v4AbKXYNHMQNaREMc9/kRB9j5JJzpFo6KHRvrcIw= +github.com/go-faker/faker/v4 v4.5.0 h1:ARzAY2XoOL9tOUK+KSecUQzyXQsUaZHefjyF8x6YFHc= +github.com/go-faker/faker/v4 v4.5.0/go.mod h1:p3oq1GRjG2PZ7yqeFFfQI20Xm61DoBDlCA8RiSyZ48M= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= @@ -1832,8 +1834,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw= -github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8= +github.com/testcontainers/testcontainers-go v0.34.0 h1:5fbgF0vIN5u+nD3IWabQwRybuB4GY8G2HHgCkbMzMHo= +github.com/testcontainers/testcontainers-go v0.34.0/go.mod h1:6P/kMkQe8yqPHfPWNulFGdFHTD8HB2vLq/231xY2iPQ= github.com/testcontainers/testcontainers-go/modules/ollama v0.32.0 h1:nuYlIE4zOGd8m+TzjY0v41kyfYre3inp/iw1p4qn2eU= github.com/testcontainers/testcontainers-go/modules/ollama v0.32.0/go.mod h1:PeIvLbruDuwReyIbJT6Km+ZRxSaqLsS8VcTqeDgxs1A= github.com/testcontainers/testcontainers-go/modules/qdrant v0.32.0 h1:rsk1UKBcu3s5V/msn6Hsizx7f70SPDyLZKlgz0xDEJ8= @@ -1881,8 +1883,8 @@ github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOAp github.com/uptrace/bun/dialect/pgdialect v1.1.12/go.mod h1:Ij6WIxQILxLlL2frUBxUBOZJtLElD2QQNDcu/PWDHTc= github.com/uptrace/bun/driver/pgdriver v1.1.12 h1:3rRWB1GK0psTJrHwxzNfEij2MLibggiLdTqjTtfHc1w= github.com/uptrace/bun/driver/pgdriver v1.1.12/go.mod h1:ssYUP+qwSEgeDDS1xm2XBip9el1y9Mi5mTAvLoiADLM= -github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8= -github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= +github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= +github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94= github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ= github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= @@ -2216,8 +2218,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/impl/gcp/input_spanner_change_stream.go b/internal/impl/gcp/input_spanner_change_stream.go new file mode 100644 index 0000000000..acaa47c617 --- /dev/null +++ b/internal/impl/gcp/input_spanner_change_stream.go @@ -0,0 +1,277 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcp + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "slices" + "time" + + "cloud.google.com/go/spanner" + "github.com/Jeffail/shutdown" + "github.com/anicoll/screamer/pkg/model" + "github.com/anicoll/screamer/pkg/partitionstorage" + "github.com/anicoll/screamer/pkg/screamer" + "github.com/redpanda-data/benthos/v4/public/service" +) + +type streamReader interface { + Stream(ctx context.Context, channel chan<- *model.DataChangeRecord) error + Close() error +} + +func newSpannerChangeStreamInputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Beta(). + Version("3.43.0"). + Categories("Services", "GCP"). + Summary("Creates an input that consumes from a spanner change stream."). + Field(service.NewStringField("stream_dsn").Description("Required field to use to connect to spanner for the change stream.").Example("projects//instances//databases/")). + Field(service.NewStringField("stream_id").Description("Required name of the change stream to track.").Default("")). + Field(service.NewIntField("start_time_epoch").Advanced().Optional().Default(0).Description("Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.")). + Field(service.NewStringField("partition_dsn").Optional().Description("Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.").Example("projects//instances//databases/")). + Field(service.NewStringField("partition_table").Optional().Description("Name of the table to create/use in spanner to track change stream partition metadata.")). + Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions.").Default(false)). + Field(service.NewStringListField("allowed_mod_types").Advanced().Description("Mod types to allow through when reading the change stream, default all.").Default([]string{"INSERT", "UPDATE", "DELETE"})) +} + +func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out *spannerStreamInput, err error) { + out = &spannerStreamInput{ + // not buffered to prevent the cursor from getting too far ahead. + // there is still the chance that we could lose changes though. + changeChannel: make(chan *model.DataChangeRecord, 1), + log: log, + shutdownSig: shutdown.NewSignaller(), + } + if out.partitionDSN, err = conf.FieldString("partition_dsn"); err != nil { + return + } + + if out.partitionTable, err = conf.FieldString("partition_table"); err != nil { + return + } + + if out.streamDSN, err = conf.FieldString("stream_dsn"); err != nil { + return + } + + if out.streamID, err = conf.FieldString("stream_id"); err != nil { + return + } + + if out.allowedModTypes, err = conf.FieldStringList("allowed_mod_types"); err != nil { + return + } + + useInMemPartition, err := conf.FieldBool("use_in_mememory_partition") + if err != nil { + return + } + + startTimeEpoch, err := conf.FieldInt("start_time_epoch") + if err != nil { + return + } + + if startTimeEpoch > 0 { + out.startTime = func(seconds int) *time.Time { + t := time.UnixMicro(int64(startTimeEpoch)) + return &t + }(startTimeEpoch) + } + + if !useInMemPartition && slices.Contains([]string{out.partitionDSN, out.partitionTable, out.streamDSN, out.streamID}, "") { + return nil, errors.New("partition_dsn, partition_table, stream_dsn, and stream_id must be set") + } else if slices.Contains([]string{out.streamDSN, out.streamID}, "") { + return nil, errors.New("stream_dsn, and stream_id must be set") + } + out.usePartitionTable = !useInMemPartition + return +} + +func init() { + err := service.RegisterInput( + "gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return newSpannerStreamInput(conf, mgr.Logger()) + }) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +type spannerStreamInput struct { + streamDSN string + streamClient *db + streamID string + partitionDSN string + partitionTable string + usePartitionTable bool + startTime *time.Time + allowedModTypes []string + reader streamReader + // create a channel to pass from connection to read. + changeChannel chan *model.DataChangeRecord + + log *service.Logger + shutdownSig *shutdown.Signaller +} + +func (i *spannerStreamInput) Connect(ctx context.Context) (err error) { + jobctx, _ := i.shutdownSig.SoftStopCtx(context.Background()) + + if i.streamClient == nil { + i.streamClient, err = newDatabase(jobctx, i.streamDSN) + if err != nil { + return err + } + } + if i.reader == nil { + i.reader, err = newStreamer(jobctx, i.streamClient, i.streamID, i.partitionDSN, i.partitionTable, i.usePartitionTable, i.allowedModTypes, i.startTime) + if err != nil { + return err + } + } + go func() { + if rerr := i.reader.Stream(jobctx, i.changeChannel); rerr != nil { + i.log.Errorf("Subscription error: %v\n", rerr) + close(i.changeChannel) + panic(rerr) + } + }() + return nil +} + +func (i *spannerStreamInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + msg := <-i.changeChannel + data, err := json.Marshal(msg) + if err != nil { + return nil, nil, err + } + return service.NewMessage(data), func(ctx context.Context, err error) error { + // Nacks are retried automatically when we use service.AutoRetryNacks + return nil + }, nil +} + +func (i *spannerStreamInput) Close(_ context.Context) error { + close(i.changeChannel) + if i.reader != nil { + return i.reader.Close() + } + return nil +} + +// ------------------------- + +type streamerDB struct { + streamID, partitionTable string + changeStreamClient, partitionClient *db + subscriber *screamer.Subscriber + allowedModTypes []string +} + +func newStreamer(ctx context.Context, + changestreamClient *db, + streamID, partitionDSN, partitionTable string, + usePartitionTable bool, + modTypes []string, + startTime *time.Time, +) (streamReader, error) { + streamer := &streamerDB{ + streamID: streamID, + partitionTable: partitionTable, + allowedModTypes: modTypes, + changeStreamClient: changestreamClient, + } + + var pStorage screamer.PartitionStorage = partitionstorage.NewInmemory() + // only use DB meta partition table if explicitly enabled. + if usePartitionTable { + partitionClient, err := newDatabase(ctx, partitionDSN) + if err != nil { + return nil, err + } + streamer.partitionClient = partitionClient + + spannerPartitionStorage := partitionstorage.NewSpanner(partitionClient.client, partitionTable) + if err := spannerPartitionStorage.CreateTableIfNotExists(ctx); err != nil { + return nil, err + } + // assign here as we need to use the partition storage in the subscriber. + pStorage = spannerPartitionStorage + } + + options := []screamer.Option{} + // if provided with a specific startime. use that. + if startTime != nil { + options = append(options, screamer.WithStartTimestamp(*startTime)) + } + subscriber := screamer.NewSubscriber(streamer.changeStreamClient.client, streamID, pStorage, options...) + + streamer.subscriber = subscriber + + return streamer, nil +} + +// Stream provides a stream of change records from a Spanner database configured stream to your provided channel. +// Stream is blocking unless the provided context is cancelled or an error occurs. +func (s *streamerDB) Stream(ctx context.Context, channel chan<- *model.DataChangeRecord) error { + return s.subscriber.SubscribeFunc(ctx, func(dcr *model.DataChangeRecord) error { + if slices.Contains(s.allowedModTypes, string(dcr.ModType)) { + channel <- dcr + } + return nil + }) +} + +func (s *streamerDB) Close() error { + if s.changeStreamClient != nil { + s.changeStreamClient.Close() + } + if s.partitionClient != nil { + s.partitionClient.Close() + } + return nil +} + +// ----------------- + +type db struct { + client *spanner.Client +} + +func newDatabase(ctx context.Context, dsn string) (*db, error) { + spannerClient, err := spanner.NewClient(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("spanner.NewClient: %w", err) + } + + return &db{ + client: spannerClient, + }, nil +} + +func (s *db) Close() error { + if s.client != nil { + s.client.Close() + } + return nil +} diff --git a/internal/impl/gcp/input_spanner_change_stream_mock_test.go b/internal/impl/gcp/input_spanner_change_stream_mock_test.go new file mode 100644 index 0000000000..6e7beea0a5 --- /dev/null +++ b/internal/impl/gcp/input_spanner_change_stream_mock_test.go @@ -0,0 +1,39 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcp + +import ( + "context" + + "github.com/anicoll/screamer/pkg/model" + "github.com/stretchr/testify/mock" +) + +type mockStreamReader struct { + mock.Mock +} + +var _ streamReader = &mockStreamReader{} + +func (mt *mockStreamReader) Stream(ctx context.Context, channel chan<- *model.DataChangeRecord) error { + args := mt.Called(ctx, channel) + + return args.Error(0) +} + +func (mt *mockStreamReader) Close() error { + args := mt.Called() + return args.Error(0) +} diff --git a/internal/impl/gcp/input_spanner_change_stream_test.go b/internal/impl/gcp/input_spanner_change_stream_test.go new file mode 100644 index 0000000000..bbb6d4a759 --- /dev/null +++ b/internal/impl/gcp/input_spanner_change_stream_test.go @@ -0,0 +1,153 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcp + +import ( + "context" + "encoding/json" + "testing" + "time" + + "cloud.google.com/go/spanner/spannertest" + "github.com/anicoll/screamer/pkg/model" + "github.com/google/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var testSpannerStreamInputYAML = ` +stream_dsn: "projects/test-project/instances/test-instance/databases/test-db" +stream_id: "OutboxStream" +use_in_mememory_partition: true +partition_dsn: "projects/test/instances/test/databases/test-events-md" # optional default "" +partition_table: "meta_partitions_table" # optional default "" +allowed_mod_types: + - "INSERT" + - "UPDATE" + - "DELETE" +` + +func TestGCPSpannerChangeStreamInput_Read(t *testing.T) { + spec := newSpannerChangeStreamInputConfig() + + parsed, err := spec.ParseYAML(testSpannerStreamInputYAML, nil) + require.NoError(t, err) + + proc, err := newSpannerStreamInput(parsed, nil) + require.NoError(t, err) + + mockStreamReader := &mockStreamReader{} + proc.reader = mockStreamReader + + dataChangeRecord := &model.DataChangeRecord{ + CommitTimestamp: time.Now(), + RecordSequence: "0000001", + ServerTransactionID: uuid.NewString(), + IsLastRecordInTransactionInPartition: true, + TableName: "test_table", + ColumnTypes: []*model.ColumnType{ + {Name: "ID", Type: model.Type{Code: model.TypeCode_INT64}}, + {Name: "Value", Type: model.Type{Code: model.TypeCode_STRING}}, + }, + Mods: []*model.Mod{ + { + Keys: map[string]interface{}{}, + NewValues: map[string]interface{}{}, + OldValues: map[string]interface{}{}, + }, + }, + ModType: model.ModType_INSERT, + NumberOfRecordsInTransaction: 1, + NumberOfPartitionsInTransaction: 2, + } + proc.changeChannel <- dataChangeRecord + ctx := context.Background() + + msg, _, err := proc.Read(ctx) + require.NoError(t, err) + require.NotNil(t, msg) + + expectedMsg, err := json.Marshal(dataChangeRecord) + require.NoError(t, err) + + gotMsg, err := msg.AsBytes() + require.NoError(t, err) + require.Equal(t, expectedMsg, gotMsg) +} + +func TestGCPSpannerChangeStreamInput_Connect(t *testing.T) { + spec := newSpannerChangeStreamInputConfig() + ctx, cancel := context.WithCancel(context.Background()) + + parsed, err := spec.ParseYAML(testSpannerStreamInputYAML, nil) + require.NoError(t, err) + + proc, err := newSpannerStreamInput(parsed, nil) + require.NoError(t, err) + + srv, err := spannertest.NewServer("localhost:0") + require.NoError(t, err) + t.Setenv("SPANNER_EMULATOR_HOST", srv.Addr) + + t.Cleanup(func() { + srv.Close() + }) + + mockStreamReader := &mockStreamReader{} + proc.reader = mockStreamReader + + mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil) + defer mockStreamReader.AssertExpectations(t) + + err = proc.Connect(ctx) + require.NoError(t, err) + cancel() + time.Sleep(time.Millisecond * 100) +} + +func TestGCPSpannerChangeStreamInput_Close(t *testing.T) { + spec := newSpannerChangeStreamInputConfig() + ctx, cancel := context.WithCancel(context.Background()) + + parsed, err := spec.ParseYAML(testSpannerStreamInputYAML, nil) + require.NoError(t, err) + + proc, err := newSpannerStreamInput(parsed, nil) + require.NoError(t, err) + + srv, err := spannertest.NewServer("localhost:0") + require.NoError(t, err) + t.Setenv("SPANNER_EMULATOR_HOST", srv.Addr) + + t.Cleanup(func() { + srv.Close() + }) + + mockStreamReader := &mockStreamReader{} + defer mockStreamReader.AssertExpectations(t) + proc.reader = mockStreamReader + + mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil) + + mockStreamReader.On("Close", mock.Anything).Once().Return(nil) + + err = proc.Connect(ctx) + require.NoError(t, err) + cancel() + time.Sleep(time.Millisecond * 100) + + err = proc.Close(context.Background()) + require.NoError(t, err) +} diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 397eb732c5..c57c787274 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -84,6 +84,7 @@ gcp_cloud_storage ,output ,GCP Cloud Storage ,3.43.0 ,certif gcp_cloudtrace ,tracer ,GCP Cloud Trace ,4.2.0 ,certified ,n ,y ,y gcp_pubsub ,input ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y gcp_pubsub ,output ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y +gcp_spanner_change_stream ,input ,gcp_spanner_change_stream ,0.0.0 ,community ,n ,n ,n gcp_vertex_ai_chat ,processor ,GCP Vertex AI ,4.34.0 ,enterprise ,n ,y ,y gcp_vertex_ai_embeddings ,processor ,gcp_vertex_ai_embeddings ,4.37.0 ,enterprise ,n ,y ,y generate ,input ,generate ,3.40.0 ,certified ,n ,y ,y