feat(airbyte-cdk): add global_state => per_partition transformation #45122
Signed-off-by: Artem Inzhyyants <[email protected]>
Tested on source-klaviyo. Input state (global format):
"stream": {
"stream_state": {
"updated": "2120-10-10T00:00:00+00:00"
},
"stream_descriptor": { "name": "lists_detailed" }
}
Transformed output, per partition:
{
"states": [
{
"partition": { "id": "R2p3ry", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "R4ZhCr", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "RPfQMj", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "RgS4w6", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "RnsiHB", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "RwKPyg", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "S7aBY2", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "S8nmQ9", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "SBYgiK", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "SYEFFb", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "Seq8wh", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "SmDD4y", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "TDGJsj", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "TWcKFn", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "TaSce6", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "TjbH4K", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "TpNXq9", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "UeGLUr", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "UzdNhZ", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "VDZnQt", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "VJCDbR", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "VmvmBq", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "WBxsQE", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "WJLXnV", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "X7UeXn", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "XGj3p8", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "XUbNgM", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "XpP2a5", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
},
{
"partition": { "id": "Ya5ziX", "parent_slice": {} },
"cursor": { "updated": "2120-10-10T00:00:00+00:00" }
}
]
}
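The transformation above can be sketched as a small pure function. This is a minimal illustration, not the CDK's implementation: the function name `global_state_to_per_partition` and the way partitions are passed in are assumptions. Only the output shape (`states`, `partition`, `cursor`) is taken from the test output above.

```python
from copy import deepcopy
from typing import Any, Dict, List, Mapping


def global_state_to_per_partition(
    global_state: Mapping[str, Any],
    partitions: List[Mapping[str, Any]],
) -> Dict[str, Any]:
    """Fan a single global cursor state out to every partition.

    Hypothetical helper for illustration only; the name and signature
    are not part of the Airbyte CDK.
    """
    return {
        "states": [
            # Each partition gets its own deep copy, so a later update to
            # one cursor cannot leak into another partition's state.
            {"partition": deepcopy(dict(partition)), "cursor": deepcopy(dict(global_state))}
            for partition in partitions
        ]
    }


global_state = {"updated": "2120-10-10T00:00:00+00:00"}
partitions = [
    {"id": "R2p3ry", "parent_slice": {}},
    {"id": "R4ZhCr", "parent_slice": {}},
]
per_partition_state = global_state_to_per_partition(global_state, partitions)
```

Copying the state per partition is a design choice of this sketch: sharing one dict across partitions would make every cursor advance together, which defeats the purpose of per-partition state.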
I would like the test to be improved; the rest are nits. I'll approve regardless because it is a pretty solid PR.
@@ -56,7 +55,10 @@ def stream_slices(self) -> Iterable[StreamSlice]:
         slices = self._partition_router.stream_slices()
         for partition in slices:
             cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
-            if not cursor:
+            if not cursor and self._state_to_migrate_from:
nit: would it avoid duplication to do something like:
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
cursor = self._create_cursor(partition_state)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
The reasoning is that the if part is only to determine the state to use, not to build the cursor
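A self-contained sketch of that suggestion, for illustration: `FakeCursor`, `PerPartitionSketch`, and `cursor_for` are hypothetical stand-ins, not CDK classes. Only `_state_to_migrate_from`, `_NO_CURSOR_STATE`, `_create_cursor`, and `_cursor_per_partition` come from the snippet above, and their real behavior may differ.

```python
from typing import Any, Dict, Mapping, Optional


class FakeCursor:
    """Stand-in for a per-partition cursor; it only stores its initial state."""

    def __init__(self, state: Mapping[str, Any]) -> None:
        self.state = dict(state)


class PerPartitionSketch:
    """Hypothetical reduction of the per-partition cursor to the part under review."""

    _NO_CURSOR_STATE: Mapping[str, Any] = {}

    def __init__(self, state_to_migrate_from: Optional[Mapping[str, Any]] = None) -> None:
        self._state_to_migrate_from = state_to_migrate_from
        self._cursor_per_partition: Dict[str, FakeCursor] = {}

    def _create_cursor(self, state: Mapping[str, Any]) -> FakeCursor:
        return FakeCursor(state)

    def cursor_for(self, partition_key: str) -> FakeCursor:
        cursor = self._cursor_per_partition.get(partition_key)
        if cursor is None:
            # The conditional only chooses which state to start from;
            # cursor construction and registration happen once, below.
            partition_state = (
                self._state_to_migrate_from
                if self._state_to_migrate_from
                else self._NO_CURSOR_STATE
            )
            cursor = self._create_cursor(partition_state)
            self._cursor_per_partition[partition_key] = cursor
        return cursor


migrated = PerPartitionSketch({"updated": "2120-10-10T00:00:00+00:00"})
fresh = PerPartitionSketch()
```

With this shape there is a single creation path, so a later change to how cursors are built cannot be applied to one branch and forgotten in the other.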
agreed, ref
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
cursor.set_initial_state({"invalid_state": 1})

assert exception.value.failure_type == FailureType.config_error
cursor.set_initial_state({"global_state_format_key": "global_state_format_value"})
I'm not sure I understand this test. It feels like if we were to change `cursor.set_initial_state({"global_state_format_key": "global_state_format_value"})` to `cursor.set_initial_state({"states": []})`, the test would still pass, which makes me think it isn't testing global state. Should we assert `mocked_cursor_factory.create.calls == [calls(<global state>)] * <number of times we expect it to be called>`?
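One way such an assertion could look with `unittest.mock`, where the recorded calls are exposed as `call_args_list` and compared against `call(...)` objects. This is a sketch under assumptions: `build_cursors` is a hypothetical function under test, and it assumes the factory's `create` receives the state directly, which may not match the real CDK factory.

```python
from typing import Any, Dict, List, Mapping
from unittest.mock import Mock, call


def build_cursors(
    cursor_factory: Any,
    state: Mapping[str, Any],
    partition_keys: List[str],
) -> Dict[str, Any]:
    """Hypothetical code under test: one cursor per partition, seeded with `state`."""
    return {key: cursor_factory.create(state) for key in partition_keys}


global_state = {"global_state_format_key": "global_state_format_value"}
partition_keys = ["first", "second", "third"]

mocked_cursor_factory = Mock()
build_cursors(mocked_cursor_factory, global_state, partition_keys)

# Passes only if every cursor was created from the global state, so
# swapping the input for {"states": []} would make this assertion fail.
assert mocked_cursor_factory.create.call_args_list == [call(global_state)] * len(partition_keys)
```

Comparing the full `call_args_list` checks both the argument and the number of calls, which is what ties the test to the global-state path.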
refactored, added test for call_args
Has this change been pushed? I don't see it
just pushed
fb54832
…dk-state-mgiration
LGTM! Thanks for tackling the comments and being diligent about this change
…45122 (`artem1205/airbyte-cdk-state-mgiration`)
Here is a more optimized version of the given Python program, intended to improve runtime efficiency by minimizing redundant computations and reducing the complexity of operations.
### Key Optimizations
1. **Re-assigned state_dict to avoid multiple dictionary accesses:** `stream_state.get("states")`
2. **Used a set comprehension**, which enhances readability and avoids multiple calls.
3. **Combined dictionary updates directly** to populate `_cursor_per_partition` in a single pass.
this is great thanks @artem1205 !
@lazebnyi can you use this to avoid the breaking change in the linkedin ads migration?
I tested it locally with #44370, and everything looks good.
/approve-regression-tests
What
Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/9576
How
add global_state => per partition transformation
Review guide
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py
User Impact
Can this PR be safely reverted and rolled back?