From 178bc511d74d5703dd3a04bfb44da53aad74c37d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Jan 2025 14:53:26 -0500 Subject: [PATCH 1/7] Move collapse_lex_ordering to Lexordering::collapse --- .../physical-expr-common/src/sort_expr.rs | 16 ++++++++++++++++ .../physical-expr/src/equivalence/ordering.rs | 18 +++--------------- .../src/equivalence/properties.rs | 3 +-- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 0d7501610662..9729ff2c33e4 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -409,6 +409,22 @@ impl LexOrdering { .map(PhysicalSortExpr::from) .collect() } + + /// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression. + /// + /// This function filters duplicate entries that have same physical + /// expression inside, ignoring [`SortOptions`]. For example: + /// + /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. + pub fn collapse(self) -> Self { + let mut output = LexOrdering::default(); + for item in self.iter() { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item.clone()); + } + } + output + } } impl From> for LexOrdering { diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 24e2fc7dbaf5..69a785fb2fe8 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -146,8 +146,9 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self.orderings.iter().flatten().cloned().collect(); - let output_ordering = collapse_lex_ordering(output_ordering); + let output_ordering: LexOrdering = + self.orderings.iter().flatten().cloned().collect(); + let output_ordering = output_ordering.collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -207,19 +208,6 @@ impl IntoIterator for OrderingEquivalenceClass { } } -/// This function constructs a duplicate-free `LexOrdering` by filtering out -/// duplicate entries that have same physical expression inside. For example, -/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. -pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { - let mut output = LexOrdering::default(); - for item in input.iter() { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item.clone()); - } - } - output -} - /// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of /// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise. fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index c3d458103285..b80dd56ff30b 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -22,7 +22,6 @@ use std::slice::Iter; use std::sync::Arc; use std::{fmt, mem}; -use super::ordering::collapse_lex_ordering; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, @@ -911,7 +910,7 @@ impl EquivalenceProperties { // Simplify each ordering by removing redundant sections: orderings .chain(projected_orderings) - .map(collapse_lex_ordering) + .map(|lex_ordering| lex_ordering.collapse()) .collect() } From 15558f5599acad11fb2825b2e71b8bdf9fa238a9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Jan 2025 15:00:18 -0500 Subject: [PATCH 2/7] reduce diff --- datafusion/physical-expr/src/equivalence/ordering.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 69a785fb2fe8..084a5963181d 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -146,8 +146,7 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering: LexOrdering = - self.orderings.iter().flatten().cloned().collect(); + let output_ordering = self.orderings.iter().flatten().cloned().collect(); let output_ordering = output_ordering.collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } From 1d2c33970b8ac15a531782ee1f584174066ed940 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Jan 2025 15:03:26 -0500 Subject: [PATCH 3/7] avoid clone, cleanup --- datafusion/physical-expr-common/src/sort_expr.rs | 4 ++-- datafusion/physical-expr/src/equivalence/ordering.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 9729ff2c33e4..35c9b63a536e 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -418,9 +418,9 @@ impl LexOrdering { /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. pub fn collapse(self) -> Self { let mut output = LexOrdering::default(); - for item in self.iter() { + for item in self { if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item.clone()); + output.push(item); } } output diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 084a5963181d..09e7d3089353 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -146,8 +146,13 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self.orderings.iter().flatten().cloned().collect(); - let output_ordering = output_ordering.collapse(); + let output_ordering = self + .orderings + .iter() + .flatten() + .cloned() + .collect::() + .collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } From d2fe8a4195d8386bcf73d26dbbdaa9f995ba794d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Jan 2025 15:09:07 -0500 Subject: [PATCH 4/7] Introduce LexRequirement::collapse --- .../physical-expr-common/src/sort_expr.rs | 19 +++++++++++++++++++ .../physical-expr/src/equivalence/class.rs | 7 ++++--- .../physical-expr/src/equivalence/mod.rs | 15 ++++++--------- .../src/equivalence/properties.rs | 18 +++++++----------- .../physical-plan/src/aggregates/mod.rs | 9 ++++----- datafusion/physical-plan/src/windows/mod.rs | 5 ++--- 6 files changed, 42 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 35c9b63a536e..051c3be2f0b2 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -556,6 +556,25 @@ impl LexRequirement { .collect(), ) } + + /// Constructs a duplicate-free `LexOrderingReq` by filtering out + /// duplicate entries that have same physical expression inside. + /// + /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a + /// Some(ASC)]`. + /// + /// It will also filter out entries that are ordered if the next entry is; + /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to + /// `vec![a Some(ASC)]`. + pub fn collapse(self) -> Self { + let mut output = Vec::::new(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + LexRequirement::new(output) + } } impl From for LexRequirement { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9e535a94eb6e..495cf211efe7 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use super::{add_offset_to_expr, ProjectionMapping}; use crate::{ expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -527,12 +527,13 @@ impl EquivalenceGroup { &self, sort_reqs: &LexRequirement, ) -> LexRequirement { - collapse_lex_req(LexRequirement::new( + LexRequirement::new( sort_reqs .iter() .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) .collect(), - )) + ) + .collapse() } /// Projects `expr` according to the given projection mapping. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index d4c14f7bc8ff..60e508dd937a 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; +use crate::{LexRequirement, PhysicalExpr}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -41,14 +41,9 @@ pub use properties::{ /// It will also filter out entries that are ordered if the next entry is; /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to /// `vec![a Some(ASC)]`. +#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")] pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - LexRequirement::new(output) + input.collapse() } /// Adds the `offset` value to `Column` indices inside `expr`. This function is @@ -80,7 +75,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, PhysicalSortRequirement, + }; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index b80dd56ff30b..a256b4576e05 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -24,8 +24,7 @@ use std::{fmt, mem}; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ - collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, - ProjectionMapping, + EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ @@ -500,15 +499,12 @@ impl EquivalenceProperties { ); let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) + normalized_sort_reqs + .iter() + .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr)) + .cloned() + .collect::() + .collapse() } /// Checks whether the given ordering is satisfied by any of the existing diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c04211d679ca..ef98be691c99 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::Column, - physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -473,7 +472,7 @@ impl AggregateExec { &mode, )?; new_requirement.inner.extend(req); - new_requirement = collapse_lex_req(new_requirement); + new_requirement = new_requirement.collapse(); // If our aggregation has grouping sets then our base grouping exprs will // be expanded based on the flags in `group_by.groups` where for each diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 36c4b9f18da9..510cbc248b63 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -32,7 +32,6 @@ use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, @@ -469,8 +468,8 @@ pub fn get_window_mode( { let req = LexRequirement::new( [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(), - ); - let req = collapse_lex_req(req); + ) + .collapse(); if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() { From 16ef66c17ca6cf173907db5f4782b7c6610eaf8e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 7 Jan 2025 12:49:06 -0800 Subject: [PATCH 5/7] Minor changes --- datafusion/physical-expr-common/Cargo.toml | 1 + datafusion/physical-expr-common/src/sort_expr.rs | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 14d6ca64d15e..f397bece5c1a 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -42,3 +42,4 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true } hashbrown = { workspace = true } itertools = { workspace = true } +indexmap = { workspace = true } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 051c3be2f0b2..0f9dadcd8f0d 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -30,7 +30,8 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr_common::columnar_value::ColumnarValue; -use itertools::Itertools; +use itertools::{izip, Itertools}; +use indexmap::IndexSet; /// Represents Sort operation for a column in a RecordBatch /// @@ -568,11 +569,19 @@ impl LexRequirement { /// `vec![a Some(ASC)]`. pub fn collapse(self) -> Self { let mut output = Vec::::new(); + let mut exprs = IndexSet::new(); + let mut reqs = vec![]; for item in self { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); + let PhysicalSortRequirement { expr, options: req } = item; + // new insertion + if exprs.insert(expr) { + reqs.push(req); } } + debug_assert_eq!(reqs.len(), exprs.len()); + for (expr, req) in izip!(exprs, reqs) { + output.push(PhysicalSortRequirement::new(expr, req)); + } LexRequirement::new(output) } } From d059a0863d8a6ce29b42f5b4e6ba7296d0f8f6f4 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 7 Jan 2025 13:00:42 -0800 Subject: [PATCH 6/7] Minor changes --- datafusion-cli/Cargo.lock | 1 + datafusion/physical-expr-common/src/sort_expr.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e8b6e99d2dc4..fb22752286fc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1584,6 +1584,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", + "indexmap", "itertools 0.14.0", ] diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 0f9dadcd8f0d..f5314dd533ee 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -30,8 +30,8 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr_common::columnar_value::ColumnarValue; -use itertools::{izip, Itertools}; use indexmap::IndexSet; +use itertools::{izip, Itertools}; /// Represents Sort operation for a column in a RecordBatch /// From dc3785edb5b00a31ae2e3046f945bbc3289c4f6e Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 7 Jan 2025 13:01:31 -0800 Subject: [PATCH 7/7] fix formatting --- datafusion/physical-expr-common/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index f397bece5c1a..bd5d1e223c4a 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -41,5 +41,5 @@ arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true } hashbrown = { workspace = true } -itertools = { workspace = true } indexmap = { workspace = true } +itertools = { workspace = true }