Add optimizer rule for type coercion (binary operations only) #3222
Conversation
Note that this is related to #3185 from @liukun4515
datafusion/core/tests/sql/mod.rs
Outdated
@@ -752,29 +752,31 @@ async fn try_execute_to_batches(
/// Execute query and return results as a Vec of RecordBatches
async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
This test method was optimizing the plan twice, so I fixed that.
Codecov Report
@@ Coverage Diff @@
## master #3222 +/- ##
=======================================
Coverage 85.57% 85.57%
=======================================
Files 295 296 +1
Lines 54111 54173 +62
=======================================
+ Hits 46304 46361 +57
- Misses 7807 7812 +5
@liukun4515 This PR is ready for review now
Never mind, I need to have full recursion for the expression rewriting and we seem to be missing some infrastructure for that, or I am just not finding it.
I like the idea of having the type coercion in an optimizer rule since that allows it to be either enabled or disabled on a per-dialect basis. Learned a few things from some interesting snippets in there as well. Seems good to me
@@ -1377,6 +1378,8 @@ impl SessionState {
}
rules.push(Arc::new(ReduceOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
// we do type coercion after filter push down so that we don't push CAST filters to Parquet
smart move, that would have been a hard bug to find!
I am confused about this comment. Can you explain why we do the type coercion after the filter push down optimizer rule?
I think the type coercion rule should be done in a previous stage.
For example, take the filter expr FLOAT32(C1) < FLOAT64(16). We should do type coercion first, convert the filter expr to CAST(FLOAT32(C1) AS FLOAT64) < FLOAT64(16), and then try to push the new filter expr down to the table scan operation.
If we don't do the type coercion first, we push the expr FLOAT32(C1) < FLOAT64(16) to the table scan. Can that be applied to the parquet filter or pruning filter?
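To make the ordering concern concrete, here is a minimal sketch (not from this PR) of what the binary coercion helper decides for this case. It assumes coerce_types lives in datafusion_expr::binary_rule, which may differ by version; only the existence of coerce_types is confirmed later in this thread.

use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_expr::{binary_rule::coerce_types, Operator};

fn main() -> Result<()> {
    // Comparing Float32 with Float64 coerces both sides to Float64, so after the
    // TypeCoercion rule the filter becomes CAST(c1 AS Float64) < Float64(16),
    // and that is the expression the pushdown rule would then see.
    let coerced = coerce_types(&DataType::Float32, &Operator::Lt, &DataType::Float64)?;
    assert_eq!(coerced, DataType::Float64);
    Ok(())
}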
@liukun4515 This PR is ready for review now
Yes, this is ready for review now.
I filed #3289 for applying `TypeCoercion` before `FilterPushDown`. I think the PR would get too large to review if I make those changes here.
datafusion/expr/src/expr_fn.rs
Outdated
@@ -259,6 +260,23 @@ pub fn cast(expr: Expr, data_type: DataType) -> Expr {
}
}

/// Create a cast expression
pub fn cast_if_needed(
Makes sense
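For context, a minimal sketch of what a helper like this might look like; the exact signature and the import paths are assumptions, while `cast`, `Expr::get_type`, and `DFSchema` are confirmed elsewhere in this PR.

use arrow::datatypes::DataType;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{cast, Expr, ExprSchemable};

/// Hypothetical sketch: only wrap `expr` in a CAST when its current type
/// differs from the target type, so plans don't accumulate redundant casts.
pub fn cast_if_needed(expr: Expr, data_type: DataType, schema: &DFSchema) -> Result<Expr> {
    if expr.get_type(schema)? == data_type {
        // types already match: return the expression unchanged
        Ok(expr)
    } else {
        // reuse the `cast` helper shown in the diff above
        Ok(cast(expr, data_type))
    }
}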
let new_expr = plan
    .expressions()
    .into_iter()
    .map(|expr| expr.rewrite(&mut expr_rewrite))
Learned something new here today about the expr rewriter
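For anyone else new to it, here is a minimal sketch of the rewriter pattern used above. The trait and module names are assumed from the DataFusion version this PR targets and may differ; the real TypeCoercion rule does the interesting work inside mutate.

use datafusion_common::Result;
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::Expr;

/// A rewriter whose `mutate` is called on each sub-expression and may return
/// a rewritten expression; this sketch just passes everything through.
struct NoopRewriter;

impl ExprRewriter for NoopRewriter {
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        // the real TypeCoercion rule would match binary expressions here and
        // insert the required CASTs
        Ok(expr)
    }
}

fn rewrite_all(exprs: Vec<Expr>) -> Result<Vec<Expr>> {
    // mirrors the `plan.expressions() ... expr.rewrite(&mut expr_rewrite)` call above
    exprs.into_iter().map(|e| e.rewrite(&mut NoopRewriter)).collect()
}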
@andygrove is it ready to review?
benchmarks/src/bin/tpch.rs
Outdated
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
// note that `create_physical_plan` will optimize the plan so we pass the unoptimized plan
Is there an error or a different result for your plan if it is optimized by some rules more than once?
I also hit this in my previous PR.
For example: for A + B we will get the coercion type C, so after the first optimization we get CAST(A AS C) + CAST(B AS C).
After the second optimization, we may get a different coercion type D from CAST(A AS C) + CAST(B AS C).
The original code was running the optimizer twice, which was not necessary. With the new rule there was a problem optimizing twice. I will look at this again today and write up an issue or make it safe to optimize twice.
I reverted this change
let left_type = left.get_type(&self.schema)?;
let right_type = right.get_type(&self.schema)?;
match right_type {
    DataType::Interval(_) => {
Can you explain why we skip this data type, like `left op Interval`? I don't get the point.
If we leave a lot of special-case code, it is difficult to maintain.
Removing this causes one test failure:
---- sql::timestamp::timestamp_array_add_interval stdout ----
thread 'sql::timestamp::timestamp_array_add_interval' panicked at 'called `Result::unwrap()` on an `Err` value: "Internal(\"Unsupported CAST from Interval(DayTime) to Timestamp(Nanosecond, None)\") at Creating physical plan for 'SELECT ts, ts - INTERVAL '8' MILLISECONDS FROM table_a': Projection: #table_a.ts, #table_a.ts - CAST(IntervalDayTime(\"8\") AS Timestamp(Nanosecond, None))\n TableScan: table_a projection=[ts]"', datafusion/core/tests/sql/mod.rs:773:10
Arrow does not support CAST from `Interval(DayTime)` to `Timestamp(Nanosecond, None)`. I think this could be added, so I filed apache/arrow-rs#2606. Once this is implemented, we can remove this code.
I agree it is strange -- maybe it is worth a ticket to investigate further (or maybe @waitingkuo is already tracking it)
FWIW I think this special case is no longer necessary -- I tried removing it in #3379 and all the tests still pass.
Good spot @liukun4515
Once this PR is merged, I'll get #3379 ready for review
Force-pushed from ec5d32a to f5fbe25
@liukun4515 @alamb @jdye64 This PR is finally ready for review.
I plan to review this tomorrow
@@ -694,7 +694,7 @@ async fn test_physical_plan_display_indent() {
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < CAST(10 AS Float64)",
" FilterExec: c12@1 < 10",
This is the physical plan, which no longer contains a cast here because the logical plan optimized out the cast of a literal value.
🎉 -- which I think is a good example of the value of this pass
I will review it today, but I missed the PR for issue #3330
Looks good to me -- I think the only thing that needs to be fixed prior to merge is the timestamp test being ignored. Otherwise I think this looks great. Thank you @andygrove
@@ -1401,6 +1402,9 @@ impl SessionState {
}
rules.push(Arc::new(ReduceOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
// we do type coercion after filter push down so that we don't push CAST filters to Parquet
I have a partially written ticket (I will post it later this week) related to supporting CAST in pruning logic (which is part of what is pushed to parquet). Perhaps this is also related.
@@ -1438,9 +1438,9 @@ async fn reduce_left_join_1() -> Result<()> {
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Filter: CAST(#t1.t1_id AS Int64) < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
I think it is much clearer now that the casts are visible in the explain plan (it makes it obvious what is going on)
@@ -1398,6 +1398,7 @@ async fn timestamp_sub_interval_days() -> Result<()> {
}

#[tokio::test]
#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
Should this test be ignored? Maybe this is a merge conflict -- I think @HaoYang670 has fixed this test in #3337 so it no longer needs to be ignored
Thanks for catching that. Yes, this must have been some kind of merge conflict. I have unignored this.
@@ -1101,6 +1102,20 @@ mod test {
Ok(())
}

#[test]
👍
@@ -47,7 +47,13 @@ pub fn create_physical_expr(
input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
assert_eq!(input_schema.fields.len(), input_dfschema.fields().len());
if input_schema.fields.len() != input_dfschema.fields().len() {
👍
return Err(DataFusionError::Internal(
    "create_physical_expr passed Arrow schema and DataFusion \
    schema with different number of fields"
        .to_string(),
));
Suggested change:

return Err(DataFusionError::Internal(
    format!("create_physical_expr passed Arrow schema and DataFusion \
        schema with different number of fields, {} vs {}",
        input_schema.fields.len(), input_dfschema.fields().len()
    ),
));
}
_ => {
    let coerced_type = coerce_types(&left_type, op, &right_type)?;
    let left = left.clone().cast_to(&coerced_type, &self.schema)?;
I think it is a minor point and could be done in a follow on PR, but since this function gets an owned `expr` it might be possible to `match expr` rather than `match &expr` and save these clones.
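To illustrate the suggestion, here is a hypothetical sketch (not the code in this PR; the shape of the BinaryExpr variant and the module paths are assumptions). Matching on the owned expression moves `left`/`right` out of the variant, so the clones go away.

use datafusion_common::{DFSchema, Result};
use datafusion_expr::{binary_rule::coerce_types, Expr, ExprSchemable};

// Hypothetical helper: take the Expr by value so the match arms can consume it.
fn coerce_binary(expr: Expr, schema: &DFSchema) -> Result<Expr> {
    match expr {
        Expr::BinaryExpr { left, op, right } => {
            let left_type = left.get_type(schema)?;
            let right_type = right.get_type(schema)?;
            let coerced = coerce_types(&left_type, &op, &right_type)?;
            // `left` and `right` are moved out of the variant, so no .clone() is needed
            Ok(Expr::BinaryExpr {
                left: Box::new((*left).cast_to(&coerced, schema)?),
                op,
                right: Box::new((*right).cast_to(&coerced, schema)?),
            })
        }
        // everything else passes through unchanged
        other => Ok(other),
    }
}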
LGTM
I filed #3377 to track pruning expressions with casts that can't be removed at plan time
Once this PR passes CI, I plan to merge it in (and I will make a follow on PR with my suggested improvements)
@@ -247,8 +247,8 @@ async fn query_not() -> Result<()> {
async fn csv_query_sum_cast() {
let ctx = SessionContext::new();
register_aggregate_csv_by_sql(&ctx).await;
// c8 = i32; c9 = i64
let sql = "SELECT c8 + c9 FROM aggregate_test_100";
// c8 = i32; c6 = i64
I made this change because #3359 changed the type of c9, so it is no longer i64 but u64.
"| 100 |", | ||
"+-------------------------+", | ||
"+--------+", | ||
"| test.b |", |
isn't the original header better?
@alamb @andygrove
I personally don't think seeing the cast in the column name adds much value. Also, having no cast in the output column header is consistent with postgres:
alamb=# select cast(1 as int);
int4
------
1
(1 row)
alamb=# select cast(i as int) from foo;
i
---
1
2
0
(3 rows)
LGTM.
I also have comments about the header, but we can do that in a follow-up PR or issue.
Benchmark runs are scheduled for baseline = 9b546e7 and contender = 191d8b7. 191d8b7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Thank you for the review @alamb and @liukun4515
Which issue does this PR close?
Part of #3221
Closes #3326
There is a follow on PR #3353 to fix a test that is ignored here due to an existing bug that was exposed by changes in this PR.
Rationale for this change
I would like type coercion to happen in the logical plan. I would also like to match the behavior of Postgres and Spark, where CAST does not appear in the field names in the schema (and this happens a lot more because of the new type coercion rule).

DataFusion currently does a lot of type coercion in the physical plan (which is unaffected by this change, although some of the code may now be redundant).
See #3031 (comment) for a discussion about adding casts to the logical plan.
What changes are included in this PR?
Are there any user-facing changes?
Yes, optimized logical plans may now include CAST expressions that were not previously there (previously they were only added in the physical plan).