Skip to content

Commit

Permalink
Move OutputRequirements to datafusion-physical-optimizer crate (apach…
Browse files Browse the repository at this point in the history
…e#11579)

* Move OutputRequirements to datafusion-physical-optimizer crate

* Fix fmt

* Fix cargo for cli
  • Loading branch information
xinlifoobar authored and Lordworms committed Jul 23, 2024
1 parent 496b1e0 commit 03f8236
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 17 deletions.
10 changes: 6 additions & 4 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use std::fmt::Debug;
use std::sync::Arc;

use super::output_requirements::OutputRequirementExec;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
Expand Down Expand Up @@ -55,6 +54,7 @@ use datafusion_physical_expr::{
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::izip;

Expand Down Expand Up @@ -1290,7 +1290,6 @@ pub(crate) mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::test_utils::{
check_integrity, coalesce_partitions_exec, repartition_exec,
};
Expand All @@ -1301,6 +1300,7 @@ pub(crate) mod tests {
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use datafusion_physical_optimizer::output_requirements::OutputRequirements;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub mod enforce_sorting;
pub mod join_selection;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ workspace = true

[dependencies]
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
#![deny(clippy::clone_on_ref_ptr)]

mod optimizer;
pub mod output_requirements;

pub use optimizer::PhysicalOptimizerRule;
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@
use std::sync::Arc;

use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_execution::TaskContext;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};

use crate::PhysicalOptimizerRule;

/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
/// plan according to its `mode` attribute, which is set by the constructors
/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
Expand Down Expand Up @@ -86,15 +90,15 @@ enum RuleMode {
///
/// See [`OutputRequirements`] for more details
#[derive(Debug)]
pub(crate) struct OutputRequirementExec {
pub struct OutputRequirementExec {
input: Arc<dyn ExecutionPlan>,
order_requirement: Option<LexRequirement>,
dist_requirement: Distribution,
cache: PlanProperties,
}

impl OutputRequirementExec {
pub(crate) fn new(
pub fn new(
input: Arc<dyn ExecutionPlan>,
requirements: Option<LexRequirement>,
dist_requirement: Distribution,
Expand All @@ -108,8 +112,8 @@ impl OutputRequirementExec {
}
}

pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
self.input.clone()
pub fn input(&self) -> Arc<dyn ExecutionPlan> {
Arc::clone(&self.input)
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -179,8 +183,8 @@ impl ExecutionPlan for OutputRequirementExec {
fn execute(
&self,
_partition: usize,
_context: Arc<crate::execution::context::TaskContext>,
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unreachable!();
}

Expand Down Expand Up @@ -275,7 +279,7 @@ fn require_top_ordering_helper(
// When an operator requires an ordering, any `SortExec` below can not
// be responsible for (i.e. the originator of) the global ordering.
let (new_child, is_changed) =
require_top_ordering_helper(children.swap_remove(0).clone())?;
require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
Ok((plan.with_new_children(vec![new_child])?, is_changed))
} else {
// Stop searching, there is no global ordering desired for the query.
Expand Down

0 comments on commit 03f8236

Please sign in to comment.