feat(cdk): add async job components #45178
Conversation
```python
for url in self.urls_extractor.extract_records(self._polling_job_response_by_id[job.api_job_id()]):
    stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={})
```
Because we don't have interpolation on arbitrary fields, we have this hack, which consists of using the stream_slice to allow for interpolation. This is what it looks like in the manifest: https://github.com/airbytehq/airbyte/compare/async-job/feature-branch#diff-b041c21db61fd5fe7fb67481e6167b6a343236571089c1ed61498b597202a750R921
Q: would it make sense to create a new type of slice or partition since this isn't exactly a StreamSlice in the typical sense?
The alternative for me would be to allow interpolation on more parameters than stream_slice, stream_state and next_page_token, since we currently limit interpolation to these. This can be done but would require a change to three layers: HttpRequester, RequestOptionProvider, RequestInputProvider
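For illustration, a minimal sketch of the workaround described in this thread, assuming the `StreamSlice` import path of recent CDK versions; the URL value and the manifest line in the comments are illustrative:

```python
# Sketch of the stream_slice interpolation hack (illustrative values only).
from airbyte_cdk.sources.types import StreamSlice

# The extracted value is stuffed into the slice's partition...
stream_slice = StreamSlice(partition={"url": "https://api.example.com/exports/1.csv"}, cursor_slice={})

# ...so the manifest can interpolate it even though only stream_slice, stream_state
# and next_page_token are interpolatable, e.g. in a requester: url: "{{ stream_slice['url'] }}"
assert stream_slice["url"] == "https://api.example.com/exports/1.csv"
```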
""" | ||
for job in jobs: | ||
stream_slice = StreamSlice( | ||
partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]}, |
Because we don't have interpolation on arbitrary fields, we have this hack, which consists of using the stream_slice to allow for interpolation. This is what it looks like in the manifest: https://github.com/airbytehq/airbyte/compare/async-job/feature-branch#diff-b041c21db61fd5fe7fb67481e6167b6a343236571089c1ed61498b597202a750R913
maybe worth leaving this comment in the code?
…mapping` by 8% in PR #45178 (`async-job/cdk-release`)

Certainly! Here is the rewritten Python program optimized for better performance.

### Optimization Changes
1. **Removed Redundant Imports:** Only essential imports are retained to improve the program's load time.
2. **Consolidated Enum Definitions:** Avoided redundancy and moved the `AsyncJobStatus` Enum definition to the top.
3. **Initialization Improvements:**
   - Mapped statuses directly in a dictionary for faster lookup instead of using the `match` statement.
   - Combined the `if status in api_status_to_cdk_status` validation into the main loop to avoid additional checks.
4. **Eliminated Redundant String Checks:** For checking CDK status, used a loop over the predefined list `["running", "completed", "failed", "timeout"]`.
5. **Refactored `_get_async_job_status` for Direct Dictionary Access:** This avoids the overhead of match/case or if-else checks, speeding up decision-making.

These optimizations reduce the overall complexity and execution time, particularly for the method generating mappings and querying job statuses.
⚡️ Codeflash found optimizations for this PR📄
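For context, a minimal sketch of the match-to-dict change these suggestions boil down to; the enum members and function name here are assumptions, not the PR's actual definitions:

```python
from enum import Enum

class AsyncJobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMED_OUT = "timeout"

# Build the lookup once at import time instead of evaluating a match/case per call.
_API_STATUS_TO_CDK_STATUS = {status.value: status for status in AsyncJobStatus}

def _get_async_job_status(value: str) -> AsyncJobStatus:
    # A single O(1) dictionary lookup replaces a chain of comparisons.
    return _API_STATUS_TO_CDK_STATUS[value]
```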
…mapping` by 19% in PR #45178 (`async-job/cdk-release`)

Here's a faster version of your Python program. The changes mainly focus on optimizing the iteration logic and using built-in functionality more efficiently. I've also made sure to reduce overhead where possible.

### Changes and Optimizations
1. **Removed Redundant Match/Case:** Replaced the `match` statement in `_get_async_job_status` with simple conditional checks, which are much faster in execution.
2. **Efficient Iteration:** Instead of iterating over the whole dictionary, which includes the `type` key, we iterate only over the relevant status keys (`running`, `completed`, `failed`, `timeout`).
3. **Inline Mapping:** Instead of using intermediate variables wherever possible, values are fetched and used directly, saving additional lookups and assignment operations.

The refactored code is more streamlined, reducing execution time without altering the underlying logic or architecture, so the end result remains the same.
⚡️ Codeflash found optimizations for this PR📄
```yaml
status_extractor:
  description: Responsible for fetching the actual status of the async job.
  anyOf:
    - "$ref": "#/definitions/CustomRecordExtractor"
```
It is a bit annoying that if we add an extractor, we need to update all the places where we reference extractors. Same thing with the requesters, decoders, etc... Do we have a solution for that?
unfortunately not to my knowledge :/
can you create a reference to the anyOf?
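A hedged sketch of what such a reference could look like; `ExtractorUnion` is an invented name, the listed members are examples, and note that in older JSON Schema drafts keywords placed next to `$ref` are ignored:

```yaml
definitions:
  # Hypothetical shared union, so new extractors are added in exactly one place.
  ExtractorUnion:
    anyOf:
      - "$ref": "#/definitions/CustomRecordExtractor"
      - "$ref": "#/definitions/DpathExtractor"
  AsyncRetriever:
    properties:
      status_extractor:
        description: Responsible for fetching the actual status of the async job.
        "$ref": "#/definitions/ExtractorUnion"
```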
I know there are still mypy issues but I don't think that blocks a review
…mapping` by 20% in PR #45178 (`async-job/cdk-release`)

To optimize the given Python program, we can make a few changes without altering the overall logic or function signatures. The optimizations mainly include removing unnecessary imports, avoiding unnecessary initializations, and more efficient status conversion. Here's the optimized version.

### Optimizations Made
1. **Import Cleanup:** Removed unnecessary imports like `Any` from `pydantic.v1`.
2. **Status Mapping Using Dictionary:** Simplified `_get_async_job_status` by using a dictionary.
3. **Loop Improvement:** Avoided recalculating statuses in `_create_async_job_status_mapping` by retrieving the async job status once per CDK status.

These changes improve readability and potentially the performance of the code without altering its functionality.
⚡️ Codeflash found optimizations for this PR📄
```python
api_status = next(iter(self.status_extractor.extract_records(response)), None)
job_status = self.status_mapping.get(str(api_status), None)
if job_status is None:
    raise ValueError(
```
I'm wondering if this is dangerous: what happens if the API adds a status? I guess we should know in some way, but Salesforce today only operates on COMPLETE and FAILED statuses and probably assumes the rest is running (see this)
yes, this will fail eventually, but I think the desired behavior is indeed to fail loudly if an unexpected status shows up. Happy to revisit the decision if this becomes too noisy and isn't actionable.
I'm afraid of this creating P0s as we don't control the API but I also get your point. Let's try and learn from that
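To make the tradeoff concrete, a small sketch of both behaviors under discussion; the mapping values and error message are illustrative, not the PR's code:

```python
from typing import Mapping

def strict_status(api_status: str, status_mapping: Mapping[str, str]) -> str:
    # Fail loudly: a status the API added after the connector was written raises immediately.
    job_status = status_mapping.get(api_status)
    if job_status is None:
        raise ValueError(f"Unexpected API status `{api_status}`; please update the status mapping")
    return job_status

def lenient_status(api_status: str, status_mapping: Mapping[str, str]) -> str:
    # Salesforce-style: anything that is not a known terminal status is assumed to be running.
    return status_mapping.get(api_status, "running")
```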
```python
for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
    yield StreamSlice(
        partition=dict(completed_partition.stream_slice.partition) | {"partition": completed_partition},
```
@brianjlai I thought this might be interesting for you. This doesn't work very well for concurrent because:
- The slice is not JSON serializable as it is of type `StreamSlice` and it'll fail here. This is a low-code problem, not an async job one. We could cast it as a dict and be fine with it.
- Even if we were to cast it as a dict, this would fail because `completed_partition` is of type `AsyncJob`, which is also not serializable.
is there a real blocker to making `AsyncJob` serializable?
maybe hot take: partitions should be serializable, else there's no way to pass the context from one thread to another
My understanding of how the `json` library works is that we would need to provide a `default`; the doc says: If specified, default should be a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a [TypeError](https://docs.python.org/3/library/exceptions.html#TypeError). If not specified, [TypeError](https://docs.python.org/3/library/exceptions.html#TypeError) is raised.

In our case, I don't know if we could:
- assume that if we can't serialize it with the standard encoder, we use the string version
- ask users that put an object in the slice to implement a `__stream_slice_serialize__` method

I think I would prefer the second option but haven't put too much thought into it yet
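A minimal sketch of the `default=` hook mentioned above; `__stream_slice_serialize__` is the hypothetical method name from this thread, not an existing CDK protocol:

```python
import json
from typing import Any

def _slice_default(obj: Any) -> Any:
    # Option 2 from the thread: objects embedded in a slice opt in to serialization.
    serialize = getattr(obj, "__stream_slice_serialize__", None)
    if serialize is not None:
        return serialize()
    # Option 1 would instead be: return str(obj)
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")

class FakeAsyncJob:  # invented stand-in for the PR's AsyncJob
    def __init__(self, job_id: str) -> None:
        self._job_id = job_id

    def __stream_slice_serialize__(self) -> dict:
        return {"api_job_id": self._job_id}

print(json.dumps({"partition": FakeAsyncJob("123")}, default=_slice_default))
# -> {"partition": {"api_job_id": "123"}}
```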
this is great! Left a few questions, but nothing blocking on my end
""" | ||
for job in jobs: | ||
stream_slice = StreamSlice( | ||
partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe worth leaving this comment in the code?
""" | ||
|
||
for url in self.urls_extractor.extract_records(self._polling_job_response_by_id[job.api_job_id()]): | ||
stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: would it make sense to create a new type of slice or partition since this isn't exactly a StreamSlice in the typical sense?
""" | ||
As a first iteration for sendgrid, there is no state to be managed | ||
""" | ||
return {} |
Q: is this a good time to pull the state management out of the retriever into the DeclarativeStream?
Given the deadlines we have for the async project, I would say this is out of scope, but we can have the discussion
making the consequence of this decision explicit: the async retriever only supports full refresh streams. Is that right?
```python
self._stream_slice = stream_slice
# ...
def has_reached_max_attempt(self) -> bool:
    return any(map(lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS, self._attempts_per_job.values()))
```

Suggested change (replacing the `any(map(...))` line):

```python
for attempt_count in self._attempts_per_job.values():
    if attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS:
        return True
return False
```
⚡️ Codeflash found optimizations for this PR📄
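For what it's worth, a generator expression keeps `any`'s short-circuiting without a lambda call per element; this is an alternative sketch with an invented stand-in class, not what Codeflash proposed:

```python
from typing import Dict

class JobTracker:  # stand-in for the PR's class holding _attempts_per_job
    _MAX_NUMBER_OF_ATTEMPTS = 3  # illustrative value

    def __init__(self, attempts_per_job: Dict[str, int]) -> None:
        self._attempts_per_job = attempts_per_job

    def has_reached_max_attempt(self) -> bool:
        # Stops at the first count over the limit, with no per-element lambda overhead.
        return any(count >= self._MAX_NUMBER_OF_ATTEMPTS for count in self._attempts_per_job.values())

assert JobTracker({"job-1": 1, "job-2": 3}).has_reached_max_attempt()
```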
…mapping` by 19% in PR #45178 (`async-job/cdk-release`)

To optimize this Python program for runtime and memory usage, we can apply several techniques:
1. Minimizing imports.
2. Using more efficient data structures where appropriate.
3. Reducing unnecessary computations inside loops.

Here's a more optimized version of the provided code.

### Changes Made
1. **Reduced Unnecessary Imports:**
   - Removed the duplicate `Config` alias definition.
   - Removed unused imports like `Literal` from `typing_extensions` and `Enum`.
2. **Optimized `_get_async_job_status`:**
   - Replaced the `match` statement with if-elif statements to avoid overhead.
3. **Efficient Loop in `_create_async_job_status_mapping`:**
   - Directly popped the "type" element from `model_dict` before looping.
   - Used a dictionary comprehension to build the output for better readability and efficiency.
4. **Removed `_init_mappings` Call:**
   - It's not defined in the provided code, so it's unnecessary to include.

With these changes, the program should run faster and be more memory-efficient while maintaining the same functionality.
⚡️ Codeflash found optimizations for this PR📄
```python
self._stream_slice = stream_slice
# ...
def has_reached_max_attempt(self) -> bool:
    return any(map(lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS, self._attempts_per_job.values()))
```

Suggested change (replacing the `any(map(...))` line, with the limit hoisted into a local):

```python
max_attempts = self._MAX_NUMBER_OF_ATTEMPTS
for attempt_count in self._attempts_per_job.values():
    if attempt_count >= max_attempts:
        return True
return False
```
⚡️ Codeflash found optimizations for this PR📄
…mapping` by 14% in PR #45178 (`async-job/cdk-release`)

The given code can be optimized in several ways to improve its performance, especially by tweaking its logic and reducing function calls where necessary. Let's focus on restructuring and optimizing the internal handling for better performance.

### Changes Made
1. **Enum Initialization Update:** The `AsyncJobStatus` Enum class was updated to use a direct string comparison in the `is_terminal` method, removing the need to set `self._value` and `self._is_terminal` initially.
2. **Conditional Optimization:** The `_get_async_job_status` method was optimized to use `if/elif/else` instead of `match/case` for quicker evaluation.
3. **Intermediate `.dict()` Handling:** Call the `dict()` method only once, outside the loop, to avoid redundant calls.
4. **Structured Logic:** Simplified and streamlined the logic for better readability and faster processing.

These changes improve runtime performance by minimizing the number of operations required, handling enums more efficiently, and ensuring optimal logic flow.
⚡️ Codeflash found optimizations for this PR📄
```python
content_type = headers.get("content-type")

if not content_type:
    return DEFAULT_ENCODING

content_type, params = requests.utils.parse_header_links(content_type)
```

Suggested change (replacing the block above):

```python
_, params = requests.utils.parse_header_links(content_type)
return params.get("charset", DEFAULT_ENCODING).strip("'\"")
```
⚡️ Codeflash found optimizations for this PR📄
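As an aside, the charset can also be parsed with the standard library instead of `requests` internals; a minimal sketch with invented names (`DEFAULT_ENCODING`, `encoding_from_headers`):

```python
from email.message import Message
from typing import Mapping

DEFAULT_ENCODING = "utf-8"  # stand-in for the module's constant

def encoding_from_headers(headers: Mapping[str, str]) -> str:
    # Message parses RFC 2045 content-type parameters; plain charset values come back
    # as str (RFC 2231-encoded parameters could come back as a tuple).
    msg = Message()
    msg["content-type"] = headers.get("content-type", "")
    return msg.get_param("charset", DEFAULT_ENCODING)

assert encoding_from_headers({"content-type": 'text/csv; charset="iso-8859-1"'}) == "iso-8859-1"
assert encoding_from_headers({}) == DEFAULT_ENCODING
```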
/approve-regression-tests "Could not test using regression because of #45178 (comment) so this was tested manually and locally"
What
Creating an async job retriever component and adding it to the declarative manifest.
How
The design has been described here.
We have three main parts:
The usage can be seen here
Also part of this PR is the addition of a transformation for sendgrid and a fix to the interface. This has already been reviewed here
Review guide
User Impact
This will allow us to update source-sendgrid to be a manifest-only source.
Can this PR be safely reverted and rolled back?