Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down filter plan for non-unnest column #11019

Merged
merged 7 commits into from
Jun 21, 2024

Conversation

jayzhan211
Copy link
Contributor

@jayzhan211 jayzhan211 commented Jun 20, 2024

Which issue does this PR close?

Follow on #10991

Closes #11016

Rationale for this change

In #10991, if we see unnest column, we skip the push down optimization.
In this PR, we select the non-unnest columns to push down, even there is unnest columns besides.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Jun 20, 2024
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
@jayzhan211 jayzhan211 marked this pull request as ready for review June 20, 2024 03:13
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @jayzhan211 -- looks great to me

I also updated the ticket description to reflect that this PR would close #11016 as far as I understand

cc @jonahgao

@@ -73,11 +73,29 @@ explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v
----
logical_plan
01)Projection: unnest(v.column2) AS uc2, v.column1
02)--Filter: unnest(v.column2) > Int64(3) AND v.column1 = Int64(2)
02)--Filter: unnest(v.column2) > Int64(3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also update the comment a few lines above to reflect the issue is fixed?

# Could push the filter (column1 = 2) down below unnest
# https://github.com/apache/datafusion/issues/11016

Copy link
Member

@jonahgao jonahgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me!

I am considering whether it is beneficial to wait for the next rewrite pass to handle the unnest input, because we already have the capability to push down filters below projections. This might simplify the implementation, and we don't need to assume that the unnest input is a projection.

            LogicalPlan::Unnest(mut unnest) => {
                let predicates = split_conjunction_owned(filter.predicate.clone());
                let mut non_unnest_predicates = vec![];
                let mut unnest_predicates = vec![];
                for predicate in predicates {
                    // collect all the Expr::Column in predicate recursively
                    let mut accum: HashSet<Column> = HashSet::new();
                    expr_to_columns(&predicate, &mut accum)?;

                    if unnest.exec_columns.iter().any(|c| accum.contains(c)) {
                        unnest_predicates.push(predicate);
                    } else {
                        non_unnest_predicates.push(predicate);
                    }
                }
                // Unnest predicates should not be pushed down.
                // If no non-unnest predicates exist, early return
                if non_unnest_predicates.is_empty() {
                    filter.input = Arc::new(LogicalPlan::Unnest(unnest));
                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                }
                let new_filter = LogicalPlan::Filter(Filter::try_new(
                    conjunction(non_unnest_predicates).unwrap(),
                    unnest.input.clone(),
                )?);
                unnest.input = Arc::new(new_filter);
                if unnest_predicates.is_empty() {
                    Ok(Transformed::yes(LogicalPlan::Unnest(unnest)))
                } else {
                    filter.predicate = conjunction(unnest_predicates).unwrap();
                    filter.input = Arc::new(LogicalPlan::Unnest(unnest));
                    Ok(Transformed::no(LogicalPlan::Filter(filter)))
                }
            }

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Jun 21, 2024

Looks good to me!

I am considering whether it is beneficial to wait for the next rewrite pass to handle the unnest input, because we already have the capability to push down filters below projections. This might simplify the implementation, and we don't need to assume that the unnest input is a projection.

            LogicalPlan::Unnest(mut unnest) => {
                let predicates = split_conjunction_owned(filter.predicate.clone());
                let mut non_unnest_predicates = vec![];
                let mut unnest_predicates = vec![];
                for predicate in predicates {
                    // collect all the Expr::Column in predicate recursively
                    let mut accum: HashSet<Column> = HashSet::new();
                    expr_to_columns(&predicate, &mut accum)?;

                    if unnest.exec_columns.iter().any(|c| accum.contains(c)) {
                        unnest_predicates.push(predicate);
                    } else {
                        non_unnest_predicates.push(predicate);
                    }
                }
                // Unnest predicates should not be pushed down.
                // If no non-unnest predicates exist, early return
                if non_unnest_predicates.is_empty() {
                    filter.input = Arc::new(LogicalPlan::Unnest(unnest));
                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                }
                let new_filter = LogicalPlan::Filter(Filter::try_new(
                    conjunction(non_unnest_predicates).unwrap(),
                    unnest.input.clone(),
                )?);
                unnest.input = Arc::new(new_filter);
                if unnest_predicates.is_empty() {
                    Ok(Transformed::yes(LogicalPlan::Unnest(unnest)))
                } else {
                    filter.predicate = conjunction(unnest_predicates).unwrap();
                    filter.input = Arc::new(LogicalPlan::Unnest(unnest));
                    Ok(Transformed::no(LogicalPlan::Filter(filter)))
                }
            }

For projection plan, we check if predicates contains volatile expression, so we can't directly assign it as the new input. Not sure what is the case for other plan, they may have their own special case handling.
Also, I think currently we only run the PushDownFilter pass once 🤔 unlike SimplifyExpressions

I see. It might worth to try.

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Jun 21, 2024
Signed-off-by: jayzhan211 <[email protected]>
)?);

// try push down recursively
let new_plan = self.rewrite(filter_with_unnest_input, _config)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the optimizer already handles recursion, so it might not be necessary to call rewrite here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree!

// Filter
// Unenst Input (Projection)

let unnest_input = std::mem::take(&mut unnest.input);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about directly cloning the input since it is an Arc? This way, we don't need to construct a Default LogicalPlan either.

Copy link
Contributor Author

@jayzhan211 jayzhan211 Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a plan to convert Arc<LogicalPlan> to Box<LogicalPlan>, but before that we need to make sure there is no clone exist, so I think we should try our best to avoid clone. It is also the reason why we have rewrite in Logical optimizer rule that takes LogicalPlan instead of Arc<LogicalPlan>

Ref: #9637 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's a purpose for doing this, make sense to me. Thanks @jayzhan211

Signed-off-by: jayzhan211 <[email protected]>
Copy link
Member

@jonahgao jonahgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks @jayzhan211

@@ -281,6 +281,15 @@ pub enum LogicalPlan {
RecursiveQuery(RecursiveQuery),
}

impl Default for LogicalPlan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit fd5a68f into apache:main Jun 21, 2024
23 checks passed
@jayzhan211
Copy link
Contributor Author

Thanks @jonahgao and @alamb

@jayzhan211 jayzhan211 deleted the non-unnest-pushdown branch June 21, 2024 23:48
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
* push down non-unnest only

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rewrite unnest push donw filter

Signed-off-by: jayzhan211 <[email protected]>

* remove comment

Signed-off-by: jayzhan211 <[email protected]>

* avoid double recurisve

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
* push down non-unnest only

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rewrite unnest push donw filter

Signed-off-by: jayzhan211 <[email protected]>

* remove comment

Signed-off-by: jayzhan211 <[email protected]>

* avoid double recurisve

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* push down non-unnest only

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* add doc

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rewrite unnest push donw filter

Signed-off-by: jayzhan211 <[email protected]>

* remove comment

Signed-off-by: jayzhan211 <[email protected]>

* avoid double recurisve

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pushdown filters that do not reference unested columns
3 participants