Skip to content

Commit

Permalink
Move TopKAggregation rule into physical-optimizer crate (#12334)
Browse files Browse the repository at this point in the history
* Move TopKAggregation rule into physical-optimizer crate

* cargo update for cli

* Update cli msrv

* Fix clippy lints
  • Loading branch information
lewiszlw authored Sep 6, 2024
1 parent a444528 commit dd04929
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 29 deletions.
50 changes: 32 additions & 18 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
#[cfg(test)]
pub mod test_utils;
pub mod topk_aggregation;
pub mod update_aggr_exprs;

mod sort_pushdown;
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
pub mod output_requirements;
pub mod topk_aggregation;

pub use optimizer::PhysicalOptimizerRule;
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
use std::sync::Arc;

use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::ExecutionPlan;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::ExecutionPlan;

use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
Expand All @@ -33,7 +33,7 @@ use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use crate::PhysicalOptimizerRule;
use itertools::Itertools;

/// An optimizer rule that passes a `limit` hint to aggregations if the whole result is not needed
Expand Down Expand Up @@ -76,7 +76,7 @@ impl TopKAggregation {
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
Arc::clone(aggr.input()),
aggr.input_schema(),
)
.expect("Unable to copy Aggregate!")
Expand Down Expand Up @@ -114,13 +114,13 @@ impl TopKAggregation {
}
} else {
// or we continue down whitelisted nodes of other types
if !is_cardinality_preserving(plan.clone()) {
if !is_cardinality_preserving(Arc::clone(&plan)) {
cardinality_preserved = false;
}
}
Ok(Transformed::no(plan))
};
let child = child.clone().transform_down(closure).data().ok()?;
let child = Arc::clone(child).transform_down(closure).data().ok()?;
let sort = SortExec::new(sort.expr().to_vec(), child)
.with_fetch(sort.fetch())
.with_preserve_partitioning(sort.preserve_partitioning());
Expand Down

0 comments on commit dd04929

Please sign in to comment.