-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🐛 Source Shopify: Fix Inventory Level Substream - Incremental Sync Flawed Filtering #32883
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
Before Merging a Connector Pull RequestWow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated:
If the checklist is complete, but the CI check is failing,
|
…bstream---Incremental-Sync-Flawed-Filtering
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments thanks for the contribution @cobobrien
airbyte-integrations/connectors/source-shopify/source_shopify/source.py
Outdated
Show resolved
Hide resolved
@@ -408,7 +408,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite | |||
# avoid checking `deleted` records for substreams, a.k.a `Metafields` streams, | |||
# since `deleted` records are not available, thus we avoid HTTP-400 errors. | |||
if self.deleted_cursor_field not in record: | |||
yield {self.slice_key: record[self.nested_record]} | |||
yield {self.slice_key: record[self.nested_record]} | (stream_state or {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain the reason of returning stream_state here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I required access to the original updated_at
date (from the state file passed in as an argument to the sync execution) in the request_params
method. I couldn't find a better approach:
- The
stream_state
in therequest_params
seems to be always an empty dict self.get_updated_state
does no good here as we're looking for the value to use for theupdated_at_min
param- This connector does not have a
self.state
property
I'm eager to know of a better way of achieving this as it felt a bit hacky to me too. Thanks @marcosmarxm 🙌
…source.py Co-authored-by: Marcos Marx <[email protected]>
@marcosmarxm I have updated this PR. Could I please get a review? |
@cobobrien I'll take a look. |
@cobobrien Thank you for your time spent on the problem, currently, I don't feel this PR would be needed in the light of this one: #32345 which will migrate the @marcosmarxm Should we still merge this in with what is being said? Overall the PR looks good. |
The fix has been merged @marcosmarxm FYI. |
@cobobrien can you try latest version of the connector? |
Closed as #32345 solves the issue. |
What
The incremental sync for Shopify Inventory Level substreams pulls a fraction of the data that it should. This is due to the combination of an unsortable Parent Stream (Location) and the functionality fo the
filter_records_newer_than_state
method. Detailed explanation with examples in the issue: #32817How
Adds Incremental Inventory Levels by using the
updated_at_min
filter. Is far more performant than what existed previously due to no longer requiring thefilter_records_newer_than_state
functionality.Previously incremental substreams pulls had to query all existing data for that resource without any filtering, and then filter out the older-than-state data after the fact in the connector runtime (via the
filter_records_newer_than_state
method). This way, only the newer-than-state data is pulled via the REST interface.It runs in a fraction of the time of the old implementation and will pull all qualifying records
Recommended reading order
source.py
🚨 User Impact 🚨
All qualifying
Inventory Level
records will be pulled. This incremental stream sync will also be much more performant than it currently is.Pre-merge Actions
Updating a connector
Community member or Airbyter
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.