diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8dc835bef8..1f124ee6ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
 - `snowpipe_streaming` now supports exactly once delivery using `offset_token`. (@rockwotj)
 - `ollama_chat` now supports tool calling. (@rockwotj)
 - New `ollama_moderation` which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
+- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
 
 ### Fixed
diff --git a/docs/modules/components/pages/inputs/mysql_cdc.adoc b/docs/modules/components/pages/inputs/mysql_cdc.adoc
new file mode 100644
index 0000000000..16bb09675e
--- /dev/null
+++ b/docs/modules/components/pages/inputs/mysql_cdc.adoc
@@ -0,0 +1,281 @@
+= mysql_cdc
+:type: input
+:status: beta
+:categories: ["Services"]
+
+
+
+////
+    THIS FILE IS AUTOGENERATED!
+
+    To make changes, edit the corresponding source file under:
+
+    https://github.com/redpanda-data/connect/tree/main/internal/impl/.
+
+    And:
+
+    https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
+////
+
+// © 2024 Redpanda Data Inc.
+
+
+component_type_dropdown::[]
+
+
+Enables change data capture (CDC) streaming from MySQL for Redpanda Connect.
+
+Introduced in version 4.45.0.
+
+
+[tabs]
+======
+Common::
++
+--
+
+```yml
+# Common config fields, showing default values
+input:
+  label: ""
+  mysql_cdc:
+    dsn: user:password@tcp(localhost:3306)/database # No default (required)
+    tables: [] # No default (required)
+    checkpoint_cache: "" # No default (required)
+    checkpoint_key: mysql_binlog_position
+    snapshot_max_batch_size: 1000
+    stream_snapshot: false # No default (required)
+    auto_replay_nacks: true
+    checkpoint_limit: 1024
+    batching:
+      count: 0
+      byte_size: 0
+      period: ""
+      check: ""
+```
+
+--
+Advanced::
++
+--
+
+```yml
+# All config fields, showing default values
+input:
+  label: ""
+  mysql_cdc:
+    dsn: user:password@tcp(localhost:3306)/database # No default (required)
+    tables: [] # No default (required)
+    checkpoint_cache: "" # No default (required)
+    checkpoint_key: mysql_binlog_position
+    snapshot_max_batch_size: 1000
+    stream_snapshot: false # No default (required)
+    auto_replay_nacks: true
+    checkpoint_limit: 1024
+    batching:
+      count: 0
+      byte_size: 0
+      period: ""
+      check: ""
+      processors: [] # No default (optional)
+```
+
+--
+======
+
+== Metadata
+
+This input adds the following metadata fields to each message:
+
+- operation
+- table
+- binlog_position
+
+
+== Fields
+
+=== `dsn`
+
+The DSN of the MySQL database to connect to.
+
+
+*Type*: `string`
+
+
+```yml
+# Examples
+
+dsn: user:password@tcp(localhost:3306)/database
+```
+
+=== `tables`
+
+A list of tables to stream from the database.
+
+
+*Type*: `array`
+
+
+```yml
+# Examples
+
+tables:
+  - table1
+  - table2
+```
+
+=== `checkpoint_cache`
+
+A https://www.docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] to use for storing the current latest BinLog Position that has been successfully delivered. This allows Redpanda Connect to continue from that BinLog Position upon restart, rather than consume the entire state of the table.
+
+
+*Type*: `string`
+
+
+=== `checkpoint_key`
+
+The key to use to store the snapshot position in `checkpoint_cache`. An alternative key can be provided if multiple CDC inputs share the same cache.
+
+
+*Type*: `string`
+
+*Default*: `"mysql_binlog_position"`
+
+=== `snapshot_max_batch_size`
+
+The maximum number of rows to be streamed in a single batch when taking a snapshot.
+
+
+*Type*: `int`
+
+*Default*: `1000`
+
+=== `stream_snapshot`
+
+If set to true, the connector will query all the existing data as part of the snapshot process. Otherwise, it will start from the current binlog position.
+
+
+*Type*: `bool`
+
+
+=== `auto_replay_nacks`
+
+Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
+
+
+*Type*: `bool`
+
+*Default*: `true`
+
+=== `checkpoint_limit`
+
+The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given BinLog Position will not be acknowledged unless all messages under that offset are delivered, in order to preserve at-least-once delivery guarantees.
+
+
+*Type*: `int`
+
+*Default*: `1024`
+
+=== `batching`
+
+Allows you to configure a xref:configuration:batching.adoc[batching policy].
+
+
+*Type*: `object`
+
+
+```yml
+# Examples
+
+batching:
+  byte_size: 5000
+  count: 0
+  period: 1s
+
+batching:
+  count: 10
+  period: 1s
+
+batching:
+  check: this.contains("END BATCH")
+  count: 0
+  period: 1m
+```
+
+=== `batching.count`
+
+A number of messages at which the batch should be flushed. If `0` disables count based batching.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+=== `batching.byte_size`
+
+An amount of bytes at which the batch should be flushed. If `0` disables size based batching.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+=== `batching.period`
+
+A period in which an incomplete batch should be flushed regardless of its size.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+period: 1s
+
+period: 1m
+
+period: 500ms
+```
+
+=== `batching.check`
+
+A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+check: this.type == "end_of_transaction"
+```
+
+=== `batching.processors`
+
+A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
+ + +*Type*: `array` + + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + + diff --git a/go.mod b/go.mod index 3e704d1ae1..d77833db76 100644 --- a/go.mod +++ b/go.mod @@ -65,6 +65,7 @@ require ( github.com/getsentry/sentry-go v0.28.1 github.com/go-faker/faker/v4 v4.4.2 github.com/go-jose/go-jose/v3 v3.0.3 + github.com/go-mysql-org/go-mysql v1.10.0 github.com/go-resty/resty/v2 v2.15.3 github.com/go-sql-driver/mysql v1.8.1 github.com/gocql/gocql v1.6.0 @@ -161,8 +162,10 @@ require ( cloud.google.com/go/spanner v1.73.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.12.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1 // indirect + github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect + github.com/Masterminds/semver v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.3 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect @@ -177,8 +180,14 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/gomega v1.34.2 // indirect + github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb // indirect + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect + github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect + github.com/pingcap/tidb/pkg/parser v0.0.0-20241118164214-4f047be191be // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/samber/lo v1.47.0 // indirect + github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect + github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect diff --git a/go.sum b/go.sum index 68035027bc..4ec9efadf9 100644 --- a/go.sum +++ b/go.sum @@ -695,6 +695,8 @@ github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mo github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c h1:pxW6RcqyfI9/kWtOwnv/G+AzdKuy2ZrqINhenH4HyNs= +github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= @@ -728,6 +730,8 @@ github.com/Jeffail/shutdown v1.0.0 h1:afYjnY4pksqP/012m3NGJVccDI+WATdSzIMVHZKU8/ 
github.com/Jeffail/shutdown v1.0.0/go.mod h1:5dT4Y1oe60SJELCkmAB1pr9uQyHBhh6cwDLQTfmuO5U= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= +github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+hmvYS0= github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= @@ -1114,6 +1118,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-mysql-org/go-mysql v1.10.0 h1:9iEPrZdHKq6EepUuPONrBA+wc3aL1WLhbUm5w8ryDFg= +github.com/go-mysql-org/go-mysql v1.10.0/go.mod h1:GzFQAI+FqbYAPtsannL0hmZH6zcLzCQbwqopT9bgTt0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= @@ -1666,8 +1672,15 @@ github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pinecone-io/go-pinecone v1.0.0 h1:90euw+0EKSgdeE9q7iGSTVmdx9r9+x3mxWkrCCLab+o= github.com/pinecone-io/go-pinecone v1.0.0/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk= -github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb h1:3pSi4EDG6hg0orE1ndHkXvX6Qdq2cZn8gAPir8ymKZk= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= +github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= +github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pingcap/tidb/pkg/parser v0.0.0-20241118164214-4f047be191be h1:t5EkCmZpxLCig5GQA0AZG47aqsuL5GTsJeeUD+Qfies= +github.com/pingcap/tidb/pkg/parser v0.0.0-20241118164214-4f047be191be/go.mod h1:Hju1TEWZvrctQKbztTRwXH7rd41Yq0Pgmq4PrEKcq7o= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser 
v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= @@ -1786,6 +1799,10 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= +github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= +github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= +github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= github.com/sijms/go-ora/v2 v2.8.19 h1:7LoKZatDYGi18mkpQTR/gQvG9yOdtc7hPAex96Bqisc= github.com/sijms/go-ora/v2 v2.8.19/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -1988,6 +2005,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= @@ -1997,6 +2015,7 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= @@ -2004,6 +2023,7 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/internal/impl/mysql/event.go b/internal/impl/mysql/event.go new file mode 100644 index 0000000000..a13470f649 --- /dev/null +++ b/internal/impl/mysql/event.go @@ -0,0 +1,62 @@ +// Copyright 2024 Redpanda Data, Inc. 
+//
+// Licensed as a Redpanda Enterprise file under the Redpanda Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+    "fmt"
+    "strconv"
+    "strings"
+
+    "github.com/go-mysql-org/go-mysql/mysql"
+)
+
+// position is an alias for the binlog position type from go-mysql.
+type position = mysql.Position
+
+// MessageOperation is a string type specifying the operation that produced a message.
+type MessageOperation string
+
+const (
+    // MessageOperationRead represents a read from the initial snapshot.
+    MessageOperationRead MessageOperation = "read"
+    // MessageOperationInsert represents an insert statement in the MySQL binlog.
+    MessageOperationInsert MessageOperation = "insert"
+    // MessageOperationUpdate represents an update statement in the MySQL binlog.
+    MessageOperationUpdate MessageOperation = "update"
+    // MessageOperationDelete represents a delete statement in the MySQL binlog.
+    MessageOperationDelete MessageOperation = "delete"
+)
+
+// MessageEvent represents a message from the MySQL CDC plugin.
+type MessageEvent struct {
+    Row       map[string]any   `json:"row"`
+    Table     string           `json:"table"`
+    Operation MessageOperation `json:"operation"`
+    Position  *position        `json:"position"`
+}
+
+func binlogPositionToString(pos position) string {
+    // Pad the position so this string is lexicographically ordered.
+    return fmt.Sprintf("%s@%08X", pos.Name, pos.Pos)
+}
+
+func parseBinlogPosition(str string) (pos position, err error) {
+    idx := strings.LastIndexByte(str, '@')
+    if idx == -1 {
+        err = fmt.Errorf("invalid binlog string: %s", str)
+        return
+    }
+    pos.Name = str[:idx]
+    var offset uint64
+    offset, err = strconv.ParseUint(str[idx+1:], 16, 32)
+    if err != nil {
+        err = fmt.Errorf("invalid binlog string offset: %w", err)
+        return
+    }
+    pos.Pos = uint32(offset)
+    return
+}
diff --git a/internal/impl/mysql/event_test.go b/internal/impl/mysql/event_test.go
new file mode 100644
index 0000000000..f6b3a5bfd3
--- /dev/null
+++ b/internal/impl/mysql/event_test.go
@@ -0,0 +1,41 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Licensed as a Redpanda Enterprise file under the Redpanda Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+    "math"
+    "strconv"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func TestBinlogString(t *testing.T) {
+    good := []position{
+        {Name: "log.0000", Pos: 32},
+        {Name: "log@0000", Pos: 32},
+        {Name: "log.09999999", Pos: 0},
+        {Name: "custom-binlog.9999999", Pos: math.MaxUint32},
+    }
+    for _, expected := range good {
+        str := binlogPositionToString(expected)
+        actual, err := parseBinlogPosition(str)
+        require.NoError(t, err)
+        require.Equal(t, expected, actual)
+    }
+    bad := []string{
+        "log.000",
+        "log.000@" + strconv.FormatUint(math.MaxUint64, 16),
+        "log.000.FF",
+    }
+    for _, str := range bad {
+        _, err := parseBinlogPosition(str)
+        require.Error(t, err)
+    }
+}
diff --git a/internal/impl/mysql/input_mysql_stream.go b/internal/impl/mysql/input_mysql_stream.go
new file mode 100644
index 0000000000..b2b1173dcf
--- /dev/null
+++ b/internal/impl/mysql/input_mysql_stream.go
@@ -0,0 +1,775 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Licensed as a Redpanda Enterprise file under the Redpanda Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+    "context"
+    "database/sql"
+    "encoding/json"
+    "errors"
+    "fmt"
+    "regexp"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/Jeffail/checkpoint"
+    "github.com/Jeffail/shutdown"
+    "github.com/go-mysql-org/go-mysql/canal"
+    "github.com/go-mysql-org/go-mysql/replication"
+    "github.com/go-mysql-org/go-mysql/schema"
+    "github.com/go-sql-driver/mysql"
+    "github.com/redpanda-data/benthos/v4/public/service"
+    "golang.org/x/sync/errgroup"
+)
+
+const (
+    fieldMySQLDSN             = "dsn"
+    fieldMySQLTables          = "tables"
+    fieldStreamSnapshot       = "stream_snapshot"
+    fieldSnapshotMaxBatchSize = "snapshot_max_batch_size"
+    fieldBatching             = "batching"
+    fieldCheckpointKey        = "checkpoint_key"
+    fieldCheckpointCache      = "checkpoint_cache"
+    fieldCheckpointLimit      = "checkpoint_limit"
+
+    shutdownTimeout = 5 * time.Second
+)
+
+var mysqlStreamConfigSpec = service.NewConfigSpec().
+    Beta().
+    Categories("Services").
+    Version("4.45.0").
+    Summary("Enables change data capture (CDC) streaming from MySQL for Redpanda Connect.").
+    Description(`
+== Metadata
+
+This input adds the following metadata fields to each message:
+
+- operation
+- table
+- binlog_position
+`).
+    Fields(
+        service.NewStringField(fieldMySQLDSN).
+            Description("The DSN of the MySQL database to connect to.").
+            Example("user:password@tcp(localhost:3306)/database"),
+        service.NewStringListField(fieldMySQLTables).
+            Description("A list of tables to stream from the database.").
+            Example([]string{"table1", "table2"}),
+        service.NewStringField(fieldCheckpointCache).
+            Description("A https://www.docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] to use for storing the current latest BinLog Position that has been successfully delivered. This allows Redpanda Connect to continue from that BinLog Position upon restart, rather than consume the entire state of the table."),
+        service.NewStringField(fieldCheckpointKey).
+            Description("The key to use to store the snapshot position in `"+fieldCheckpointCache+"`. An alternative key can be provided if multiple CDC inputs share the same cache.").
+            Default("mysql_binlog_position"),
+        service.NewIntField(fieldSnapshotMaxBatchSize).
+            Description("The maximum number of rows to be streamed in a single batch when taking a snapshot.").
+            Default(1000),
+        service.NewBoolField(fieldStreamSnapshot).
+            Description("If set to true, the connector will query all the existing data as part of the snapshot process. Otherwise, it will start from the current binlog position."),
+        service.NewAutoRetryNacksToggleField(),
+        service.NewIntField(fieldCheckpointLimit).
+            Description("The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given BinLog Position will not be acknowledged unless all messages under that offset are delivered, in order to preserve at-least-once delivery guarantees.").
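+            // Note: delivery remains at-least-once; after a restart the input resumes
+            // from the last fully acknowledged binlog position, so messages after that
+            // position may be observed again.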
+            Default(1024),
+        service.NewBatchPolicyField(fieldBatching),
+    )
+
+type asyncMessage struct {
+    msg   service.MessageBatch
+    ackFn service.AckFunc
+}
+
+type mysqlStreamInput struct {
+    canal.DummyEventHandler
+
+    mutex sync.Mutex
+    // canal is the MySQL binlog listener connection.
+    canal             *canal.Canal
+    mysqlConfig       *mysql.Config
+    binLogCache       string
+    binLogCacheKey    string
+    currentBinlogName string
+
+    dsn            string
+    tables         []string
+    streamSnapshot bool
+
+    batching                  service.BatchPolicy
+    batchPolicy               *service.Batcher
+    tablesFilterMap           map[string]bool
+    checkPointLimit           int
+    fieldSnapshotMaxBatchSize int
+
+    logger *service.Logger
+    res    *service.Resources
+
+    rawMessageEvents chan MessageEvent
+    msgChan          chan asyncMessage
+    cp               *checkpoint.Capped[*position]
+
+    shutSig *shutdown.Signaller
+}
+
+func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s service.BatchInput, err error) {
+    i := mysqlStreamInput{
+        logger:           res.Logger(),
+        rawMessageEvents: make(chan MessageEvent),
+        msgChan:          make(chan asyncMessage),
+        res:              res,
+    }
+
+    var batching service.BatchPolicy
+
+    if i.dsn, err = conf.FieldString(fieldMySQLDSN); err != nil {
+        return nil, err
+    }
+
+    i.mysqlConfig, err = mysql.ParseDSN(i.dsn)
+    if err != nil {
+        return nil, fmt.Errorf("error parsing mysql DSN: %v", err)
+    }
+
+    if i.tables, err = conf.FieldStringList(fieldMySQLTables); err != nil {
+        return nil, err
+    }
+
+    if i.streamSnapshot, err = conf.FieldBool(fieldStreamSnapshot); err != nil {
+        return nil, err
+    }
+
+    if i.fieldSnapshotMaxBatchSize, err = conf.FieldInt(fieldSnapshotMaxBatchSize); err != nil {
+        return nil, err
+    }
+
+    if i.checkPointLimit, err = conf.FieldInt(fieldCheckpointLimit); err != nil {
+        return nil, err
+    }
+
+    if i.binLogCache, err = conf.FieldString(fieldCheckpointCache); err != nil {
+        return nil, err
+    }
+    if !conf.Resources().HasCache(i.binLogCache) {
+        return nil, fmt.Errorf("unknown cache resource: %s", i.binLogCache)
+    }
+    if i.binLogCacheKey, err = conf.FieldString(fieldCheckpointKey); err != nil {
+        return nil, err
+    }
+
+    i.cp = checkpoint.NewCapped[*position](int64(i.checkPointLimit))
+
+    i.tablesFilterMap = map[string]bool{}
+    for _, table := range i.tables {
+        if err = validateTableName(table); err != nil {
+            return nil, err
+        }
+        i.tablesFilterMap[table] = true
+    }
+
+    if batching, err = conf.FieldBatchPolicy(fieldBatching); err != nil {
+        return nil, err
+    } else if batching.IsNoop() {
+        batching.Count = 1
+    }
+
+    i.batching = batching
+    if i.batchPolicy, err = i.batching.NewBatcher(res); err != nil {
+        return nil, err
+    }
+
+    r, err := service.AutoRetryNacksBatchedToggled(conf, &i)
+    if err != nil {
+        return nil, err
+    }
+
+    return conf.WrapBatchInputExtractTracingSpanMapping("mysql_cdc", r)
+}
+
+func init() {
+    err := service.RegisterBatchInput("mysql_cdc", mysqlStreamConfigSpec, newMySQLStreamInput)
+    if err != nil {
+        panic(err)
+    }
+}
+
+// ---- Redpanda Connect specific methods ----
+
+func (i *mysqlStreamInput) Connect(ctx context.Context) error {
+    canalConfig := canal.NewDefaultConfig()
+    canalConfig.Addr = i.mysqlConfig.Addr
+    canalConfig.User = i.mysqlConfig.User
+    canalConfig.Password = i.mysqlConfig.Passwd
+    // Reset the dump path since we take the snapshot manually; this is required
+    // because canal would otherwise try to prepare the dumper on init.
+    canalConfig.Dump.ExecutionPath = ""
+
+    // Parse and set additional parameters
+    canalConfig.Charset = i.mysqlConfig.Collation
+    if i.mysqlConfig.TLS != nil {
+
canalConfig.TLSConfig = i.mysqlConfig.TLS + } + // Parse time values as time.Time values not strings + canalConfig.ParseTime = true + // canalConfig.Logger + + for _, table := range i.tables { + canalConfig.IncludeTableRegex = append(canalConfig.IncludeTableRegex, regexp.QuoteMeta(table)) + } + + c, err := canal.NewCanal(canalConfig) + if err != nil { + return err + } + c.AddDumpTables(i.mysqlConfig.DBName, i.tables...) + + i.canal = c + + pos, err := i.getCachedBinlogPosition(ctx) + if err != nil { + return fmt.Errorf("unable to get cached binlog position: %s", err) + } + // create snapshot instance if we were requested and haven't finished it before. + var snapshot *Snapshot + if i.streamSnapshot && pos == nil { + db, err := sql.Open("mysql", i.dsn) + if err != nil { + return fmt.Errorf("failed to connect to MySQL server: %s", err) + } + snapshot = NewSnapshot(i.logger, db) + } + + // Reset the shutSig + sig := shutdown.NewSignaller() + i.shutSig = sig + go func() { + ctx, _ := sig.SoftStopCtx(context.Background()) + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + <-ctx.Done() + i.canal.Close() + return nil + }) + wg.Go(func() error { return i.readMessages(ctx) }) + wg.Go(func() error { return i.startMySQLSync(ctx, pos, snapshot) }) + if err := wg.Wait(); err != nil && !errors.Is(err, context.Canceled) { + i.logger.Errorf("error during MySQL CDC: %s", err) + } else { + i.logger.Info("successfully shutdown MySQL CDC stream") + } + sig.TriggerHasStopped() + }() + + return nil +} + +func (i *mysqlStreamInput) startMySQLSync(ctx context.Context, pos *position, snapshot *Snapshot) error { + // If we are given a snapshot, then we need to read it. + if snapshot != nil { + startPos, err := snapshot.prepareSnapshot(ctx) + if err != nil { + _ = snapshot.close() + return fmt.Errorf("unable to prepare snapshot: %w", err) + } + if err = i.readSnapshot(ctx, snapshot); err != nil { + _ = snapshot.close() + return fmt.Errorf("failed reading snapshot: %w", err) + } + if err = snapshot.releaseSnapshot(ctx); err != nil { + _ = snapshot.close() + return fmt.Errorf("unable to release snapshot: %w", err) + } + if err = snapshot.close(); err != nil { + return fmt.Errorf("unable to close snapshot: %w", err) + } + pos = startPos + } else if pos == nil { + coords, err := i.canal.GetMasterPos() + if err != nil { + return fmt.Errorf("unable to get start binlog position: %w", err) + } + pos = &coords + } + i.logger.Infof("starting MySQL CDC stream from binlog %s at offset %d", pos.Name, pos.Pos) + i.currentBinlogName = pos.Name + i.canal.SetEventHandler(i) + if err := i.canal.RunFrom(*pos); err != nil { + return fmt.Errorf("failed to start streaming: %w", err) + } + return nil +} + +func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot) error { + // TODO(cdc): Process tables in parallel + for _, table := range i.tables { + tablePks, err := snapshot.getTablePrimaryKeys(ctx, table) + if err != nil { + return err + } + i.logger.Tracef("primary keys for table %s: %v", table, tablePks) + lastSeenPksValues := map[string]any{} + for _, pk := range tablePks { + lastSeenPksValues[pk] = nil + } + + var numRowsProcessed int + for { + var batchRows *sql.Rows + if numRowsProcessed == 0 { + batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, nil, i.fieldSnapshotMaxBatchSize) + } else { + batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize) + } + if err != nil { + return fmt.Errorf("failed to execute snapshot table 
query: %s", err) + } + + types, err := batchRows.ColumnTypes() + if err != nil { + return fmt.Errorf("failed to fetch column types: %s", err) + } + + values, mappers := prepSnapshotScannerAndMappers(types) + + columns, err := batchRows.Columns() + if err != nil { + return fmt.Errorf("failed to fetch columns: %s", err) + } + + var batchRowsCount int + for batchRows.Next() { + numRowsProcessed++ + batchRowsCount++ + + if err := batchRows.Scan(values...); err != nil { + return err + } + + row := map[string]any{} + for idx, value := range values { + v, err := mappers[idx](value) + if err != nil { + return err + } + row[columns[idx]] = v + if _, ok := lastSeenPksValues[columns[idx]]; ok { + lastSeenPksValues[columns[idx]] = value + } + } + + select { + case i.rawMessageEvents <- MessageEvent{ + Row: row, + Operation: MessageOperationRead, + Table: table, + Position: nil, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + + if err := batchRows.Err(); err != nil { + return fmt.Errorf("failed to iterate snapshot table: %s", err) + } + + if batchRowsCount < i.fieldSnapshotMaxBatchSize { + break + } + } + } + return nil +} + +func snapshotValueMapper[T any](v any) (any, error) { + s, ok := v.(*sql.Null[T]) + if !ok { + var e T + return nil, fmt.Errorf("expected %T got %T", e, v) + } + if !s.Valid { + return nil, nil + } + return s.V, nil +} + +func prepSnapshotScannerAndMappers(cols []*sql.ColumnType) (values []any, mappers []func(any) (any, error)) { + stringMapping := func(mapper func(s string) (any, error)) func(any) (any, error) { + return func(v any) (any, error) { + s, ok := v.(*sql.NullString) + if !ok { + return nil, fmt.Errorf("expected %T got %T", "", v) + } + if !s.Valid { + return nil, nil + } + return mapper(s.String) + } + } + for _, col := range cols { + var val any + var mapper func(any) (any, error) + switch col.DatabaseTypeName() { + case "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB": + val = new(sql.Null[[]byte]) + mapper = snapshotValueMapper[[]byte] + case "DATETIME", "TIMESTAMP": + val = new(sql.NullTime) + mapper = func(v any) (any, error) { + s, ok := v.(*sql.NullTime) + if !ok { + return nil, fmt.Errorf("expected %T got %T", time.Time{}, v) + } + if !s.Valid { + return nil, nil + } + return s.Time, nil + } + case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT", "YEAR": + val = new(sql.NullInt64) + mapper = func(v any) (any, error) { + s, ok := v.(*sql.NullInt64) + if !ok { + return nil, fmt.Errorf("expected %T got %T", int64(0), v) + } + if !s.Valid { + return nil, nil + } + return int(s.Int64), nil + } + case "DECIMAL", "NUMERIC": + val = new(sql.NullString) + mapper = stringMapping(func(s string) (any, error) { + return json.Number(s), nil + }) + case "FLOAT", "DOUBLE": + val = new(sql.Null[float64]) + mapper = snapshotValueMapper[float64] + case "SET": + val = new(sql.NullString) + mapper = stringMapping(func(s string) (any, error) { + // This might be a little simplistic, we may need to handle escaped values + // here... 
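+            // In practice MySQL does not permit commas inside individual SET member
+            // values (the comma is the member separator), so splitting on ',' is
+            // expected to be safe for valid SET definitions.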
+ out := []any{} + for _, elem := range strings.Split(s, ",") { + out = append(out, elem) + } + return out, nil + }) + case "JSON": + val = new(sql.NullString) + mapper = stringMapping(func(s string) (v any, err error) { + err = json.Unmarshal([]byte(s), &v) + return + }) + default: + val = new(sql.Null[string]) + mapper = snapshotValueMapper[string] + } + values = append(values, val) + mappers = append(mappers, mapper) + } + return +} +func (i *mysqlStreamInput) readMessages(ctx context.Context) error { + var nextTimedBatchChan <-chan time.Time + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-nextTimedBatchChan: + nextTimedBatchChan = nil + flushedBatch, err := i.batchPolicy.Flush(ctx) + if err != nil { + return fmt.Errorf("timed flush batch error: %w", err) + } + + if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil { + return fmt.Errorf("failed to flush periodic batch: %w", err) + } + case me := <-i.rawMessageEvents: + row, err := json.Marshal(me.Row) + if err != nil { + return fmt.Errorf("failed to serialize row: %w", err) + } + + mb := service.NewMessage(row) + mb.MetaSet("operation", string(me.Operation)) + mb.MetaSet("table", me.Table) + if me.Position != nil { + mb.MetaSet("binlog_position", binlogPositionToString(*me.Position)) + } + + if i.batchPolicy.Add(mb) { + nextTimedBatchChan = nil + flushedBatch, err := i.batchPolicy.Flush(ctx) + if err != nil { + return fmt.Errorf("flush batch error: %w", err) + } + if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil { + return fmt.Errorf("failed to flush batch: %w", err) + } + } else { + d, ok := i.batchPolicy.UntilNext() + if ok { + nextTimedBatchChan = time.After(d) + } + } + } + } +} + +func (i *mysqlStreamInput) flushBatch( + ctx context.Context, + checkpointer *checkpoint.Capped[*position], + batch service.MessageBatch, +) error { + if len(batch) == 0 { + return nil + } + + lastMsg := batch[len(batch)-1] + strPosition, ok := lastMsg.MetaGet("binlog_position") + var binLogPos *position + if ok { + pos, err := parseBinlogPosition(strPosition) + if err != nil { + return err + } + binLogPos = &pos + } + + resolveFn, err := checkpointer.Track(ctx, binLogPos, int64(len(batch))) + if err != nil { + return fmt.Errorf("failed to track checkpoint for batch: %w", err) + } + msg := asyncMessage{ + msg: batch, + ackFn: func(ctx context.Context, res error) error { + i.mutex.Lock() + defer i.mutex.Unlock() + maxOffset := resolveFn() + // Nothing to commit, this wasn't the latest message + if maxOffset == nil { + return nil + } + offset := *maxOffset + // This has no offset - it's a snapshot message + if offset == nil { + return nil + } + return i.setCachedBinlogPosition(ctx, *offset) + }, + } + select { + case i.msgChan <- msg: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (i *mysqlStreamInput) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { + select { + case m := <-i.msgChan: + return m.msg, m.ackFn, nil + case <-i.shutSig.HasStoppedChan(): + return nil, nil, service.ErrNotConnected + case <-ctx.Done(): + } + return nil, nil, ctx.Err() +} + +func (i *mysqlStreamInput) Close(ctx context.Context) error { + if i.shutSig == nil { + return nil // Never connected + } + i.shutSig.TriggerSoftStop() + select { + case <-ctx.Done(): + case <-time.After(shutdownTimeout): + case <-i.shutSig.HasStoppedChan(): + } + i.shutSig.TriggerHardStop() + select { + case <-ctx.Done(): + case <-time.After(shutdownTimeout): + i.logger.Error("failed to shutdown mysql_cdc within the 
timeout")
+    case <-i.shutSig.HasStoppedChan():
+    }
+    return nil
+}
+
+// ---- input methods end ----
+
+// ---- cache methods start ----
+
+func (i *mysqlStreamInput) getCachedBinlogPosition(ctx context.Context) (*position, error) {
+    var (
+        cacheVal []byte
+        cErr     error
+    )
+    if err := i.res.AccessCache(ctx, i.binLogCache, func(c service.Cache) {
+        cacheVal, cErr = c.Get(ctx, i.binLogCacheKey)
+    }); err != nil {
+        return nil, fmt.Errorf("unable to access cache for reading: %w", err)
+    }
+    if errors.Is(cErr, service.ErrKeyNotFound) {
+        return nil, nil
+    } else if cErr != nil {
+        return nil, fmt.Errorf("unable to read checkpoint from cache: %w", cErr)
+    } else if cacheVal == nil {
+        return nil, nil
+    }
+    pos, err := parseBinlogPosition(string(cacheVal))
+    return &pos, err
+}
+
+func (i *mysqlStreamInput) setCachedBinlogPosition(ctx context.Context, binLogPos position) error {
+    var cErr error
+    if err := i.res.AccessCache(ctx, i.binLogCache, func(c service.Cache) {
+        cErr = c.Set(
+            ctx,
+            i.binLogCacheKey,
+            []byte(binlogPositionToString(binLogPos)),
+            nil,
+        )
+    }); err != nil {
+        return fmt.Errorf("unable to access cache for writing: %w", err)
+    }
+    if cErr != nil {
+        return fmt.Errorf("unable to persist checkpoint to cache: %w", cErr)
+    }
+    return nil
+}
+
+// ---- cache methods end ----
+
+// --- MySQL Canal handler methods ----
+
+func (i *mysqlStreamInput) OnRotate(eh *replication.EventHeader, re *replication.RotateEvent) error {
+    i.currentBinlogName = string(re.NextLogName)
+    return nil
+}
+
+func (i *mysqlStreamInput) OnRow(e *canal.RowsEvent) error {
+    if _, ok := i.tablesFilterMap[e.Table.Name]; !ok {
+        return nil
+    }
+    switch e.Action {
+    case canal.InsertAction:
+        return i.onMessage(e, 0, 1)
+    case canal.DeleteAction:
+        return i.onMessage(e, 0, 1)
+    case canal.UpdateAction:
+        // Updates send both the new and old data - we only emit the new data.
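+        // go-mysql's canal delivers update events with e.Rows holding alternating
+        // before/after row images, hence the start offset of 1 and step of 2.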
+ return i.onMessage(e, 1, 2) + default: + return errors.New("invalid rows action") + } +} + +func (i *mysqlStreamInput) onMessage(e *canal.RowsEvent, initValue, incrementValue int) error { + for pi := initValue; pi < len(e.Rows); pi += incrementValue { + message := map[string]any{} + for i, v := range e.Rows[pi] { + col := e.Table.Columns[i] + v, err := mapMessageColumn(v, col) + if err != nil { + return err + } + message[col.Name] = v + } + i.rawMessageEvents <- MessageEvent{ + Row: message, + Operation: MessageOperation(e.Action), + Table: e.Table.Name, + Position: &position{Name: i.currentBinlogName, Pos: e.Header.LogPos}, + } + } + return nil +} + +func mapMessageColumn(v any, col schema.TableColumn) (any, error) { + if v == nil { + return v, nil + } + switch col.Type { + case schema.TYPE_DECIMAL: + s, ok := v.(string) + if !ok { + return nil, fmt.Errorf("expected string value for decimal column got: %T", v) + } + return json.Number(s), nil + case schema.TYPE_SET: + bitset, ok := v.(int64) + if !ok { + return nil, fmt.Errorf("expected int value for set column got: %T", v) + } + out := []any{} + for i, element := range col.SetValues { + if (bitset>>i)&1 == 1 { + out = append(out, element) + } + } + return out, nil + case schema.TYPE_DATE: + date, ok := v.(string) + if !ok { + return nil, fmt.Errorf("expected string value for date column got: %T", v) + } + return time.Parse("2006-01-02", date) + case schema.TYPE_ENUM: + ordinal, ok := v.(int64) + if !ok { + return nil, fmt.Errorf("expected int value for enum column got: %T", v) + } + if ordinal < 1 || int(ordinal) > len(col.EnumValues) { + return nil, fmt.Errorf("enum ordinal out of range: %d when there are %d variants", ordinal, len(col.EnumValues)) + } + return col.EnumValues[ordinal-1], nil + case schema.TYPE_JSON: + s, ok := v.(string) + if !ok { + return nil, fmt.Errorf("expected string value for json column got: %T", v) + } + var decoded any + if err := json.Unmarshal([]byte(s), &decoded); err != nil { + return nil, err + } + return decoded, nil + case schema.TYPE_STRING: + // Blob types should come through as binary, but are marked type 5, + // instead skip them here and have those fallthrough to the binary case. + if !strings.Contains(col.RawType, "blob") { + if s, ok := v.(string); ok { + return s, nil + } + s, ok := v.([]byte) + if !ok { + return nil, fmt.Errorf("unexpected type for STRING column: %T", v) + } + return string(s), nil + } + fallthrough + case schema.TYPE_BINARY: + if s, ok := v.([]byte); ok { + return s, nil + } + s, ok := v.(string) + if !ok { + return nil, fmt.Errorf("unexpected type for BINARY column: %T", v) + } + return []byte(s), nil + default: + return v, nil + } +} + +// --- MySQL Canal handler methods end ---- diff --git a/internal/impl/mysql/integration_test.go b/internal/impl/mysql/integration_test.go new file mode 100644 index 0000000000..242ea60dca --- /dev/null +++ b/internal/impl/mysql/integration_test.go @@ -0,0 +1,647 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md + +package mysql + +import ( + "context" + "database/sql" + "fmt" + "sync" + "testing" + "time" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + + _ "github.com/go-sql-driver/mysql" + _ "github.com/redpanda-data/benthos/v4/public/components/io" + _ "github.com/redpanda-data/benthos/v4/public/components/pure" + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/service/integration" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testDB struct { + *sql.DB + + t *testing.T +} + +func (db *testDB) Exec(query string, args ...any) { + _, err := db.DB.Exec(query, args...) + require.NoError(db.t, err) +} + +func setupTestWithMySQLVersion(t *testing.T, version string) (string, *testDB) { + t.Parallel() + integration.CheckSkip(t) + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Minute + + // MySQL specific environment variables + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "mysql", + Tag: version, + Env: []string{ + "MYSQL_ROOT_PASSWORD=password", + "MYSQL_DATABASE=testdb", + }, + Cmd: []string{ + "--server-id=1", + "--log-bin=mysql-bin", + "--binlog-format=ROW", + "--binlog-row-image=FULL", + "--log-slave-updates=ON", + }, + ExposedPorts: []string{"3306/tcp"}, + }, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + port := resource.GetPort("3306/tcp") + dsn := fmt.Sprintf( + "root:password@tcp(localhost:%s)/testdb?parseTime=true&timeout=30s&readTimeout=30s&writeTimeout=30s&multiStatements=true", + port, + ) + + var db *sql.DB + err = pool.Retry(func() error { + var err error + db, err = sql.Open("mysql", dsn) + if err != nil { + return err + } + + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(time.Minute * 5) + + return db.Ping() + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, db.Close()) + }) + return dsn, &testDB{db, t} +} + +func TestIntegrationMySQLCDC(t *testing.T) { + integration.CheckSkip(t) + var mysqlTestVersions = []string{"8.0", "9.0", "9.1"} + for _, version := range mysqlTestVersions { + t.Run(version, func(t *testing.T) { + dsn, db := setupTestWithMySQLVersion(t, version) + // Create table + db.Exec(` + CREATE TABLE IF NOT EXISTS foo ( + a INT PRIMARY KEY + ) +`) + template := fmt.Sprintf(` +mysql_cdc: + dsn: %s + stream_snapshot: false + checkpoint_cache: foocache + tables: + - foo +`, dsn) + + cacheConf := fmt.Sprintf(` +label: foocache +file: + directory: %s`, t.TempDir()) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) + require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + + var outBatches []string + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + msgBytes, err := mb[0].AsBytes() + require.NoError(t, err) + outBatchMut.Lock() + outBatches = append(outBatches, string(msgBytes)) + outBatchMut.Unlock() + return 
nil
+            }))
+
+            streamOut, err := streamOutBuilder.Build()
+            require.NoError(t, err)
+
+            go func() {
+                err = streamOut.Run(context.Background())
+                require.NoError(t, err)
+            }()
+
+            time.Sleep(time.Second * 5)
+            for i := 0; i < 1000; i++ {
+                // Insert 1000 rows
+                db.Exec("INSERT INTO foo VALUES (?)", i)
+            }
+
+            assert.Eventually(t, func() bool {
+                outBatchMut.Lock()
+                defer outBatchMut.Unlock()
+                return len(outBatches) == 1000
+            }, time.Minute*5, time.Millisecond*100)
+
+            require.NoError(t, streamOut.StopWithin(time.Second*10))
+
+            streamOutBuilder = service.NewStreamBuilder()
+            require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
+            require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
+            require.NoError(t, streamOutBuilder.AddInputYAML(template))
+
+            outBatches = nil
+            require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
+                msgBytes, err := mb[0].AsBytes()
+                require.NoError(t, err)
+                outBatchMut.Lock()
+                outBatches = append(outBatches, string(msgBytes))
+                outBatchMut.Unlock()
+                return nil
+            }))
+
+            streamOut, err = streamOutBuilder.Build()
+            require.NoError(t, err)
+
+            time.Sleep(time.Second)
+            for i := 1001; i < 2001; i++ {
+                db.Exec("INSERT INTO foo VALUES (?)", i)
+            }
+
+            go func() {
+                err = streamOut.Run(context.Background())
+                require.NoError(t, err)
+            }()
+
+            assert.Eventually(t, func() bool {
+                outBatchMut.Lock()
+                defer outBatchMut.Unlock()
+                return len(outBatches) == 1000
+            }, time.Minute*5, time.Millisecond*100)
+
+            require.NoError(t, streamOut.StopWithin(time.Second*10))
+        })
+    }
+}
+
+func TestIntegrationMySQLSnapshotAndCDC(t *testing.T) {
+    dsn, db := setupTestWithMySQLVersion(t, "8.0")
+    // Create table
+    db.Exec(`
+    CREATE TABLE IF NOT EXISTS foo (
+        a INT PRIMARY KEY
+    )
+`)
+    // Insert 1000 rows for initial snapshot streaming
+    for i := 0; i < 1000; i++ {
+        db.Exec("INSERT INTO foo VALUES (?)", i)
+    }
+
+    template := fmt.Sprintf(`
+mysql_cdc:
+  dsn: %s
+  stream_snapshot: true
+  snapshot_max_batch_size: 500
+  checkpoint_cache: foocache
+  tables:
+    - foo
+`, dsn)
+
+    cacheConf := fmt.Sprintf(`
+label: foocache
+file:
+  directory: %s`, t.TempDir())
+
+    streamOutBuilder := service.NewStreamBuilder()
+    require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`))
+    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
+    require.NoError(t, streamOutBuilder.AddInputYAML(template))
+
+    var outBatches []string
+    var outBatchMut sync.Mutex
+    require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
+        msgBytes, err := mb[0].AsBytes()
+        require.NoError(t, err)
+        outBatchMut.Lock()
+        outBatches = append(outBatches, string(msgBytes))
+        outBatchMut.Unlock()
+        return nil
+    }))
+
+    streamOut, err := streamOutBuilder.Build()
+    require.NoError(t, err)
+
+    go func() {
+        err = streamOut.Run(context.Background())
+        require.NoError(t, err)
+    }()
+
+    time.Sleep(time.Second * 5)
+    for i := 1000; i < 2000; i++ {
+        // Insert 1000 more rows to be picked up via CDC
+        db.Exec("INSERT INTO foo VALUES (?)", i)
+    }
+
+    assert.Eventually(t, func() bool {
+        outBatchMut.Lock()
+        defer outBatchMut.Unlock()
+        return len(outBatches) == 2000
+    }, time.Minute*5, time.Millisecond*100)
+
+    require.NoError(t, streamOut.StopWithin(time.Second*10))
+}
+
+func TestIntegrationMySQLCDCWithCompositePrimaryKeys(t *testing.T) {
+    dsn, db := setupTestWithMySQLVersion(t, "8.0")
+    // Create table
+    db.Exec(`
+    CREATE TABLE IF NOT EXISTS ` + "`Foo`" + ` (
+        ` + "`A`" + ` INT,
+        ` + "`B`" + ` INT,
+        PRIMARY KEY (
+            ` + "`A`" + `,
+            ` + "`B`" + `
+        )
+    )
+`)
+    // Create control table to ensure we don't stream it
+    db.Exec(`
+    CREATE TABLE IF NOT EXISTS foo_non_streamed (
+        a INT,
+        b INT,
+        PRIMARY KEY (a, b)
+    )
+`)
+
+    // Insert 1000 rows for initial snapshot streaming
+    for i := 0; i < 1000; i++ {
+        db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i)
+        db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i)
+    }
+
+    template := fmt.Sprintf(`
+mysql_cdc:
+  dsn: %s
+  stream_snapshot: true
+  snapshot_max_batch_size: 500
+  checkpoint_cache: foocache
+  tables:
+    - Foo
+`, dsn)
+
+    cacheConf := fmt.Sprintf(`
+label: foocache
+file:
+  directory: %s`, t.TempDir())
+
+    streamOutBuilder := service.NewStreamBuilder()
+    require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`))
+    require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
+    require.NoError(t, streamOutBuilder.AddInputYAML(template))
+
+    var outBatches []string
+    var outBatchMut sync.Mutex
+    require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
+        msgBytes, err := mb[0].AsBytes()
+        require.NoError(t, err)
+        outBatchMut.Lock()
+        outBatches = append(outBatches, string(msgBytes))
+        outBatchMut.Unlock()
+        return nil
+    }))
+
+    streamOut, err := streamOutBuilder.Build()
+    require.NoError(t, err)
+
+    go func() {
+        err = streamOut.Run(context.Background())
+        require.NoError(t, err)
+    }()
+
+    time.Sleep(time.Second * 5)
+    for i := 1000; i < 2000; i++ {
+        // Insert 1000 more rows to be picked up via CDC
+        db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i)
+        db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i)
+    }
+
+    assert.Eventually(t, func() bool {
+        outBatchMut.Lock()
+        defer outBatchMut.Unlock()
+        return len(outBatches) == 2000
+    }, time.Minute*5, time.Millisecond*100)
+    require.NoError(t, streamOut.StopWithin(time.Second*10))
+}
+
+func TestIntegrationMySQLCDCAllTypes(t *testing.T) {
+    dsn, db := setupTestWithMySQLVersion(t, "8.0")
+    // Create table
+    db.Exec(`
+    CREATE TABLE all_data_types (
+        -- Numeric Data Types
+        tinyint_col TINYINT PRIMARY KEY,
+        smallint_col SMALLINT,
+        mediumint_col MEDIUMINT,
+        int_col INT,
+        bigint_col BIGINT,
+        decimal_col DECIMAL(38, 2),
+        numeric_col NUMERIC(10, 2),
+        float_col FLOAT,
+        double_col DOUBLE,
+
+        -- Date and Time Data Types
+        date_col DATE,
+        datetime_col DATETIME,
+        timestamp_col TIMESTAMP,
+        time_col TIME,
+        year_col YEAR,
+
+        -- String Data Types
+        char_col CHAR(10),
+        varchar_col VARCHAR(255),
+        binary_col BINARY(10),
+        varbinary_col VARBINARY(255),
+        tinyblob_col TINYBLOB,
+        blob_col BLOB,
+        mediumblob_col MEDIUMBLOB,
+        longblob_col LONGBLOB,
+        tinytext_col TINYTEXT,
+        text_col TEXT,
+        mediumtext_col MEDIUMTEXT,
+        longtext_col LONGTEXT,
+        enum_col ENUM('option1', 'option2', 'option3'),
+        set_col SET('a', 'b', 'c', 'd'),
+        json_col JSON
+
+        -- TODO(cdc): Spatial Data Types
+        -- geometry_col GEOMETRY,
+        -- point_col POINT,
+        -- linestring_col LINESTRING,
+        -- polygon_col POLYGON,
+        -- multipoint_col MULTIPOINT,
+        -- multilinestring_col MULTILINESTRING,
+        -- multipolygon_col MULTIPOLYGON,
+        -- geometrycollection_col GEOMETRYCOLLECTION
+);
+`)
+
+    db.Exec(`
+INSERT INTO all_data_types (
+    tinyint_col,
+    smallint_col,
+    mediumint_col,
+    int_col,
+    bigint_col,
+    decimal_col,
+    numeric_col,
+    float_col,
+    double_col,
+    date_col,
+    datetime_col,
+    timestamp_col,
+    time_col,
+    year_col,
+    char_col,
+    varchar_col,
+    binary_col,
+    varbinary_col,
+    tinyblob_col,
+    blob_col,
+    mediumblob_col,
+    longblob_col,
+    tinytext_col,
+    text_col,
+    mediumtext_col,
+
longtext_col, + enum_col, + set_col, + json_col +) VALUES ( + 127, -- tinyint_col + 32767, -- smallint_col + 8388607, -- mediumint_col + 2147483647, -- int_col + 9223372036854775807, -- bigint_col + 999999999999999999999999999999999999.99, -- decimal_col + 98765.43, -- numeric_col + 3.14, -- float_col + 2.718281828, -- double_col + '2024-12-10', -- date_col + '2024-12-10 15:30:45', -- datetime_col + '2024-12-10 15:30:46', -- timestamp_col + '15:30:45', -- time_col + 2024, -- year_col + 'char_data', -- char_col + 'varchar_data', -- varchar_col + BINARY('binary'), -- binary_col + BINARY('varbinary'), -- varbinary_col + 'small blob', -- tinyblob_col + 'regular blob', -- blob_col + 'medium blob', -- mediumblob_col + 'large blob', -- longblob_col + 'tiny text', -- tinytext_col + 'regular text', -- text_col + 'medium text', -- mediumtext_col + 'large text', -- longtext_col + 'option1', -- enum_col + 'a,b', -- set_col + '{"foo":5,"bar":[1,2,3]}' -- json_col +); + + `) + + template := fmt.Sprintf(` +mysql_cdc: + dsn: %s + stream_snapshot: true + snapshot_max_batch_size: 500 + checkpoint_cache: memcache + tables: + - all_data_types +`, dsn) + + cacheConf := ` +label: memcache +memory: {} +` + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`)) + require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + + var outBatches []string + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + msgBytes, err := mb[0].AsBytes() + require.NoError(t, err) + outBatchMut.Lock() + outBatches = append(outBatches, string(msgBytes)) + outBatchMut.Unlock() + return nil + })) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + time.Sleep(time.Second * 5) + + db.Exec(` + INSERT INTO all_data_types ( + tinyint_col, + smallint_col, + mediumint_col, + int_col, + bigint_col, + decimal_col, + numeric_col, + float_col, + double_col, + date_col, + datetime_col, + timestamp_col, + time_col, + year_col, + char_col, + varchar_col, + binary_col, + varbinary_col, + tinyblob_col, + blob_col, + mediumblob_col, + longblob_col, + tinytext_col, + text_col, + mediumtext_col, + longtext_col, + enum_col, + set_col, + json_col +) VALUES ( + -128, -- tinyint_col + -32768, -- smallint_col + -8388608, -- mediumint_col + -2147483648, -- int_col + -9223372036854775808, -- bigint_col + 888888888888888888888888888888888888.88, -- decimal_col + 87654.21, -- numeric_col + 1.618, -- float_col + 3.141592653, -- double_col + '2023-01-01', -- date_col + '2023-01-01 12:00:00', -- datetime_col + '2023-01-01 12:00:00', -- timestamp_col + '23:59:59', -- time_col + 2023, -- year_col + 'example', -- char_col + 'another_example', -- varchar_col + BINARY('fixed'), -- binary_col + BINARY('dynamic'), -- varbinary_col + 'tiny_blob_value', -- tinyblob_col + 'blob_value', -- blob_col + 'medium_blob_value', -- mediumblob_col + 'long_blob_value', -- longblob_col + 'tiny_text_value', -- tinytext_col + 'text_value', -- text_col + 'medium_text_value', -- mediumtext_col + 'long_text_value', -- longtext_col + 'option2', -- enum_col + 'b,c', -- set_col + '{"foo":-1,"bar":[3,2,1]}' -- json_col +);`) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 2 + }, time.Second*30, 
time.Millisecond*100) + require.NoError(t, streamOut.StopWithin(time.Second*10)) + + require.JSONEq(t, `{ + "tinyint_col": 127, + "smallint_col": 32767, + "mediumint_col": 8388607, + "int_col": 2147483647, + "bigint_col": 9223372036854775807, + "decimal_col": 999999999999999999999999999999999999.99, + "numeric_col": 98765.43, + "float_col": 3.14, + "double_col": 2.718281828, + "date_col": "2024-12-10T00:00:00Z", + "datetime_col": "2024-12-10T15:30:45Z", + "timestamp_col": "2024-12-10T15:30:46Z", + "time_col": "15:30:45", + "year_col": 2024, + "char_col": "char_data", + "varchar_col": "varchar_data", + "binary_col": "YmluYXJ5AAAAAA==", + "varbinary_col": "dmFyYmluYXJ5", + "tinyblob_col": "c21hbGwgYmxvYg==", + "blob_col": "cmVndWxhciBibG9i", + "mediumblob_col": "bWVkaXVtIGJsb2I=", + "longblob_col": "bGFyZ2UgYmxvYg==", + "tinytext_col": "tiny text", + "text_col": "regular text", + "mediumtext_col": "medium text", + "longtext_col": "large text", + "enum_col": "option1", + "set_col": ["a", "b"], + "json_col": {"foo":5, "bar":[1, 2, 3]} +}`, outBatches[0]) + require.JSONEq(t, `{ + "tinyint_col": -128, + "smallint_col": -32768, + "mediumint_col": -8388608, + "int_col": -2147483648, + "bigint_col": -9223372036854775808, + "decimal_col": 888888888888888888888888888888888888.88, + "numeric_col": 87654.21, + "float_col": 1.618, + "double_col": 3.141592653, + "date_col": "2023-01-01T00:00:00Z", + "datetime_col": "2023-01-01T12:00:00Z", + "timestamp_col": "2023-01-01T12:00:00Z", + "time_col": "23:59:59", + "year_col": 2023, + "char_col": "example", + "varchar_col": "another_example", + "binary_col": "Zml4ZWQ=", + "varbinary_col": "ZHluYW1pYw==", + "tinyblob_col": "dGlueV9ibG9iX3ZhbHVl", + "blob_col": "YmxvYl92YWx1ZQ==", + "mediumblob_col": "bWVkaXVtX2Jsb2JfdmFsdWU=", + "longblob_col": "bG9uZ19ibG9iX3ZhbHVl", + "tinytext_col": "tiny_text_value", + "text_col": "text_value", + "mediumtext_col": "medium_text_value", + "longtext_col": "long_text_value", + "enum_col": "option2", + "set_col": ["b", "c"], + "json_col": {"foo":-1,"bar":[3,2,1]} +}`, outBatches[1]) +} diff --git a/internal/impl/mysql/snapshot.go b/internal/impl/mysql/snapshot.go new file mode 100644 index 0000000000..755ea4723f --- /dev/null +++ b/internal/impl/mysql/snapshot.go @@ -0,0 +1,248 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"strings"
+
+	"github.com/redpanda-data/benthos/v4/public/service"
+)
+
+// Snapshot represents a structure that prepares a transaction
+// and creates a consistent MySQL snapshot inside that transaction.
+type Snapshot struct {
+	db *sql.DB
+	tx *sql.Tx
+
+	lockConn     *sql.Conn
+	snapshotConn *sql.Conn
+
+	logger *service.Logger
+}
+
+// NewSnapshot creates a new snapshot instance.
+func NewSnapshot(logger *service.Logger, db *sql.DB) *Snapshot {
+	return &Snapshot{
+		db:     db,
+		logger: logger,
+	}
+}
+
+func (s *Snapshot) prepareSnapshot(ctx context.Context) (*position, error) {
+	var err error
+	// Create a separate connection for FLUSH TABLES WITH READ LOCK (FTWRL)
+	s.lockConn, err = s.db.Conn(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create lock connection: %v", err)
+	}
+
+	// Create another connection for the snapshot
+	s.snapshotConn, err = s.db.Conn(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create snapshot connection: %v", err)
+	}
+
+	// Start a consistent snapshot transaction
+	s.tx, err = s.snapshotConn.BeginTx(ctx, &sql.TxOptions{
+		ReadOnly:  true,
+		Isolation: sql.LevelRepeatableRead,
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to start transaction: %v", err)
+	}
+
+	/*
+	   START TRANSACTION WITH CONSISTENT SNAPSHOT ensures a consistent view of database state
+	   when reading historical data during CDC initialization. Without it, concurrent writes
+	   could create inconsistencies between the binlog position and the table snapshots,
+	   potentially missing or duplicating events. The snapshot prevents other transactions
+	   from modifying the data being read, maintaining referential integrity across tables
+	   while capturing the initial state.
+	*/
+
+	// NOTE: this is a little sneaky because we're actually implicitly closing the transaction
+	// started with `BeginTx` above and replacing it with this one. We have to do this because
+	// the `database/sql` driver we're using does not support starting a transaction
+	// WITH CONSISTENT SNAPSHOT.
+	if _, err := s.tx.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
+		if rErr := s.tx.Rollback(); rErr != nil {
+			return nil, rErr
+		}
+
+		return nil, fmt.Errorf("failed to start consistent snapshot: %v", err)
+	}
+
+	/*
+	   FLUSH TABLES WITH READ LOCK is executed after the CONSISTENT SNAPSHOT to:
+	   1. Force MySQL to flush all data from memory to disk
+	   2. Prevent any writes to tables while we read the binlog position
+
+	   This lock MUST be released quickly to avoid blocking other connections. Only use it
+	   to capture the binlog coordinates, then release immediately with UNLOCK TABLES.
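+
+	   For illustration, the full coordination sequence across the two connections is:
+
+	     snapshot conn: START TRANSACTION WITH CONSISTENT SNAPSHOT;
+	     lock conn:     FLUSH TABLES WITH READ LOCK;
+	     snapshot conn: SHOW MASTER STATUS;   -- capture binlog file + position
+	     lock conn:     UNLOCK TABLES;        -- release as soon as the position is read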
+	*/
+	if _, err := s.lockConn.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK"); err != nil {
+		if rErr := s.tx.Rollback(); rErr != nil {
+			return nil, rErr
+		}
+		return nil, fmt.Errorf("failed to acquire global read lock: %v", err)
+	}
+
+	// Get binary log position (while locked)
+	pos, err := s.getCurrentBinlogPosition(ctx)
+	if err != nil {
+		// Make sure to release the lock if we fail
+		if _, eErr := s.lockConn.ExecContext(ctx, "UNLOCK TABLES"); eErr != nil {
+			return nil, eErr
+		}
+
+		if rErr := s.tx.Rollback(); rErr != nil {
+			return nil, rErr
+		}
+		return nil, fmt.Errorf("failed to get binlog position: %v", err)
+	}
+
+	// Release the global read lock immediately after getting the binlog position
+	if _, err := s.lockConn.ExecContext(ctx, "UNLOCK TABLES"); err != nil {
+		if rErr := s.tx.Rollback(); rErr != nil {
+			return nil, rErr
+		}
+		return nil, fmt.Errorf("failed to release global read lock: %v", err)
+	}
+
+	return &pos, nil
+}
+
+func (s *Snapshot) getTablePrimaryKeys(ctx context.Context, table string) ([]string, error) {
+	// Get primary key columns for the table. Use a bind parameter for the table
+	// name rather than interpolating it into the query, to avoid SQL injection.
+	rows, err := s.tx.QueryContext(ctx, `
+SELECT COLUMN_NAME
+FROM information_schema.KEY_COLUMN_USAGE
+WHERE TABLE_NAME = ? AND CONSTRAINT_NAME = 'PRIMARY'
+ORDER BY ORDINAL_POSITION;
+	`, table)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get primary key: %v", err)
+	}
+
+	defer rows.Close()
+
+	var pks []string
+	for rows.Next() {
+		var pk string
+		if err := rows.Scan(&pk); err != nil {
+			return nil, err
+		}
+		pks = append(pks, pk)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("failed to iterate table: %v", err)
+	}
+
+	if len(pks) == 0 {
+		return nil, fmt.Errorf("unable to find primary key for table %s - does the table exist and does it have a primary key set?", table)
+	}
+
+	return pks, nil
+}
+
+func (s *Snapshot) querySnapshotTable(ctx context.Context, table string, pk []string, lastSeenPkVal *map[string]any, limit int) (*sql.Rows, error) {
+	snapshotQueryParts := []string{
+		"SELECT * FROM " + table,
+	}
+
+	if lastSeenPkVal == nil {
+		snapshotQueryParts = append(snapshotQueryParts, s.buildOrderByClause(pk))
+
+		snapshotQueryParts = append(snapshotQueryParts, "LIMIT ?")
+		q := strings.Join(snapshotQueryParts, " ")
+		s.logger.Infof("Querying snapshot: %s", q)
+		return s.tx.QueryContext(ctx, q, limit)
+	}
+
+	// Iterate the primary key columns in their defined order so the bind values
+	// line up with the (pk...) tuple in the WHERE clause; ranging over the map
+	// directly would yield a nondeterministic order.
+	var lastSeenPkVals []any
+	var placeholders []string
+	for _, pkCol := range pk {
+		lastSeenPkVals = append(lastSeenPkVals, (*lastSeenPkVal)[pkCol])
+		placeholders = append(placeholders, "?")
+	}
+
+	snapshotQueryParts = append(snapshotQueryParts, fmt.Sprintf("WHERE (%s) > (%s)", strings.Join(pk, ", "), strings.Join(placeholders, ", ")))
+	snapshotQueryParts = append(snapshotQueryParts, s.buildOrderByClause(pk))
+	snapshotQueryParts = append(snapshotQueryParts, fmt.Sprintf("LIMIT %d", limit))
+	q := strings.Join(snapshotQueryParts, " ")
+	s.logger.Infof("Querying snapshot: %s", q)
+	return s.tx.QueryContext(ctx, q, lastSeenPkVals...)
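+
+	// Illustrative only: for a hypothetical composite key (tenant_id, order_id)
+	// and limit 500, the query built above has the shape
+	//
+	//   SELECT * FROM orders WHERE (tenant_id, order_id) > (?, ?)
+	//   ORDER BY tenant_id, order_id LIMIT 500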
+}
+
+func (s *Snapshot) buildOrderByClause(pk []string) string {
+	if len(pk) == 1 {
+		return "ORDER BY " + pk[0]
+	}
+
+	return "ORDER BY " + strings.Join(pk, ", ")
+}
+
+func (s *Snapshot) getCurrentBinlogPosition(ctx context.Context) (position, error) {
+	var (
+		offset uint32
+		file   string
+		// binlogDoDB, binlogIgnoreDB and executedGtidSet are intentionally unused;
+		// they are only required to scan the full SHOW MASTER STATUS response.
+		binlogDoDB      any
+		binlogIgnoreDB  any
+		executedGtidSet any
+	)
+
+	row := s.snapshotConn.QueryRowContext(ctx, "SHOW MASTER STATUS")
+	if err := row.Scan(&file, &offset, &binlogDoDB, &binlogIgnoreDB, &executedGtidSet); err != nil {
+		return position{}, err
+	}
+
+	return position{
+		Name: file,
+		Pos:  offset,
+	}, nil
+}
+
+func (s *Snapshot) releaseSnapshot(_ context.Context) error {
+	if s.tx != nil {
+		if err := s.tx.Commit(); err != nil {
+			return fmt.Errorf("failed to commit transaction: %v", err)
+		}
+	}
+
+	// reset transaction
+	s.tx = nil
+	return nil
+}
+
+func (s *Snapshot) close() error {
+	if s.tx != nil {
+		if err := s.tx.Rollback(); err != nil {
+			return fmt.Errorf("unable to rollback transaction: %w", err)
+		}
+		s.tx = nil
+	}
+	for _, conn := range []*sql.Conn{s.lockConn, s.snapshotConn} {
+		if conn == nil {
+			continue
+		}
+		if err := conn.Close(); err != nil {
+			return fmt.Errorf("unable to close connection: %w", err)
+		}
+	}
+	if err := s.db.Close(); err != nil {
+		return fmt.Errorf("unable to close db: %w", err)
+	}
+	return nil
+}
diff --git a/internal/impl/mysql/validate.go b/internal/impl/mysql/validate.go
new file mode 100644
index 0000000000..6c4ae184e8
--- /dev/null
+++ b/internal/impl/mysql/validate.go
@@ -0,0 +1,46 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Licensed as a Redpanda Enterprise file under the Redpanda Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+	"errors"
+	"regexp"
+	"unicode/utf8"
+)
+
+var (
+	errEmptyTableName        = errors.New("empty table name")
+	errInvalidTableLength    = errors.New("invalid table length")
+	errInvalidTableStartChar = errors.New("invalid start char in mysql table name")
+	errInvalidTableName      = errors.New("invalid table name")
+)
+
+// Compile the validation patterns once instead of recompiling them (and
+// silently discarding the error) on every call to validateTableName.
+var (
+	tableStartCharPattern = regexp.MustCompile(`^[a-zA-Z_]`)
+	tableNamePattern      = regexp.MustCompile(`^[a-zA-Z0-9_$]+$`)
+)
+
+func validateTableName(tableName string) error {
+	// Check if empty
+	if tableName == "" {
+		return errEmptyTableName
+	}
+
+	// Check length
+	if utf8.RuneCountInString(tableName) > 64 {
+		return errInvalidTableLength
+	}
+
+	// Check if it starts with a valid character
+	if !tableStartCharPattern.MatchString(tableName) {
+		return errInvalidTableStartChar
+	}
+
+	// Check if it contains only valid characters
+	if !tableNamePattern.MatchString(tableName) {
+		return errInvalidTableName
+	}
+
+	return nil
+}
diff --git a/internal/impl/mysql/validate_test.go b/internal/impl/mysql/validate_test.go
new file mode 100644
index 0000000000..0dd304ce18
--- /dev/null
+++ b/internal/impl/mysql/validate_test.go
@@ -0,0 +1,99 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Licensed as a Redpanda Enterprise file under the Redpanda Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md
+
+package mysql
+
+import (
+	"strings"
+	"testing"
+)
+
+func TestValidateTableName(t *testing.T) {
+	tests := []struct {
+		name        string
+		tableName   string
+		expectedErr error
+	}{
+		// Valid cases
+		{
+			name:        "Valid simple table name",
+			tableName:   "users",
+			expectedErr: nil,
+		},
+		{
+			name:        "Valid table name with numbers",
+			tableName:   "orders_2024",
+			expectedErr: nil,
+		},
+		{
+			name:        "Valid table name with underscore prefix",
+			tableName:   "_temp_table",
+			expectedErr: nil,
+		},
+		{
+			name:        "Valid table name with dollar sign",
+			tableName:   "user$data",
+			expectedErr: nil,
+		},
+		{
+			name:        "Valid table name with mixed case",
+			tableName:   "UserProfiles",
+			expectedErr: nil,
+		},
+
+		// Invalid cases
+		{
+			name:        "Empty table name",
+			tableName:   "",
+			expectedErr: errEmptyTableName,
+		},
+		{
+			name:        "Table name starting with number",
+			tableName:   "2users",
+			expectedErr: errInvalidTableStartChar,
+		},
+		{
+			name:        "Table name with special characters",
+			tableName:   "users@table",
+			expectedErr: errInvalidTableName,
+		},
+		{
+			name:        "Table name with spaces",
+			tableName:   "user table",
+			expectedErr: errInvalidTableName,
+		},
+		{
+			name:        "Table name with hyphens",
+			tableName:   "user-table",
+			expectedErr: errInvalidTableName,
+		},
+		{
+			name:        "Too long table name",
+			tableName:   strings.Repeat("a", 65),
+			expectedErr: errInvalidTableLength,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			err := validateTableName(tc.tableName)
+
+			if tc.expectedErr == nil && err != nil {
+				t.Errorf("expected no error, got %v", err)
+			}
+
+			if tc.expectedErr != nil && err == nil {
+				t.Errorf("expected error %v, got nil", tc.expectedErr)
+			}
+
+			if tc.expectedErr != nil && err != nil && tc.expectedErr.Error() != err.Error() {
+				t.Errorf("expected error %v, got %v", tc.expectedErr, err)
+			}
+		})
+	}
+}
diff --git a/internal/impl/timeplus/driver/driver.go b/internal/impl/timeplus/driver/driver.go
index 30d5e68a68..2d2684e694 100644
--- a/internal/impl/timeplus/driver/driver.go
+++ b/internal/impl/timeplus/driver/driver.go
@@ -51,12 +51,15 @@ func (d *driver) Run(sql string) error {
 	d.ctx, d.cancel = context.WithCancel(context.Background())
 	ckCtx := protonDriver.Context(d.ctx)
 
-	//nolint
 	rows, err := d.conn.QueryContext(ckCtx, sql)
 	if err != nil {
 		return err
 	}
 
+	if err := rows.Err(); err != nil {
+		return err
+	}
+
 	columnTypes, err := rows.ColumnTypes()
 	if err != nil {
 		return err
diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv
index 397eb732c5..782c60d915 100644
--- a/internal/plugins/info.csv
+++ b/internal/plugins/info.csv
@@ -131,6 +131,7 @@ mqtt ,output ,mqtt ,4.37.0 ,certified ,n ,y ,y
 msgpack ,processor ,msgpack ,3.59.0 ,community ,n ,n ,n
 multilevel ,cache ,Multilevel ,0.0.0 ,certified ,n ,y ,y
 mutation ,processor ,mutation ,4.5.0 ,certified ,n ,y ,y
+mysql_cdc ,input ,mysql_cdc ,4.45.0 ,enterprise ,n ,y ,y
 nanomsg ,input ,nanomsg ,0.0.0 ,community ,n ,n ,n
 nanomsg ,output ,nanomsg ,0.0.0 ,community ,n ,n ,n
 nats ,input ,NATS ,0.0.0 ,certified ,n ,y ,y
@@ -170,7 +171,7 @@ parquet ,processor ,parquet ,3.62.0 ,community ,n ,y ,y
 parquet_decode ,processor ,parquet_decode ,4.4.0 ,certified ,n ,y ,y
 parquet_encode ,processor ,parquet_encode ,4.4.0 ,certified ,n ,y ,y
 parse_log ,processor ,parse_log ,0.0.0 ,community ,n ,y ,y
-pg_stream ,input ,pg_stream ,0.0.0 ,enterprise ,y ,y ,y
+pg_stream ,input ,pg_stream ,4.43.0 ,enterprise ,y ,y ,y
 pinecone ,output ,pinecone ,4.31.0 ,certified ,n ,y ,y
 postgres_cdc ,input ,postgres_cdc ,4.43.0 ,enterprise ,n ,y ,y
 processors ,processor ,processors ,0.0.0 ,certified ,n ,y ,y
diff --git a/public/components/all/package.go b/public/components/all/package.go
index 0b7b3a6c3e..04ebb22edb 100644
--- a/public/components/all/package.go
+++ b/public/components/all/package.go
@@ -21,6 +21,7 @@ import (
 	_ "github.com/redpanda-data/connect/v4/public/components/cohere"
 	_ "github.com/redpanda-data/connect/v4/public/components/gcp/enterprise"
 	_ "github.com/redpanda-data/connect/v4/public/components/kafka/enterprise"
+	_ "github.com/redpanda-data/connect/v4/public/components/mysql"
 	_ "github.com/redpanda-data/connect/v4/public/components/ollama"
 	_ "github.com/redpanda-data/connect/v4/public/components/openai"
 	_ "github.com/redpanda-data/connect/v4/public/components/postgresql"
diff --git a/public/components/cloud/package.go b/public/components/cloud/package.go
index 259a140ba4..1823ae3052 100644
--- a/public/components/cloud/package.go
+++ b/public/components/cloud/package.go
@@ -31,6 +31,7 @@ import (
 	_ "github.com/redpanda-data/connect/v4/public/components/memcached"
 	_ "github.com/redpanda-data/connect/v4/public/components/mqtt"
 	_ "github.com/redpanda-data/connect/v4/public/components/msgpack"
+	_ "github.com/redpanda-data/connect/v4/public/components/mysql"
 	_ "github.com/redpanda-data/connect/v4/public/components/nats"
 	_ "github.com/redpanda-data/connect/v4/public/components/openai"
 	_ "github.com/redpanda-data/connect/v4/public/components/opensearch"
diff --git a/public/components/mysql/package.go b/public/components/mysql/package.go
new file mode 100644
index 0000000000..3fe1517f69
--- /dev/null
+++ b/public/components/mysql/package.go
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+package mysql
+
+import (
+	// Bring in the internal plugin definitions.
+	_ "github.com/redpanda-data/connect/v4/internal/impl/mysql"
+)
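
For reference, a minimal sketch of wiring the new input up outside the test harness, mirroring the integration test configuration above (the DSN, cache label, and table name are placeholders):

```yml
input:
  mysql_cdc:
    dsn: user:password@tcp(localhost:3306)/database
    stream_snapshot: true
    snapshot_max_batch_size: 500
    checkpoint_cache: binlog_cache
    tables:
      - all_data_types

cache_resources:
  # The binlog position is checkpointed here so the stream can resume
  # after a restart instead of re-snapshotting the tables.
  - label: binlog_cache
    memory: {}
```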