✨ low-code: Add Incremental Parent State Handling to SubstreamPartitionRouter #38211
Please split the CDK and connector changes into separate PRs.
@artem1205 I will create separate PRs for the connectors; I added these changes here for now to make testing easier, so that we can run regression tests from this branch.
@@ -28,3 +28,21 @@ def stream_slices(self) -> Iterable[StreamSlice]:

        :return: List of stream slices
        """

    def set_parent_state(self, stream_state: StreamState) -> None:
this method is only used on partition routers, so I think it would be worth creating a new `PartitionRouter` interface that would be more specific.
We should also leave the two methods as abstract to ensure developers explicitly define them instead of relying on a default behavior that may or may not make sense for their use case.
Any stream slicer can handle parent states, for example `CartesianProductStreamSlicer`.
`DeclarativeCursor` doesn't, right?
I think it makes sense for `CartesianProductStreamSlicer` to also be a `PartitionRouter`.
Changed CartesianProductStreamSlicer to PartitionRouter
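The interface discussed in this thread could look roughly like the following. This is a minimal sketch, not the CDK's actual code: `StreamState` and `StreamSlice` are simplified to plain mappings, and the `ListPartitionRouter` shown here is a hypothetical stand-in that implements the two state methods as intentional no-ops.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, Optional

# Simplified stand-ins for the CDK types (assumption: plain mappings).
StreamState = Mapping[str, Any]
StreamSlice = Mapping[str, Any]


class StreamSlicer(ABC):
    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """Return the slices to read."""


class PartitionRouter(StreamSlicer):
    """More specific interface: both state methods are abstract on purpose."""

    @abstractmethod
    def set_parent_state(self, stream_state: StreamState) -> None:
        """Pass the incoming state down to the parent streams, if any."""

    @abstractmethod
    def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
        """Return the current state of the parent streams, if any."""


class ListPartitionRouter(PartitionRouter):
    """Hypothetical router without parent streams."""

    def __init__(self, values: List[str], field: str) -> None:
        self._values = values
        self._field = field

    def stream_slices(self) -> Iterable[StreamSlice]:
        for value in self._values:
            yield {self._field: value}

    def set_parent_state(self, stream_state: StreamState) -> None:
        # Intentional no-op: there are no parent streams to update.
        return None

    def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
        # Intentional no-op: there is no parent state to report.
        return None
```

Keeping the methods abstract means a router that forgets to define them fails at instantiation time rather than silently inheriting a default.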

    def set_parent_state(self, stream_state: StreamState) -> None:
        """
        Set the state of the parent streams.
the docstring should be more specific about the parameter expectations. `stream_state` must have `parent_state`, which is not obvious from the signature. It might even be worth creating a new class to codify the expectation (kind of like how `StreamSlice` codifies the presence of a `partition` and a `cursor_slice`).
Updated docs for all classes

            yield from stream_slices_for_parent

    def set_parent_state(self, stream_state: Optional[StreamState]) -> None:
why is `stream_state` an `Optional`?
Fixed
@@ -86,6 +86,8 @@ def set_initial_state(self, stream_state: StreamState) -> None:
        for state in stream_state["states"]:
            self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

        self._partition_router.set_parent_state(stream_state)
this deserves a comment as it's not obvious why we need to update the router's parent state
Added comment
reading through the code again: we're passing the full state, not just the `parent_state`. Is there a reason this can't just call the router's `set_initial_state` method?
Stream slicers and partition routers don't keep track of the cursor and don't have a `set_initial_state` method. `set_parent_state` sets the state of the parent streams that partition routers operate on, if such parent streams exist.
seems to me this means we want to add a `set_initial_state` method to the routers. From the caller's perspective, it doesn't really matter that it's setting a parent state; it's an implementation detail.
Furthermore, not all stream slicers know or care about parents (e.g. the list partition router).
edit: I do think the state being set is somewhat different, but the interface feels very strange to me for two reasons:
- Not every stream slicer should know or care about parent streams
- Even from the caller's perspective, the fact that it's updating a parent stream state is irrelevant. It's just passing the whole state object
The same goes for `set_initial_state` on stream slicers, as not every stream slicer cares about the state. Introducing a `set_initial_state` method could create confusion, implying that partition routers are somehow dependent on the state. `set_parent_state` clearly articulates that it is only connected to the parent streams, as the stream interacts with the parent stream through the partition router.
yeah, that's exactly the point: we're now saying that `SubstreamPartitionRouter` and `CartesianProductStreamSlicer` are incremental and have a concept of state they need to keep track of.
I'm not sure if it'll make the code simpler or more complicated, but conceptually, this makes me think the `SubstreamPartitionRouter` should have a `Cursor` when `incremental_dependency` is True.
This would essentially mean that instead of keeping the parent state in a dict, it could pass the parent records to its cursor's `observe` method while iterating over them.
I feel 4/10 strongly on this issue so not worth blocking on.
I changed the methods' names. Regarding using a Cursor to keep track of the state, it is already managed by the cursor of parent streams. Partition routers just retrieve the state from the internal parent streams.
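To make the delegation described in this thread concrete, here is a minimal sketch under stated assumptions. The class names `FakeParentStream` and `SubstreamRouterSketch` are hypothetical, the method names are the ones shown in the diff above (the renamed methods are not visible in this thread), and the state layout (a `parent_state` mapping keyed by parent stream name) is inferred from the review comments rather than taken from the CDK.

```python
from typing import Any, Dict, List, Mapping, Optional


class FakeParentStream:
    """Hypothetical parent stream that owns its own cursor state."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.state: Dict[str, Any] = {}


class SubstreamRouterSketch:
    """Keeps no state itself; it only forwards to and reads from the parents."""

    def __init__(self, parent_streams: List[FakeParentStream]) -> None:
        self._parent_streams = parent_streams

    def set_parent_state(self, stream_state: Mapping[str, Any]) -> None:
        # The caller passes the whole state object; only `parent_state` is used.
        parent_state = stream_state.get("parent_state", {})
        for parent in self._parent_streams:
            if parent.name in parent_state:
                parent.state = dict(parent_state[parent.name])

    def get_parent_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
        # Collect the state from the parents instead of tracking it in the router.
        states = {p.name: dict(p.state) for p in self._parent_streams if p.state}
        return states or None
```

In this sketch the router never computes a cursor value; it only moves state between the incoming state object and the parent streams, which matches the argument that the parents' own cursors already manage the state.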
@@ -28,3 +28,45 @@ def stream_slices(self) -> Iterable[StreamSlice]:

        :return: List of stream slices
        """

    def set_parent_state(self, stream_state: StreamState) -> None:
let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)
Fixed
        """
        pass

    def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
let's leave the method as abstract to force the concrete class to implement it (or intentionally implement a no-op)
Fixed
            parent_partition = parent_stream_slice.partition if parent_stream_slice else {}

            # we need to read all records for slice to update the parent stream cursor
            stream_slices_for_parent = []
            for parent_record in parent_stream.read_records(
let's add a comment here explaining that the sync mode and the stream state passed are irrelevant since we set the initial state from set_parent_state
Fixed
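The "read all records for the slice first" pattern in the hunk above can be illustrated with a small sketch. Everything here is an assumption made for illustration: `FakeIncrementalParent` is a hypothetical stream whose cursor only advances once `read_records` is exhausted, and the field names `id`, `parent_id`, and `last_updated` are placeholders.

```python
from typing import Any, Dict, Iterator, List, Mapping


class FakeIncrementalParent:
    """Hypothetical parent stream; its state is set before read_records is called."""

    def __init__(self, records: List[Mapping[str, Any]], cursor_field: str = "last_updated") -> None:
        self._records = records
        self._cursor_field = cursor_field
        self.state: Dict[str, Any] = {}

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        # ISO-8601 timestamps in one format compare correctly as strings.
        start = self.state.get(self._cursor_field, "")
        latest = start
        for record in self._records:
            if record[self._cursor_field] > start:
                latest = max(latest, record[self._cursor_field])
                yield record
        # The cursor advances only after every record has been consumed.
        if latest:
            self.state = {self._cursor_field: latest}


def child_slices(parent: FakeIncrementalParent) -> Iterator[Mapping[str, Any]]:
    # Buffer all slices for the parent first so the parent cursor is up to date
    # before any child slice is handed to the caller.
    stream_slices_for_parent = [{"parent_id": record["id"]} for record in parent.read_records()]
    yield from stream_slices_for_parent
```

With this ordering, the parent state is already final when the first child slice is emitted, so it can be saved together with the child's state. A second run with the saved state yields no slices because no parent record is newer than the cursor.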
LGTM! Could you please add more tests to ensure that the changes and the stream state are handled correctly?
e.g. `test_incremental_parent_state`:
- create test_source
- create stream_A (parent)
- create stream_B (child)
- run test_source.read() with/without stream_state
- assert:
  - requests
  - records
  - stream_state (in output)
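The shape of the requested test can be sketched as follows. This is not the test added in the PR: `FakeSource`, the request strings, the record contents, and the `updated_at` cursor field are all hypothetical, and the source is an in-memory stand-in rather than a real declarative source.

```python
from typing import Any, Dict, List, Mapping, Optional, Tuple

PARENT_RECORDS = [
    {"id": 1, "updated_at": "2023-05-26T00:00:00Z"},
    {"id": 2, "updated_at": "2023-05-27T00:00:00Z"},
]
CHILD_RECORDS = {1: [{"id": "b1", "parent_id": 1}], 2: [{"id": "b2", "parent_id": 2}]}


class FakeSource:
    """In-memory stand-in: stream_A is the parent, stream_B the child."""

    def __init__(self) -> None:
        self.requests: List[str] = []

    def read(self, stream_state: Optional[Mapping[str, Any]] = None) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        state = stream_state or {}
        since = state.get("parent_state", {}).get("stream_A", {}).get("updated_at", "")
        self.requests.append(f"GET /stream_A?since={since}")
        latest = since
        records: List[Dict[str, Any]] = []
        for parent in PARENT_RECORDS:
            if parent["updated_at"] <= since:
                continue  # already synced according to the parent state
            latest = max(latest, parent["updated_at"])
            self.requests.append(f"GET /stream_A/{parent['id']}/stream_B")
            records.extend(CHILD_RECORDS[parent["id"]])
        return records, {"parent_state": {"stream_A": {"updated_at": latest}}}


def test_incremental_parent_state() -> None:
    # Without state: the parent is read in full, every child partition is requested.
    source = FakeSource()
    records, state = source.read()
    assert [r["id"] for r in records] == ["b1", "b2"]
    assert source.requests == ["GET /stream_A?since=", "GET /stream_A/1/stream_B", "GET /stream_A/2/stream_B"]
    assert state == {"parent_state": {"stream_A": {"updated_at": "2023-05-27T00:00:00Z"}}}

    # With state: only parents newer than the saved cursor produce child requests.
    source = FakeSource()
    previous = {"parent_state": {"stream_A": {"updated_at": "2023-05-26T00:00:00Z"}}}
    records, state = source.read(previous)
    assert [r["id"] for r in records] == ["b2"]
    assert source.requests == ["GET /stream_A?since=2023-05-26T00:00:00Z", "GET /stream_A/2/stream_B"]
    assert state == {"parent_state": {"stream_A": {"updated_at": "2023-05-27T00:00:00Z"}}}
```

The three assertion groups mirror the checklist above: requests issued, records returned, and the stream state in the output.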
I have some questions around how files were programmatically generated from the changes to the schema.
Also, is the plan to also update the CDK for the connectors being updated? Should they be split out into a follow-up PR?
      "last_updated": "2023-05-27T00:00:00Z"
    }
  }
}
The indenting here is very hard to read, can you remove a lot of the extra spacing so it lines up more with the above explanation
Fixed
this one's indentation is still a little off
Sorry, I missed that. Fixed
@@ -1,4 +1,4 @@
-version: 0.77.2
+version: 0.51.42
Was this intentional? This seems like a very drastic version downgrade and we'll lose potentially quite a few changes. Why the downgrade to this version?
I've noticed this on another manifest as well
No, I accidentally pushed this line. It is a fix for a local issue.
@@ -1305,6 +1305,11 @@ class ParentStreamConfig(BaseModel):
        description='A request option describing where the parent key value should be injected into and under what field name if applicable.',
        title='Request Option',
    )
    incremental_dependency: bool = Field(
How was this field generated? Did you write it yourself or did you use the tool to generate the models based on the `declarative_component_schema.yaml`?
These don't seem to match up with the one defined there, specifically the description. And if we ran the auto-model generation, I believe `incremental_dependency` would be `Optional[bool]`. I think that will be strictly required, otherwise existing connectors will break since they don't have that field.
Fixed
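The backward-compatibility concern can be illustrated with a small sketch. It uses a standard-library dataclass as a stand-in for the generated pydantic model, so it is not the CDK's model; the class name `ParentStreamConfigSketch`, the helper `parse_parent_stream_config`, and the default of `False` are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class ParentStreamConfigSketch:
    parent_key: str
    partition_field: str
    # Optional with a default, so manifests written before the field existed still load.
    incremental_dependency: Optional[bool] = False


def parse_parent_stream_config(config: Mapping[str, Any]) -> ParentStreamConfigSketch:
    return ParentStreamConfigSketch(
        parent_key=config["parent_key"],
        partition_field=config["partition_field"],
        # Missing key falls back to the assumed default instead of raising.
        incremental_dependency=config.get("incremental_dependency", False),
    )
```

A required `bool` without a default would instead reject every existing manifest that omits the field, which is the breakage the comment above warns about.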
    "parent_stream_name2": {
      "last_updated": "2023-05-27T00:00:00Z"
    }
  }
same comment about indentation
Fixed
@brianjlai, I have linked the connectors' PRs in the description. I added these changes to this PR so it can be easily tested using the regression tool from this branch. I will delete these changes before merging. However, regarding the connectors PR, it would be better to use a global parent state for them as it will help to avoid breaking changes and concerns about the new state size.
Looks good, and it makes sense for convenience that these are in the PR for testing and validation purposes. Do you have any live test runs against the connectors you are implementing this for, using `--use-local-cdk`?
Once that is validated I think this is ready to be merged. Did you address @artem1205's changes? It looked like it, but I noticed he still had it marked as changes requested.
@@ -2004,6 +2004,11 @@ definitions:
          title: Request Option
          description: A request option describing where the parent key value should be injected into and under what field name if applicable.
          "$ref": "#/definitions/RequestOption"
        incremental_dependency:
I'm trying to think if there's a more descriptive word for this field. `incremental_dependency` doesn't provide a clear meaning to what this is doing.
Maybe something like `incremental_sync_based_on_child`? I'm not attached to it and the naming is not a blocker, but trying to consider how we can make it more clear what this boolean does.
I don't think `incremental_sync_based_on_child` is a better name. Since this is for a SubstreamRouter, `child` doesn't add any additional information. In contrast, `incremental_dependency` clearly indicates that streams depend on each other for incremental updates.
@tolik0, let me know when you separate out connector changes and rebase. I'm happy to get this merged this week if @artem1205 is still away; I have the gawd-tier merge button privileges.
@natikgadzhi @brianjlai I've attached an example of the regression test results. The target version shows more records due to the addition of lookback windows, yet it reproduces all records from the control version. Given that state compatibility affects the regression tests for the connectors in this PR, the best way to validate these changes is through manual verification. Specifically, ensure the parent stream is using the correct cursor, as demonstrated in the new test I've added. Regarding Artem's request, he only asked for the test that I have already added.
What
This PR resolves the issue with incremental substream reads in low-code. It ensures that the state is passed to the parent stream when generating partitions, so that the parent stream is not read in full refresh mode. Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/7369
Connector's changes:
How
The code changes introduce a new field `incremental_dependency` in the `ParentStreamConfig` class. The `SubstreamPartitionRouter` was updated to read parent state incrementally and save the parent state when the stream slice is processed. Additionally, new default methods were added to `StreamSlicer` for cases when a stream slicer does not have parent streams.
Review guide
User Impact
Can this PR be safely reverted and rolled back?