Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop copying LogicalPlan and Exprs in TypeCoercion (10% faster planning) #10356

Merged
merged 6 commits into from
May 15, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented May 2, 2024

Note it has code from #10410 so that might good to review first

Which issue does this PR close?

Closes #10210

Part of #9637 -- let's make DataFusion planning faster by not copying so much

Rationale for this change

Now that we have the nice TreeNode API thanks to #8913 and @peter-toth let's use it to both simplify the code and avoid copies

What changes are included in this PR?

  1. rewrite TypeCoercion via TreeNode API
  2. Introduce LogicalPlan::recompute_schema to recompute the schema after expressions in a plan are changed

Are these changes tested?

Existing CI

Are there any user-facing changes?

Faster planning:

  1. 12% faster TPCH planning
  2. 10% faster TPCDS planning
Details

group                                         main                                   type_coercion
-----                                         ----                                   -------------
logical_aggregate_with_join                   1.00  1221.8±12.51µs        ? ?/sec    1.00  1220.4±11.13µs        ? ?/sec
logical_plan_tpcds_all                        1.00    160.3±1.91ms        ? ?/sec    1.00    159.5±1.81ms        ? ?/sec
logical_plan_tpch_all                         1.02     17.4±0.22ms        ? ?/sec    1.00     17.0±0.20ms        ? ?/sec
logical_select_all_from_1000                  1.00     18.7±0.10ms        ? ?/sec    1.01     18.9±0.17ms        ? ?/sec
logical_select_one_from_700                   1.00   808.2±23.16µs        ? ?/sec    1.01   817.7±10.21µs        ? ?/sec
logical_trivial_join_high_numbered_columns    1.00    763.6±7.80µs        ? ?/sec    1.00   763.1±13.86µs        ? ?/sec
logical_trivial_join_low_numbered_columns     1.00   748.3±10.69µs        ? ?/sec    1.00   747.1±15.11µs        ? ?/sec
physical_plan_tpcds_all                       1.12  1509.7±10.59ms        ? ?/sec    1.00   1350.0±8.12ms        ? ?/sec
physical_plan_tpch_all                        1.10    102.2±1.80ms        ? ?/sec    1.00     93.1±1.49ms        ? ?/sec
physical_plan_tpch_q1                         1.09      5.7±0.07ms        ? ?/sec    1.00      5.2±0.06ms        ? ?/sec
physical_plan_tpch_q10                        1.06      4.8±0.10ms        ? ?/sec    1.00      4.5±0.05ms        ? ?/sec
physical_plan_tpch_q11                        1.05      4.2±0.10ms        ? ?/sec    1.00      4.0±0.08ms        ? ?/sec
physical_plan_tpch_q12                        1.07      3.4±0.06ms        ? ?/sec    1.00      3.2±0.06ms        ? ?/sec
physical_plan_tpch_q13                        1.04      2.3±0.06ms        ? ?/sec    1.00      2.2±0.05ms        ? ?/sec
physical_plan_tpch_q14                        1.03      3.0±0.05ms        ? ?/sec    1.00      2.9±0.06ms        ? ?/sec
physical_plan_tpch_q16                        1.06      4.1±0.06ms        ? ?/sec    1.00      3.9±0.07ms        ? ?/sec
physical_plan_tpch_q17                        1.04      3.8±0.05ms        ? ?/sec    1.00      3.7±0.05ms        ? ?/sec
physical_plan_tpch_q18                        1.07      4.3±0.06ms        ? ?/sec    1.00      4.0±0.06ms        ? ?/sec
physical_plan_tpch_q19                        1.28      8.1±0.11ms        ? ?/sec    1.00      6.3±0.08ms        ? ?/sec
physical_plan_tpch_q2                         1.12      8.8±0.08ms        ? ?/sec    1.00      7.9±0.08ms        ? ?/sec
physical_plan_tpch_q20                        1.10      5.1±0.09ms        ? ?/sec    1.00      4.6±0.09ms        ? ?/sec
physical_plan_tpch_q21                        1.10      6.9±0.10ms        ? ?/sec    1.00      6.3±0.07ms        ? ?/sec
physical_plan_tpch_q22                        1.08      3.8±0.09ms        ? ?/sec    1.00      3.5±0.09ms        ? ?/sec
physical_plan_tpch_q3                         1.04      3.4±0.06ms        ? ?/sec    1.00      3.3±0.06ms        ? ?/sec
physical_plan_tpch_q4                         1.06      2.5±0.05ms        ? ?/sec    1.00      2.3±0.04ms        ? ?/sec
physical_plan_tpch_q5                         1.12      5.1±0.09ms        ? ?/sec    1.00      4.6±0.09ms        ? ?/sec
physical_plan_tpch_q6                         1.13  1863.3±56.21µs        ? ?/sec    1.00  1643.3±80.88µs        ? ?/sec
physical_plan_tpch_q7                         1.13      6.5±0.11ms        ? ?/sec    1.00      5.8±0.11ms        ? ?/sec
physical_plan_tpch_q8                         1.11      8.4±0.10ms        ? ?/sec    1.00      7.6±0.06ms        ? ?/sec
physical_plan_tpch_q9                         1.09      6.3±0.09ms        ? ?/sec    1.00      5.8±0.06ms        ? ?/sec
physical_select_all_from_1000                 1.45     88.7±0.40ms        ? ?/sec    1.00     61.3±0.45ms        ? ?/sec
physical_select_one_from_700                  1.05      3.9±0.05ms        ? ?/sec    1.00      3.7±0.05ms        ? ?/sec

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules labels May 2, 2024
@alamb alamb force-pushed the alamb/type_coercion branch from 66ff8d5 to 5c1f2c4 Compare May 2, 2024 19:26
@@ -467,6 +468,200 @@ impl LogicalPlan {
self.with_new_exprs(self.expressions(), inputs.to_vec())
}

/// Recomputes schema and type information for this LogicalPlan if needed.
Copy link
Contributor Author

@alamb alamb May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is a new API for using TreeNode to rewrite plans in ways that change the schema.

This effectively factors out the recalculation part of LogicalPlan::new_with_exprs

I tried to find a way to use reuse this logic in LogicalPlan::new_with_exprs but was not able to without forcing (another) clone

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @peter-toth I suspect you may need something like this for common subexpression elimination / #9873

@alamb alamb marked this pull request as ready for review May 2, 2024 20:26
@alamb alamb changed the title Stop copying LogicalPlan and Exprs in TypeCoercion Stop copying LogicalPlan and Exprs in TypeCoercion (10% faster planning) May 2, 2024
.map_data(|expr| original_name.restore(expr))
})?
// coerce join expressions specially
.map_data(|plan| expr_rewrite.coerce_joins(plan))?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since expr_rewrite.coerce_joins(plan) can change the plan, shouldn't its result be Result<Transformed<LogicalPlan>>? And then here we should probably use map_transformed() instead of the current map_data().

Copy link
Contributor Author

@alamb alamb May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for anyone following along, the response is https://github.com/apache/datafusion/pull/10356/files#r1588998665 (tldr should do as a follow on PR)

// coerce join expressions specially
.map_data(|plan| expr_rewrite.coerce_joins(plan))?
// recompute the schema after the expressions have been rewritten as the types may have changed
.map_data(|plan| plan.recompute_schema())
Copy link
Contributor

@peter-toth peter-toth May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always need to run plan.recompute_schema()? If the Transformed<LogicalPlan>'s .transformed is false then probably we don't need to.

Copy link
Contributor Author

@alamb alamb May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent point. At the moment, I think we do need to always run recompute_schema because the TypeCoercionRewriter doesn't return Transformed (and thus we don't know if any actual expression coercion was done, so we have to assume it was).

I filed #10365 to track improving this

Copy link
Contributor

@peter-toth peter-toth May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think you use TypeCoercionRewriter in expr.rewrite(&mut expr_rewrite)? and that rewrite() returns Transformed<Expr> and then that Transformed<Expr> is propagated up into plan.map_expressions(), that returns Transformed<LogicalPlan>. So you have the necessary Transformed to decide if recompute_schema() is needed. Or not? 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct (of course!) thank you for pointing it out. Now that analyze_internal returns Transformed would work. However, there is still code like this:

                let new_plan =
                    analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
                Ok(Transformed::yes(Expr::Exists(Exists {
                    subquery: Subquery {
                        subquery: Arc::new(new_plan),
                        outer_ref_columns: subquery.outer_ref_columns,
                    },
                    negated,
                })))
            }

Which discards the transformed information (and in this case always returns Transformed::true).

In order to keep the PRs small and easier to review I would like to not change this PR (it is no worse than main in regards to recomputing schema) and I will make a follow on PR to avoid recomputing schema when unecessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, it seems there are many unnecessary Transformed::yess in the current code. But false positive transformeds doesn't cause any issue...

Sure, a follow-up PR sounds good, I agree that this PR already looks really nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is my draft followup: #10369

It is quite large (it requires updating the entire expression rewriter) so I am glad we left it in a separate PR

@alamb alamb force-pushed the alamb/type_coercion branch from 3105658 to 5ed976b Compare May 7, 2024 15:15
@github-actions github-actions bot removed the logical-expr Logical plan and expressions label May 10, 2024
// get schema representing all available input fields. This is used for data type
// resolution only, so order does not matter here
let mut schema = merge_schema(new_inputs.iter().collect());
let mut schema = merge_schema(plan.inputs());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 q

.map(|(lhs, rhs)| {
// coerce the arguments as though they were a single binary equality
// expression
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this method needed, as it looks like we just cast lhs, rhs? it feels it can be simplified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think coerce_binary_op is different than just casting lhs and rhs as it first calls get_input_types:

        let (left_type, right_type) = get_input_types(
            &left.get_type(self.schema)?,
            &op,
            &right.get_type(self.schema)?,
        )?;

And get_input_types usese the comparison coercion rules to figure out a common set if types to coerce lhs and rhs to.

@alamb
Copy link
Contributor Author

alamb commented May 15, 2024

@comphead I think this PR is ready to go. Would you be willing to approve it? Or are there other comments you would like to see addressed?

/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)`
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
let LogicalPlan::Join(mut join) = plan else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats an interesting syntax

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it checks the plan can be deconstructed into LogicalPlan::Join(...) and if its not the else branch is triggered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting

https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @alamb!

Copy link
Contributor Author

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @comphead 🙏

/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)`
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
let LogicalPlan::Join(mut join) = plan else {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting

https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html

@alamb alamb merged commit c312ffe into apache:main May 15, 2024
23 checks passed
@alamb alamb deleted the alamb/type_coercion branch May 15, 2024 19:58
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…ning) (apache#10356)

* Add `LogicalPlan::recompute_schema` for handling rewrite passes

* Stop copying LogicalPlan and Exprs in  `TypeCoercion`

* Apply suggestions from code review

Co-authored-by: Oleks V <[email protected]>

---------

Co-authored-by: Oleks V <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Stop copying LogicalPlan and Exprs in TypeCoercion
3 participants