From 721974456742bf44bdae291b3114bc23fe478bcd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 May 2024 12:57:14 -0400 Subject: [PATCH] Minor: Simplify + document `EliminateCrossJoin` better (#10427) --- datafusion/expr/src/logical_plan/builder.rs | 4 +- datafusion/expr/src/utils.rs | 10 ++--- .../optimizer/src/eliminate_cross_join.rs | 43 +++++++++++++------ .../src/extract_equijoin_predicate.rs | 14 +++--- 4 files changed, 41 insertions(+), 30 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 7b1e4498010b..3f15b84784f1 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder { find_valid_equijoin_key_pair( &normalized_left_key, &normalized_right_key, - self.plan.schema().clone(), - right.schema().clone(), + self.plan.schema(), + right.schema(), )?.ok_or_else(|| plan_datafusion_err!( "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})" diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 4282952a1efc..0c1084674d8e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool { /// Check whether all columns are from the schema. pub fn check_all_columns_from_schema( columns: &HashSet, - schema: DFSchemaRef, + schema: &DFSchema, ) -> Result { for col in columns.iter() { let exist = schema.is_column_from_schema(col); @@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema( pub fn find_valid_equijoin_key_pair( left_key: &Expr, right_key: &Expr, - left_schema: DFSchemaRef, - right_schema: DFSchemaRef, + left_schema: &DFSchema, + right_schema: &DFSchema, ) -> Result> { let left_using_columns = left_key.to_columns()?; let right_using_columns = right_key.to_columns()?; @@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair( return Ok(None); } - if check_all_columns_from_schema(&left_using_columns, left_schema.clone())? - && check_all_columns_from_schema(&right_using_columns, right_schema.clone())? + if check_all_columns_from_schema(&left_using_columns, left_schema)? + && check_all_columns_from_schema(&right_using_columns, right_schema)? { return Ok(Some((left_key.clone(), right_key.clone()))); } else if check_all_columns_from_schema(&right_using_columns, left_schema)? diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index ae6c1b339d5f..a807ee5ff2c5 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin { left = find_inner_join( &left, &mut all_inputs, - &mut possible_join_keys, + &possible_join_keys, &mut all_join_keys, )?; } @@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin { } } -/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins). +/// Recursively accumulate possible_join_keys and inputs from inner joins +/// (including cross joins). +/// /// Returns a boolean indicating whether the flattening was successful. fn try_flatten_join_inputs( plan: &LogicalPlan, @@ -159,14 +161,10 @@ fn try_flatten_join_inputs( return Ok(false); } possible_join_keys.extend(join.on.clone()); - let left = &*(join.left); - let right = &*(join.right); - vec![left, right] + vec![&join.left, &join.right] } LogicalPlan::CrossJoin(join) => { - let left = &*(join.left); - let right = &*(join.right); - vec![left, right] + vec![&join.left, &join.right] } _ => { return plan_err!("flatten_join_inputs just can call join/cross_join"); @@ -174,7 +172,8 @@ fn try_flatten_join_inputs( }; for child in children.iter() { - match *child { + let child = child.as_ref(); + match child { LogicalPlan::Join(Join { join_type: JoinType::Inner, .. @@ -184,27 +183,39 @@ fn try_flatten_join_inputs( return Ok(false); } } - _ => all_inputs.push((*child).clone()), + _ => all_inputs.push(child.clone()), } } Ok(true) } +/// Finds the next to join with the left input plan, +/// +/// Finds the next `right` from `rights` that can be joined with `left_input` +/// plan based on the join keys in `possible_join_keys`. +/// +/// If such a matching `right` is found: +/// 1. Adds the matching join keys to `all_join_keys`. +/// 2. Returns `left_input JOIN right ON (all join keys)`. +/// +/// If no matching `right` is found: +/// 1. Removes the first plan from `rights` +/// 2. Returns `left_input CROSS JOIN right`. fn find_inner_join( left_input: &LogicalPlan, rights: &mut Vec, - possible_join_keys: &mut Vec<(Expr, Expr)>, + possible_join_keys: &[(Expr, Expr)], all_join_keys: &mut HashSet<(Expr, Expr)>, ) -> Result { for (i, right_input) in rights.iter().enumerate() { let mut join_keys = vec![]; - for (l, r) in &mut *possible_join_keys { + for (l, r) in possible_join_keys.iter() { let key_pair = find_valid_equijoin_key_pair( l, r, - left_input.schema().clone(), - right_input.schema().clone(), + left_input.schema(), + right_input.schema(), )?; // Save join keys @@ -215,6 +226,7 @@ fn find_inner_join( } } + // Found one or more matching join keys if !join_keys.is_empty() { all_join_keys.extend(join_keys.clone()); let right_input = rights.remove(i); @@ -236,6 +248,9 @@ fn find_inner_join( })); } } + + // no matching right plan had any join keys, cross join with the first right + // plan let right = rights.remove(0); let join_schema = Arc::new(build_join_schema( left_input.schema(), diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index c47a86974cd2..237c00352419 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema}; use datafusion_expr::utils::split_conjunction_owned; use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair}; use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator}; -use std::sync::Arc; // equijoin predicate type EquijoinPredicate = (Expr, Expr); @@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate { fn split_eq_and_noneq_join_predicate( filter: Expr, - left_schema: &Arc, - right_schema: &Arc, + left_schema: &DFSchema, + right_schema: &DFSchema, ) -> Result<(Vec, Option)> { let exprs = split_conjunction_owned(filter); @@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate( op: Operator::Eq, ref right, }) => { - let join_key_pair = find_valid_equijoin_key_pair( - left, - right, - left_schema.clone(), - right_schema.clone(), - )?; + let join_key_pair = + find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?; if let Some((left_expr, right_expr)) = join_key_pair { let left_expr_type = left_expr.get_type(left_schema)?; @@ -172,6 +167,7 @@ mod tests { use datafusion_expr::{ col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType, }; + use std::sync::Arc; fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq_display_indent(