Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace execution_mode with emission_type and boundedness #13823

Merged
merged 29 commits into from
Dec 20, 2024

Conversation

jayzhan-synnada
Copy link
Contributor

@jayzhan-synnada jayzhan-synnada commented Dec 18, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Existing execution_mode is not enough the represent different combination of execution properties. There are different emission type like incrementally or emit at the end. And, we might have different decision based on the combination of boundedness of the source and emission type, for example, whether it is pipeline breaking (not able to generate result if it has unbounded input and requires to emit at the end)

In this PR we introduce emission_type and boundedness to replace the existing execution mode to represent more execution properties.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

jayzhan-synnada and others added 27 commits December 10, 2024 08:44
- Introduced `Incremental` execution mode alongside existing modes in the DataFusion execution plan.
- Updated various execution plans to utilize the new `Incremental` mode where applicable, enhancing streaming capabilities.
- Added `bitflags` dependency to `Cargo.toml` for better management of execution modes.
- Adjusted execution mode handling in multiple files to ensure compatibility with the new structure.
Signed-off-by: Jay Zhan <[email protected]>
Signed-off-by: Jay Zhan <[email protected]>
- Removed the `ExecutionMode` parameter from `PlanProperties` across multiple physical plan implementations.
- Updated related functions to utilize the new structure, ensuring compatibility with the changes.
- Adjusted comments and cleaned up imports to reflect the removal of execution mode handling.

This refactor simplifies the execution plan properties and enhances maintainability.
…sionType`

- Removed the `ExecutionMode` parameter from `PlanProperties` and related implementations across multiple files.
- Introduced `EmissionType` to better represent the output characteristics of execution plans.
- Updated functions and tests to reflect the new structure, ensuring compatibility and enhancing maintainability.
- Cleaned up imports and adjusted comments accordingly.

This refactor simplifies the execution plan properties and improves the clarity of memory handling in execution plans.
Signed-off-by: Jay Zhan <[email protected]>
- Updated test cases in `sanity_checker.rs` to reflect changes in expected outcomes for bounded and unbounded joins, ensuring accurate test coverage.
- Simplified the `is_pipeline_breaking` method in `execution_plan.rs` to clarify the conditions under which a plan is considered pipeline-breaking.
- Enhanced the emission type determination logic in `execution_plan.rs` to prioritize `Final` over `Both` and `Incremental`, improving clarity in execution plan behavior.
- Adjusted join type handling in `hash_join.rs` to classify `Right` joins as `Incremental`, allowing for immediate row emission.

These changes improve the accuracy of tests and the clarity of execution plan properties.
- Updated multiple execution plan implementations to replace `unimplemented!()` with `EmissionType::Incremental`, ensuring that the emission type is correctly defined for various plans.
- This change enhances the clarity and functionality of the execution plans by explicitly specifying their emission behavior.

These updates contribute to a more robust execution plan framework within the DataFusion project.
- Updated the `JoinType` enum in `join_type.rs` to include detailed descriptions for each join type, improving clarity on their behavior and expected results.
- Modified the emission type logic in `hash_join.rs` to ensure that `Right` and `RightAnti` joins are classified as `Incremental`, allowing for immediate row emission when applicable.

These changes improve the documentation and functionality of join operations within the DataFusion project.
- Updated the emission type determination in `SortMergeJoinExec` and `SymmetricHashJoinExec` to utilize the `emission_type_from_children` function, enhancing the accuracy of emission behavior based on input characteristics.
- Clarified comments in `sort.rs` regarding the conditions under which results are emitted, emphasizing the relationship between input sorting and emission type.
- These changes improve the clarity and functionality of the execution plans within the DataFusion project, ensuring more robust handling of emission types.
- Updated the `emission_type_from_children` function to accept an iterator instead of a slice, enhancing flexibility in how child execution plans are passed.
- Modified the `SymmetricHashJoinExec` implementation to utilize the new function signature, improving code clarity and maintainability.

These changes streamline the emission type determination process within the DataFusion project, contributing to a more robust execution plan framework.
- Introduced `boundedness` and `pipeline_behavior` methods to the `ExecutionPlanProperties` trait, improving the handling of execution plan characteristics.
- Updated the `CsvExec`, `SortExec`, and related implementations to utilize the new methods for determining boundedness and emission behavior.
- Refactored the `ensure_distribution` function to use the new boundedness logic, enhancing clarity in distribution decisions.
- These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project.
…dling

- Updated multiple execution plan implementations to incorporate `Boundedness` and `EmissionType`, improving the clarity and functionality of execution plans.
- Replaced instances of `unimplemented!()` with appropriate emission types, ensuring that plans correctly define their output behavior.
- Refactored the `PlanProperties` structure to utilize the new boundedness logic, enhancing decision-making in execution plans.
- These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project.
- Updated the condition for checking memory requirements in execution plans from `has_finite_memory()` to `boundedness().requires_finite_memory()`, improving clarity in memory management.
- This change enhances the robustness of execution plans within the DataFusion project by ensuring more accurate assessments of memory constraints.
- Updated conditions for checking boundedness in various execution plans to use `is_unbounded()` instead of `requires_finite_memory()`, enhancing clarity in memory management.
- Adjusted the `PlanProperties` structure to reflect these changes, ensuring more accurate assessments of memory constraints across the DataFusion project.
- These modifications contribute to a more robust and maintainable execution plan framework, improving the handling of boundedness in execution strategies.
…Exec` implementation

- Eliminated the outdated comment suggesting a switch to unbounded execution with finite memory, streamlining the code and improving clarity.
- This change contributes to a cleaner and more maintainable codebase within the DataFusion project.
- Updated the `is_pipeline_breaking` method to use `requires_finite_memory()` for improved clarity in determining pipeline behavior.
- Enhanced the `Boundedness` enum to include detailed documentation on memory requirements for unbounded streams.
- Refactored `compute_properties` methods in `GlobalLimitExec` and `LocalLimitExec` to directly use the input's boundedness, simplifying the logic.
- Adjusted emission type determination in `NestedLoopJoinExec` to utilize the `emission_type_from_children` function, ensuring accurate output behavior based on input characteristics.

These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project, improving clarity and functionality in handling boundedness and emission types.
- Removed the `OptionalEmissionType` struct from `plan_properties.rs`, simplifying the codebase.
- Updated the `is_pipeline_breaking` function in `execution_plan.rs` for improved readability by formatting the condition across multiple lines.
- Adjusted the `GlobalLimitExec` implementation in `limit.rs` to directly use the input's boundedness, enhancing clarity in memory management.

These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, improving the handling of emission types and boundedness.
…ndling

- Updated the `compute_properties` methods in both `GlobalLimitExec` and `LocalLimitExec` to replace `EmissionType::Final` with `Boundedness::Bounded`, reflecting that limit operations always produce a finite number of rows.
- Changed the input's boundedness reference to `pipeline_behavior()` for improved clarity in execution plan properties.

These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, enhancing the handling of boundedness in limit operations.
@github-actions github-actions bot added physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate labels Dec 18, 2024
@github-actions github-actions bot added the common Related to common crate label Dec 18, 2024
@berkaysynnada berkaysynnada added the api change Changes the API exposed to users of the crate label Dec 18, 2024
Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both myself and @berkaysynnada collaborated with @jayzhan-synnada extensively on this and reviewed this API change carefully. I am quite happy with this improvement as DataFusion now exposes much more information on the nature of output streams of operators, leaving more room for downstream projects for optimizations and innovative uses.

Given that it is an API change, I think it would be great if @alamb and other interested community members can take a look and review if possible.

With this PR, we will have the following future work:

  • There are some minor TODOs which will enable DataFusion to expose emission and boundedness information less conservatively for certain operators when suitable conditions exist.
  • The API implicitly assumes that all output streams of an operator have the same emission and boundedness characteristics (which is the vast majority of the practical use cases). In the future, if we discover that we need to expose this information in a per-partition manner, we can extend the design to accommodate that.

Bounded,
Unbounded,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the purpose of FFI to provide a stable ABI? cc @timsaucer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too -- Maybe we can leave the FFI_ExecutionMode present and mark it deprecated or something 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea is to be stable. I don't think the real problem here is the change from FFI_ExecutionMode to FFI_Boundness but rather the way it breaks libraries working across the boundary that will expect one and receive the other. I think this current impact is minimal since the FFI interfaces have just been released and I don't think many people have had an opportunity to use them. However I will open an issue about adding a feature to validate the versioning across the boundary. I think that will be necessary going forward.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -28,21 +28,26 @@ use crate::{DataFusionError, Result};
/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinType {
/// Inner Join
/// Inner Join - Returns rows where there is a match in both tables.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to get precise, this should also say that all matches are returned (n-to-m)

Comment on lines 39 to 40
/// Full Join - Returns all rows when there is a match in either table.
/// Rows without a match in one table will have NULL values for columns from that table.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a match in either table

"a match" implies a matching row in left and right tables.

also see above, this should also imply cardinality

maybe

/// Full Join - returns all matches between left and right table, 
/// plus left table rows without a match (with NULL values for right table columns)
/// plus right table rows without a match (with NULL values for left table columns)

@findepi
Copy link
Member

findepi commented Dec 18, 2024

Thanks for expanding docstrings in JoinType.
Can you please move them to separate PR? This would help with review and merging.

@ozankabak
Copy link
Contributor

ozankabak commented Dec 18, 2024

Thanks for expanding docstrings in JoinType. Can you please move them to separate PR? This would help with review and merging.

Makes sense, we can merge the docstrings quickly ahead of the main PR while it is under review and reduce the diff

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jayzhan-synnada and @findepi for the reivew.

I think this PR makes a lot of sense and is well documented, reasoned, and a nice improvement.

I have two concerns about this PR:

  1. The potentially breaking changes to FFI (I am sure we can find some way around this, but I am still new to the area)
  2. The accumulation of API changes we already have on main before Release DataFusion 44.0.0 #13334

So maybe we can figure out the FFI stuff, and then wait to merge this PR until 44 is relased. I will try hard to focus on the 44 release later this week

Bounded,
Unbounded,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too -- Maybe we can leave the FFI_ExecutionMode present and mark it deprecated or something 🤔

/// In this case the stream will eventually return `None` to indicate that
/// there are no more records to process.
/// For unbounded streams, it also tracks whether the operator requires finite memory
/// to process the stream or if memory usage could grow unbounded.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we clarify in these comments how Boundedness is related to the input stream?

For example, if the the input stream is unbounded, does that imply the output is also unbounded?

Maybe some examples would help to clarify this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limit and TopK can be given as examples

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can expand the comments. Basically, boundedness of the output stream is a function of the boundedness of the input stream and the nature of the operator. For example, a simple SISO operator that transforms it data preserves the boundedness of its input, while an operator with fetch can have an unbounded input stream but a bounded output stream.

/// The data stream is unbounded (infinite) and could run forever
Unbounded {
/// Whether this operator requires infinite memory to process the unbounded stream.
/// If false, the operator can process an infinite stream with bounded memory.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can give an example of each type?

Like is it the case that

Unbounded {
  requires_infinite_memory: true,
}

Would describe a min/max aggregate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

min/max over unordered keys are requires_infinite_memory: true by default, but if the key cardinality is finite, then it is false. That could be added as an example.

Copy link
Contributor

@ozankabak ozankabak Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, we will add some examples. In your case, MEDIAN would be an example (but we can do MIN/MAX with bounded memory when sorted).

}
/// Represents how an operator emits its output records.
///
/// This is used to determine whether an operator emits records incrementally as they arrive,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest documenting when someone would expect incremental output -- I think it is basically when the operator can emit a RecordBatches of batch_size rows

Specifically I think it would help to say something like "EmissionType::Incremental generates output incrementally but the operator may still buffer data internally until batch_size records arrive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you have the right idea. We will add some comments to clarify.

@ozankabak
Copy link
Contributor

So maybe we can figure out the FFI stuff, and then wait to merge this PR until 44 is relased. I will try hard to focus on the 44 release later this week

Sure, it makes sense to wait and only proceed with merging after all the PRs that are slated for 44 get merged before this.

One alternative to consider would be making 44 a major release with all the API changes (including this) so that we can proceed with a few "stable" releases after 44. It is also not a great dev-ex to deal with some API change in every single release, albeit small.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jayzhan-synnada and @ozankabak for this PR

From my perspective this PR is fine to merge, and maybe since the FFI feature is so new as @ozankabak pointed out it would make sense to not delay this PR until after the release to get it in.

I don't have a strong preference, though hope to have dug out of my backlog later today and help to get a release candidate in shape over the next few days

- Improved descriptions for the Inner and Full join types in join_type.rs to clarify their behavior and examples.
- Added explanations regarding the boundedness of output streams and memory requirements in execution_plan.rs, including specific examples for operators like Median and Min/Max.
@jayzhan-synnada
Copy link
Contributor Author

If we can merge this soon, I think we don't need to separate Join doc, just get in together. Thanks all for the review!

@ozankabak
Copy link
Contributor

Great, then let's wait for a few hours in case any other feedback arrives and then merge this. This way, we will avoid API churn on FFI between DataFusion 44 and 45.

@ozankabak ozankabak merged commit f3b1141 into apache:main Dec 20, 2024
24 checks passed
@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

🚀

@alamb
Copy link
Contributor

alamb commented Dec 20, 2024

One alternative to consider would be making 44 a major release with all the API changes (including this) so that we can proceed with a few "stable" releases after 44. It is also not a great dev-ex to deal with some API change in every single release, albeit small.

Indeed -- it may be time to consider making the API more stable. Here is a related discussion:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate common Related to common crate core Core DataFusion crate optimizer Optimizer rules physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants