From 910b3186df7d757e6b6d7c596e16e3f29aa0d395 Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Wed, 10 Jul 2024 08:14:28 -0500 Subject: [PATCH] fix: Fix eq properties regression from #10434 (#11363) * discover new orderings when constants are added * more comments * reduce nesting + describe argument * lint? --- .../src/equivalence/properties.rs | 182 +++++++++++------- 1 file changed, 112 insertions(+), 70 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index e3a2d1c753ca4..ae0af48913926 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -223,56 +223,11 @@ impl EquivalenceProperties { } } - // Discover new valid orderings in light of the new equality. For a discussion, see: - // https://github.com/apache/datafusion/issues/9812 - let mut new_orderings = vec![]; - for ordering in self.normalized_oeq_class().iter() { - let expressions = if left.eq(&ordering[0].expr) { - // Left expression is leading ordering - Some((ordering[0].options, right)) - } else if right.eq(&ordering[0].expr) { - // Right expression is leading ordering - Some((ordering[0].options, left)) - } else { - None - }; - if let Some((leading_ordering, other_expr)) = expressions { - // Currently, we only handle expressions with a single child. - // TODO: It should be possible to handle expressions orderings like - // f(a, b, c), a, b, c if f is monotonic in all arguments. - // First expression after leading ordering - if let Some(next_expr) = ordering.get(1) { - let children = other_expr.children(); - if children.len() == 1 - && children[0].eq(&next_expr.expr) - && SortProperties::Ordered(leading_ordering) - == other_expr - .get_properties(&[ExprProperties { - sort_properties: SortProperties::Ordered( - leading_ordering, - ), - range: Interval::make_unbounded( - &other_expr.data_type(&self.schema)?, - )?, - }])? - .sort_properties - { - // Assume existing ordering is [a ASC, b ASC] - // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, - // then we can deduce that ordering `[b ASC]` is also valid. - // Hence, ordering `[b ASC]` can be added to the state as valid ordering. - // (e.g. existing ordering where leading ordering is removed) - new_orderings.push(ordering[1..].to_vec()); - } - } - } - } - if !new_orderings.is_empty() { - self.oeq_class.add_new_orderings(new_orderings); - } - // Add equal expressions to the state self.eq_group.add_equal_conditions(left, right); + + // Discover any new orderings + self.discover_new_orderings(left)?; Ok(()) } @@ -304,9 +259,78 @@ impl EquivalenceProperties { self.constants.push(const_expr); } } + + for ordering in self.normalized_oeq_class().iter() { + if let Err(e) = self.discover_new_orderings(&ordering[0].expr) { + log::debug!("error discovering new orderings: {e}"); + } + } + self } + // Discover new valid orderings in light of a new equality. + // Accepts a single argument (`expr`) which is used to determine + // which orderings should be updated. + // When constants or equivalence classes are changed, there may be new orderings + // that can be discovered with the new equivalence properties. + // For a discussion, see: https://github.com/apache/datafusion/issues/9812 + fn discover_new_orderings(&mut self, expr: &Arc) -> Result<()> { + let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr)); + let eq_class = self + .eq_group + .classes + .iter() + .find_map(|class| { + class + .contains(&normalized_expr) + .then(|| class.clone().into_vec()) + }) + .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]); + + let mut new_orderings: Vec = vec![]; + for (ordering, next_expr) in self + .normalized_oeq_class() + .iter() + .filter(|ordering| ordering[0].expr.eq(&normalized_expr)) + // First expression after leading ordering + .filter_map(|ordering| Some(ordering).zip(ordering.get(1))) + { + let leading_ordering = ordering[0].options; + // Currently, we only handle expressions with a single child. + // TODO: It should be possible to handle expressions orderings like + // f(a, b, c), a, b, c if f is monotonic in all arguments. + for equivalent_expr in &eq_class { + let children = equivalent_expr.children(); + if children.len() == 1 + && children[0].eq(&next_expr.expr) + && SortProperties::Ordered(leading_ordering) + == equivalent_expr + .get_properties(&[ExprProperties { + sort_properties: SortProperties::Ordered( + leading_ordering, + ), + range: Interval::make_unbounded( + &equivalent_expr.data_type(&self.schema)?, + )?, + }])? + .sort_properties + { + // Assume existing ordering is [a ASC, b ASC] + // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, + // then we can deduce that ordering `[b ASC]` is also valid. + // Hence, ordering `[b ASC]` can be added to the state as valid ordering. + // (e.g. existing ordering where leading ordering is removed) + new_orderings.push(ordering[1..].to_vec()); + break; + } + } + } + + self.oeq_class.add_new_orderings(new_orderings); + Ok(()) + } + /// Updates the ordering equivalence group within assuming that the table /// is re-sorted according to the argument `sort_exprs`. Note that constants /// and equivalence classes are unchanged as they are unaffected by a re-sort. @@ -2441,30 +2465,48 @@ mod tests { ]; for case in cases { - let mut properties = base_properties - .clone() - .add_constants(case.constants.into_iter().map(ConstExpr::new)); - for [left, right] in &case.equal_conditions { - properties.add_equal_conditions(left, right)? - } - - let sort = case - .sort_columns - .iter() - .map(|&name| { - col(name, &schema).map(|col| PhysicalSortExpr { - expr: col, - options: SortOptions::default(), + // Construct the equivalence properties in different orders + // to exercise different code paths + // (The resulting properties _should_ be the same) + for properties in [ + // Equal conditions before constants + { + let mut properties = base_properties.clone(); + for [left, right] in &case.equal_conditions { + properties.add_equal_conditions(left, right)? + } + properties + .add_constants(case.constants.iter().cloned().map(ConstExpr::new)) + }, + // Constants before equal conditions + { + let mut properties = base_properties.clone().add_constants( + case.constants.iter().cloned().map(ConstExpr::new), + ); + for [left, right] in &case.equal_conditions { + properties.add_equal_conditions(left, right)? + } + properties + }, + ] { + let sort = case + .sort_columns + .iter() + .map(|&name| { + col(name, &schema).map(|col| PhysicalSortExpr { + expr: col, + options: SortOptions::default(), + }) }) - }) - .collect::>>()?; + .collect::>>()?; - assert_eq!( - properties.ordering_satisfy(&sort), - case.should_satisfy_ordering, - "failed test '{}'", - case.name - ); + assert_eq!( + properties.ordering_satisfy(&sort), + case.should_satisfy_ordering, + "failed test '{}'", + case.name + ); + } } Ok(())