From cb5ab656faa766e2845119e5144588a9fd2cba77 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Wed, 4 Sep 2024 12:06:15 -0700 Subject: [PATCH 1/6] fix for non RFR state count --- .../initialsync/MySqlInitialLoadGlobalStateManager.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java index 5f2fa13144df..944c4e81eb8b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java @@ -115,6 +115,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus)))); } }); + + nonResumableFullRefreshStreams.forEach(stream -> { + streamStates.add(new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); + }); + if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); From 1059f7bb0ba8e90b3a5336866c8d4a47fd317f71 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 5 Sep 2024 15:25:21 -0700 Subject: [PATCH 2/6] state manager has incorrect criteria for marking state --- .../MssqlInitialLoadGlobalStateManager.java | 3 ++- .../MySqlInitialLoadGlobalStateManager.java | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git 
a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java index da55b7b40c56..7948738de790 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java @@ -70,7 +70,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot.add(pairInStream); } if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) { + if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null + && !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) { this.resumableFullRefreshStreams.add(pairInStream); } else { this.nonResumableFullRefreshStreams.add(pairInStream); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java index 944c4e81eb8b..9b3de8d4047e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java @@ -27,9 +27,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadGlobalStateManager.class); protected StateManager stateManager; // Only one global state is emitted, which is fanned out into many entries in the DB by platform. As @@ -42,6 +45,7 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan // non ResumableFullRefreshStreams do not have any state. We only report count for them. private Set nonResumableFullRefreshStreams; + private Set completedNonResumableFullRefreshStreams; private final boolean savedOffsetStillPresentOnServer; private final ConfiguredAirbyteCatalog catalog; @@ -69,6 +73,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot = new HashSet<>(); this.resumableFullRefreshStreams = new HashSet<>(); this.nonResumableFullRefreshStreams = new HashSet<>(); + this.completedNonResumableFullRefreshStreams = new HashSet<>(); catalog.getStreams().forEach(configuredAirbyteStream -> { var pairInStream = @@ -78,7 +83,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot.add(pairInStream); } if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) { + if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null + && !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) { this.resumableFullRefreshStreams.add(pairInStream); } else { this.nonResumableFullRefreshStreams.add(pairInStream); @@ -116,7 +122,7 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb } }); - nonResumableFullRefreshStreams.forEach(stream -> { + completedNonResumableFullRefreshStreams.forEach(stream -> { streamStates.add(new 
AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); @@ -136,10 +142,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb @Override public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) { + final AirbyteStreamNameNamespacePair pair = + new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { - AirbyteStreamNameNamespacePair pair = - new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); streamsThatHaveCompletedSnapshot.add(pair); + } else if (nonResumableFullRefreshStreams.contains(pair)) { + completedNonResumableFullRefreshStreams.add(pair); } final List streamStates = new ArrayList<>(); @@ -153,7 +161,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus)))); }); - nonResumableFullRefreshStreams.forEach(stream -> { + completedNonResumableFullRefreshStreams.forEach(stream -> { streamStates.add(new AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); From 6375bebadf30415b336911a3514bbe7c1e2320d3 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 5 Sep 2024 15:26:44 -0700 Subject: [PATCH 3/6] more fix on mssql --- .../MssqlInitialLoadGlobalStateManager.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java index 
7948738de790..40fe26825d64 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java @@ -30,6 +30,8 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan // No special handling for resumable full refresh streams. We will report the cursor as it is. private Set resumableFullRefreshStreams; private Set nonResumableFullRefreshStreams; + private Set completedNonResumableFullRefreshStreams; + public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams, final Map pairToOrderedColInfo, @@ -95,6 +97,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb } }); + completedNonResumableFullRefreshStreams.forEach(stream -> { + streamStates.add(new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); + }); + if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); @@ -120,10 +128,13 @@ private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespac @Override public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) { + + final io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair( + airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { - io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair( - 
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); streamsThatHaveCompletedSnapshot.add(pair); + } else if (nonResumableFullRefreshStreams.contains(pair)) { + completedNonResumableFullRefreshStreams.add(pair); } final List streamStates = new ArrayList<>(); streamsThatHaveCompletedSnapshot.forEach(stream -> { @@ -136,7 +147,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus))); }); - nonResumableFullRefreshStreams.forEach(stream -> { + completedNonResumableFullRefreshStreams.forEach(stream -> { streamStates.add(new AirbyteStreamState() .withStreamDescriptor( new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace()))); From c60c22f4214f5e104f7693d8f157ade7ece283a7 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 5 Sep 2024 15:43:05 -0700 Subject: [PATCH 4/6] format --- .../mssql/initialsync/MssqlInitialLoadGlobalStateManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java index 40fe26825d64..dfc07c6a7bb4 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java @@ -32,7 +32,6 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan private Set nonResumableFullRefreshStreams; private Set completedNonResumableFullRefreshStreams; - public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams, final Map 
pairToOrderedColInfo, final StateManager stateManager, From 74c4d278bbb06fc0e73f5f942758c4903a13d45d Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 5 Sep 2024 15:45:04 -0700 Subject: [PATCH 5/6] bump version --- airbyte-integrations/connectors/source-mssql/metadata.yaml | 2 +- airbyte-integrations/connectors/source-mysql/metadata.yaml | 2 +- docs/integrations/sources/mssql.md | 1 + docs/integrations/sources/mysql.md | 3 ++- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index f522f8910824..186e91b9e44e 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.1.11 + dockerImageTag: 4.1.12 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 11f1a73bdd89..13d8d6a89561 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.7.1 + dockerImageTag: 3.7.2 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 03778dbb1dea..9081243b1e74 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -422,6 +422,7 @@ WHERE actor_definition_id 
='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.1.12 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorization of resumable/non-resumable full refresh streams. | | 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. | | 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. | | 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 97c1f1a22df8..5b398ae5a2f8 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | +| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorization of resumable/non-resumable full refresh streams. | +| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. 
| | 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final | | 3.6.9 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. | | 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. | From b43895d83b08d264182a37121e8f1d25cd6fa880 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 5 Sep 2024 16:29:16 -0700 Subject: [PATCH 6/6] npe fix --- .../mssql/initialsync/MssqlInitialLoadGlobalStateManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java index dfc07c6a7bb4..4fa63c266c6e 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java @@ -62,6 +62,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams, this.streamsThatHaveCompletedSnapshot = new HashSet<>(); this.resumableFullRefreshStreams = new HashSet<>(); this.nonResumableFullRefreshStreams = new HashSet<>(); + this.completedNonResumableFullRefreshStreams = new HashSet<>(); catalog.getStreams().forEach(configuredAirbyteStream -> { var pairInStream =