From 625574dd4756f1fd1464ccdb79b67927d7eebd01 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 4 Sep 2024 12:57:15 +0200 Subject: [PATCH 1/5] Airbyte CDK: add global_state => per partition transformation Signed-off-by: Artem Inzhyyants --- .../incremental/per_partition_cursor.py | 20 +++++++++++------- .../incremental/test_per_partition_cursor.py | 21 ++++++++++++------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 28c2f0eb6b8b..56a63e8c0b63 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -45,6 +45,7 @@ class PerPartitionCursor(DeclarativeCursor): _NO_CURSOR_STATE: Mapping[str, Any] = {} _KEY = 0 _VALUE = 1 + _state_to_migrate_from: Mapping[str, Any] = {} def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): self._cursor_factory = cursor_factory @@ -56,7 +57,10 @@ def stream_slices(self) -> Iterable[StreamSlice]: slices = self._partition_router.stream_slices() for partition in slices: cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) - if not cursor: + if not cursor and self._state_to_migrate_from: + cursor = self._create_cursor(self._state_to_migrate_from) + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + elif not cursor: cursor = self._create_cursor(self._NO_CURSOR_STATE) self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor @@ -97,12 +101,14 @@ def set_initial_state(self, stream_state: StreamState) -> None: return if "states" not in stream_state: - raise AirbyteTracedException( - internal_message=f"Could not sync parse the following state: {stream_state}", - message="The state for is format invalid. Validate that the migration steps included a reset and that it was performed " - "properly. Otherwise, please contact Airbyte support.", - failure_type=FailureType.config_error, - ) + self._state_to_migrate_from = stream_state + return + # raise AirbyteTracedException( + # internal_message=f"Could not sync parse the following state: {stream_state}", + # message="The state for is format invalid. Validate that the migration steps included a reset and that it was performed " + # "properly. Otherwise, please contact Airbyte support.", + # failure_type=FailureType.config_error, + # ) for state in stream_state["states"]: self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py index b2c8d5faf46d..bd58d875fb7a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py @@ -6,12 +6,10 @@ from unittest.mock import Mock import pytest -from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor, PerPartitionKeySerializer, StreamSlice from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record -from airbyte_cdk.utils import AirbyteTracedException PARTITION = { "partition_key string": "partition value", @@ -519,10 +517,19 @@ def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_pa assert stream_state == expected_state -def test_given_invalid_state_when_set_initial_state_then_raise_config_error(mocked_cursor_factory, mocked_partition_router) -> None: +def test_per_partition_state_when_set_initial_global_state(mocked_cursor_factory, mocked_partition_router) -> None: + first_partition = {"first_partition_key": "first_partition_value"} + mocked_partition_router.stream_slices.return_value = [ + StreamSlice(partition=first_partition, cursor_slice={}), + ] + mocked_cursor_factory.create.side_effect = [ + MockedCursorBuilder().with_stream_state({CURSOR_STATE_KEY: "first slice cursor value"}).build(), + MockedCursorBuilder().with_stream_state({CURSOR_STATE_KEY: "second slice cursor value"}).build(), + ] cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) - with pytest.raises(AirbyteTracedException) as exception: - cursor.set_initial_state({"invalid_state": 1}) - - assert exception.value.failure_type == FailureType.config_error + cursor.set_initial_state({"global_state_format_key": "global_state_format_value"}) + list(cursor.stream_slices()) + assert cursor.get_stream_state()["states"] == [ + {"cursor": {"cursor state": "first slice cursor value"}, "partition": {"first_partition_key": "first_partition_value"}} + ] From a19686c62484734b1056cc74512254e5da55d3ef Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 4 Sep 2024 13:10:11 +0200 Subject: [PATCH 2/5] Airbyte CDK: fmt Signed-off-by: Artem Inzhyyants --- .../sources/declarative/incremental/per_partition_cursor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 56a63e8c0b63..8e52f0ed091d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -4,12 +4,10 @@ from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional, Union -from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import PerPartitionKeySerializer from airbyte_cdk.sources.types import Record, StreamSlice, StreamState -from airbyte_cdk.utils import AirbyteTracedException class CursorFactory: From ce1d8965aac2f00de85762cff6858b9a56f85103 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 4 Sep 2024 15:38:12 +0200 Subject: [PATCH 3/5] Airbyte CDK: add comment Signed-off-by: Artem Inzhyyants --- .../incremental/per_partition_cursor.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 8e52f0ed091d..c4f0bd21f667 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -99,17 +99,13 @@ def set_initial_state(self, stream_state: StreamState) -> None: return if "states" not in stream_state: + # We assume that `stream_state` is in a global format that can be applied to all partitions. + # Example: {"global_state_format_key": "global_state_format_value"} self._state_to_migrate_from = stream_state - return - # raise AirbyteTracedException( - # internal_message=f"Could not sync parse the following state: {stream_state}", - # message="The state for is format invalid. Validate that the migration steps included a reset and that it was performed " - # "properly. Otherwise, please contact Airbyte support.", - # failure_type=FailureType.config_error, - # ) - - for state in stream_state["states"]: - self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) + + else: + for state in stream_state["states"]: + self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) # Set parent state for partition routers based on parent streams self._partition_router.set_initial_state(stream_state) From fb54832bbac15f79c6f32890ba8fc0f6aade312f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 6 Sep 2024 14:49:20 +0200 Subject: [PATCH 4/5] Airbyte CDK: fix test Signed-off-by: Artem Inzhyyants --- .../incremental/per_partition_cursor.py | 8 +++---- .../incremental/test_per_partition_cursor.py | 24 +++++++++++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index c4f0bd21f667..02843d97ce3e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -55,11 +55,9 @@ def stream_slices(self) -> Iterable[StreamSlice]: slices = self._partition_router.stream_slices() for partition in slices: cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) - if not cursor and self._state_to_migrate_from: - cursor = self._create_cursor(self._state_to_migrate_from) - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor - elif not cursor: - cursor = self._create_cursor(self._NO_CURSOR_STATE) + if not cursor: + partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE + cursor = self._create_cursor(partition_state) self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor for cursor_slice in cursor.stream_slices(): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py index bd58d875fb7a..6e1900ba111e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py @@ -519,17 +519,27 @@ def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_pa def test_per_partition_state_when_set_initial_global_state(mocked_cursor_factory, mocked_partition_router) -> None: first_partition = {"first_partition_key": "first_partition_value"} + second_partition = {"second_partition_key": "second_partition_value"} + global_state = {"global_state_format_key": "global_state_format_value"} + mocked_partition_router.stream_slices.return_value = [ StreamSlice(partition=first_partition, cursor_slice={}), + StreamSlice(partition=second_partition, cursor_slice={}), ] mocked_cursor_factory.create.side_effect = [ - MockedCursorBuilder().with_stream_state({CURSOR_STATE_KEY: "first slice cursor value"}).build(), - MockedCursorBuilder().with_stream_state({CURSOR_STATE_KEY: "second slice cursor value"}).build(), + MockedCursorBuilder().with_stream_state(global_state).build(), + MockedCursorBuilder().with_stream_state(global_state).build(), ] cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) - - cursor.set_initial_state({"global_state_format_key": "global_state_format_value"}) + global_state = {"global_state_format_key": "global_state_format_value"} + cursor.set_initial_state(global_state) + assert cursor._state_to_migrate_from == global_state list(cursor.stream_slices()) - assert cursor.get_stream_state()["states"] == [ - {"cursor": {"cursor state": "first slice cursor value"}, "partition": {"first_partition_key": "first_partition_value"}} - ] + assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_count == 1 + assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_args[0] == ({'global_state_format_key': 'global_state_format_value'},) + assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_count == 1 + assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_args[0] == ({'global_state_format_key': 'global_state_format_value'},) + assert cursor.get_stream_state()["states"] == [{'cursor': {'global_state_format_key': 'global_state_format_value'}, + 'partition': {'first_partition_key': 'first_partition_value'}}, + {'cursor': {'global_state_format_key': 'global_state_format_value'}, + 'partition': {'second_partition_key': 'second_partition_value'}}] From 2b5c53109ab96f0576537ef1ec64d4c0fb8c7652 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 6 Sep 2024 15:02:27 +0200 Subject: [PATCH 5/5] Airbyte CDK: fmt Signed-off-by: Artem Inzhyyants --- .../incremental/test_per_partition_cursor.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py index 6e1900ba111e..823405cb5152 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py @@ -536,10 +536,18 @@ def test_per_partition_state_when_set_initial_global_state(mocked_cursor_factory assert cursor._state_to_migrate_from == global_state list(cursor.stream_slices()) assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_count == 1 - assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_args[0] == ({'global_state_format_key': 'global_state_format_value'},) + assert cursor._cursor_per_partition['{"first_partition_key":"first_partition_value"}'].set_initial_state.call_args[0] == ( + {"global_state_format_key": "global_state_format_value"}, + ) assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_count == 1 - assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_args[0] == ({'global_state_format_key': 'global_state_format_value'},) - assert cursor.get_stream_state()["states"] == [{'cursor': {'global_state_format_key': 'global_state_format_value'}, - 'partition': {'first_partition_key': 'first_partition_value'}}, - {'cursor': {'global_state_format_key': 'global_state_format_value'}, - 'partition': {'second_partition_key': 'second_partition_value'}}] + assert cursor._cursor_per_partition['{"second_partition_key":"second_partition_value"}'].set_initial_state.call_args[0] == ( + {"global_state_format_key": "global_state_format_value"}, + ) + expected_state = [ + {"cursor": {"global_state_format_key": "global_state_format_value"}, "partition": {"first_partition_key": "first_partition_value"}}, + { + "cursor": {"global_state_format_key": "global_state_format_value"}, + "partition": {"second_partition_key": "second_partition_value"}, + }, + ] + assert cursor.get_stream_state()["states"] == expected_state