Skip to content

Commit

Permalink
Minor: Simplify + document EliminateCrossJoin better (#10427)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored May 9, 2024
1 parent 2a15614 commit 7219744
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder {
find_valid_equijoin_key_pair(
&normalized_left_key,
&normalized_right_key,
self.plan.schema().clone(),
right.schema().clone(),
self.plan.schema(),
right.schema(),
)?.ok_or_else(||
plan_datafusion_err!(
"can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool {
/// Check whether all columns are from the schema.
pub fn check_all_columns_from_schema(
columns: &HashSet<Column>,
schema: DFSchemaRef,
schema: &DFSchema,
) -> Result<bool> {
for col in columns.iter() {
let exist = schema.is_column_from_schema(col);
Expand All @@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema(
pub fn find_valid_equijoin_key_pair(
left_key: &Expr,
right_key: &Expr,
left_schema: DFSchemaRef,
right_schema: DFSchemaRef,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<Option<(Expr, Expr)>> {
let left_using_columns = left_key.to_columns()?;
let right_using_columns = right_key.to_columns()?;
Expand All @@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair(
return Ok(None);
}

if check_all_columns_from_schema(&left_using_columns, left_schema.clone())?
&& check_all_columns_from_schema(&right_using_columns, right_schema.clone())?
if check_all_columns_from_schema(&left_using_columns, left_schema)?
&& check_all_columns_from_schema(&right_using_columns, right_schema)?
{
return Ok(Some((left_key.clone(), right_key.clone())));
} else if check_all_columns_from_schema(&right_using_columns, left_schema)?
Expand Down
43 changes: 29 additions & 14 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin {
left = find_inner_join(
&left,
&mut all_inputs,
&mut possible_join_keys,
&possible_join_keys,
&mut all_join_keys,
)?;
}
Expand Down Expand Up @@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
/// Recursively accumulate possible_join_keys and inputs from inner joins
/// (including cross joins).
///
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
Expand All @@ -159,22 +161,19 @@ fn try_flatten_join_inputs(
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
let left = &*(join.left);
let right = &*(join.right);
vec![left, right]
vec![&join.left, &join.right]
}
LogicalPlan::CrossJoin(join) => {
let left = &*(join.left);
let right = &*(join.right);
vec![left, right]
vec![&join.left, &join.right]
}
_ => {
return plan_err!("flatten_join_inputs just can call join/cross_join");
}
};

for child in children.iter() {
match *child {
let child = child.as_ref();
match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
Expand All @@ -184,27 +183,39 @@ fn try_flatten_join_inputs(
return Ok(false);
}
}
_ => all_inputs.push((*child).clone()),
_ => all_inputs.push(child.clone()),
}
}
Ok(true)
}

/// Finds the next to join with the left input plan,
///
/// Finds the next `right` from `rights` that can be joined with `left_input`
/// plan based on the join keys in `possible_join_keys`.
///
/// If such a matching `right` is found:
/// 1. Adds the matching join keys to `all_join_keys`.
/// 2. Returns `left_input JOIN right ON (all join keys)`.
///
/// If no matching `right` is found:
/// 1. Removes the first plan from `rights`.
/// 2. Returns `left_input CROSS JOIN right`.
fn find_inner_join(
left_input: &LogicalPlan,
rights: &mut Vec<LogicalPlan>,
possible_join_keys: &mut Vec<(Expr, Expr)>,
possible_join_keys: &[(Expr, Expr)],
all_join_keys: &mut HashSet<(Expr, Expr)>,
) -> Result<LogicalPlan> {
for (i, right_input) in rights.iter().enumerate() {
let mut join_keys = vec![];

for (l, r) in &mut *possible_join_keys {
for (l, r) in possible_join_keys.iter() {
let key_pair = find_valid_equijoin_key_pair(
l,
r,
left_input.schema().clone(),
right_input.schema().clone(),
left_input.schema(),
right_input.schema(),
)?;

// Save join keys
Expand All @@ -215,6 +226,7 @@ fn find_inner_join(
}
}

// Found one or more matching join keys
if !join_keys.is_empty() {
all_join_keys.extend(join_keys.clone());
let right_input = rights.remove(i);
Expand All @@ -236,6 +248,9 @@ fn find_inner_join(
}));
}
}

// No matching right plan had any join keys, so cross join with the first
// right plan.
let right = rights.remove(0);
let join_schema = Arc::new(build_join_schema(
left_input.schema(),
Expand Down
14 changes: 5 additions & 9 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema};
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
use std::sync::Arc;
// equijoin predicate
type EquijoinPredicate = (Expr, Expr);

Expand Down Expand Up @@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {

fn split_eq_and_noneq_join_predicate(
filter: Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
left_schema: &DFSchema,
right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction_owned(filter);

Expand All @@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate(
op: Operator::Eq,
ref right,
}) => {
let join_key_pair = find_valid_equijoin_key_pair(
left,
right,
left_schema.clone(),
right_schema.clone(),
)?;
let join_key_pair =
find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;

if let Some((left_expr, right_expr)) = join_key_pair {
let left_expr_type = left_expr.get_type(left_schema)?;
Expand Down Expand Up @@ -172,6 +167,7 @@ mod tests {
use datafusion_expr::{
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
};
use std::sync::Arc;

fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_display_indent(
Expand Down

0 comments on commit 7219744

Please sign in to comment.