Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax combine partial final rule #10913

Merged
merged 3 commits into from
Jun 21, 2024

Conversation

mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Jun 14, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Currently, CombinePartialFinalAggregate rule combine AggregateExec::Partial + AggregateExec::Final into AggregateExec::Single when following conditions are met (same conditions applies to the combining AggregateExec::Partial + AggregateExec::FinalPartitioned into AggregateExec::SinglePartitioned):

  • These operators are consecutive
  • Their group by expressions are equal
  • Their aggregate expressions are equal
  • Their filter expressions are equal.
    See can_combine function for implementation.

However, the query below

SELECT
    DATE_BIN(INTERVAL '2' MINUTE, ts, '2000-01-01') AS ts_chunk,
    ARRAY_AGG(DISTINCT keyword) AS keywords,
    COUNT(keyword) AS alert_keyword_count
FROM
    keywords_stream
WHERE
    keywords_stream.keyword IN (SELECT keyword FROM ALERT_KEYWORDS)
GROUP BY
    ts_chunk;

where keywords_stream is defined as

CREATE TABLE keywords_stream (
    ts TIMESTAMP,
    sn INTEGER PRIMARY KEY,
    keyword VARCHAR NOT NULL
);

and ALERT_KEYWORDS is defined as

CREATE TABLE ALERT_KEYWORDS(keyword VARCHAR NOT NULL);

generates following plan

01)ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))@0 as ts_chunk, COUNT(keywords_stream.keyword)@1 as alert_keyword_count]
02)--AggregateExec: mode=Final, gby=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))@0 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))], aggr=[COUNT(keywords_stream.keyword)]
03)----AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }, ts@0, 946684800000000000) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))], aggr=[COUNT(keywords_stream.keyword)]
04)------CoalesceBatchesExec: target_batch_size=2
05)--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(keyword@0, keyword@1)]
06)----------MemoryExec: partitions=1, partition_sizes=[1]
07)----------MemoryExec: partitions=1, partition_sizes=[1]

where AggregateExec::Partial and AggregateExec::Final couldn't combined into AggregateExec::Single. However, we should be able to generate following plan

01)ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))@0 as ts_chunk, COUNT(keywords_stream.keyword)@1 as alert_keyword_count]
02)--AggregateExec: mode=Single, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }, ts@0, 946684800000000000) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }"),keywords_stream.ts,Utf8("2000-01-01"))], aggr=[COUNT(keywords_stream.keyword)]
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(keyword@0, keyword@1)]
05)--------MemoryExec: partitions=1, partition_sizes=[1]
06)--------MemoryExec: partitions=1, partition_sizes=[1]

with AggregateExec: mode=Single operator. The reason we cannot current do this change is that group by expressions of the AggregateExec: mode=Partial and AggregateExec: mode=Final are not same (Partial has scalar Function, Final has Column expressions which is the result of the scalar function).
However, As far as I can tell, as long as AggregateExec::Partial and AggregateExec::Final are consecutive, we can combine these operators into AggregateExec::Single (It is guaranteed for these operators to be related and their partitioning is same). Hence, can_combine should be more like invariance check. Looking into the planner. By invariance, aggregate expressions and filter expressions should have exactly same expressions. However, group by expressions can be different (partial group by might be complex expression, final group by will be its result in column form). Hence, I propose to relax this group by equality check to generate better plans in the single partition plans..

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jun 14, 2024
Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but let's wait for additional community review in case we are missing something.

14)------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
15)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
16)----------------------MemoryExec: partitions=1, partition_sizes=[1]
05)--------AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as alias1], aggr=[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this plan looks better and correct

@@ -144,8 +144,12 @@ fn can_combine(final_agg: GroupExprsRef, partial_agg: GroupExprsRef) -> bool {
let (input_group_by, input_aggr_expr, input_filter_expr) =
normalize_group_exprs(partial_agg);

final_group_by.eq(&input_group_by)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that just checking the length of the group bys is sufficient -- I think logically they must be the same.

It seems like the reason these weren't combined

05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]

Is becase of aliasing the exprs didn't match exactly -- t1_id@0 as alias1 didn't match alias1@0 as alias1 even though I think they are logically equivalent

So for example, if we ever made the following plan (with actually different grouping expressions) after this change the code would incorrectly collapse them

05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1 / 2 as alias1], aggr=[]
06)----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]

However, I am not sure that such a plan would be valid 🤔

Copy link
Contributor

@ozankabak ozankabak Jun 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were trying to think whether it is possible for a valid plan to have a consecutive Partial/Final duo with differing GROUP BY expressions (unless of course it is manually generated that way w/o a query).

We weren't able to find an example of this and started to think it is not possible. That's why that check was deemed to inhibit better plans in some cases without adding any real protection.

We would appreciate some brain cycles from the community on this. If our suspicion is correct, this small PR will give us better plans in many cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation.

My concern is that if someone ever did create a plan that didn't have the same grouping expression that this condition could apply and thus cause a very hard to debug failure.

I think we should at least add some comments to the check explaining the assumption (that a two phase grouping must have semantically the same grouping keys) to help future readers / developers. Then I think this PR is ok to merge.

I also do wonder if we have some pre-existing code to check two expressions for equality from different schemas by normalizing them or something, but I didn't try and check for it at the moment

Copy link
Contributor Author

@mustafasrepo mustafasrepo Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some thinking, I realized that since we are checking expressions equality of the subsequent operators. Output group by expressions of the first operator and input group by expressions of the second operator must be equal. I re-introduced group by equality condition with this comparison. With this comparison, we still generate better plans without relaxing the check. It can be found in the commit

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me -- thank you @mustafasrepo

let (input_group_by, input_aggr_expr, input_filter_expr) = partial_agg;

// Compare output expressions of the partial, and input expressions of the final operator.
physical_exprs_equal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

@alamb
Copy link
Contributor

alamb commented Jun 21, 2024

I also merged this branch with main and re-ran the sqllogictests and verified they all still passed

@alamb alamb merged commit 098ba30 into apache:main Jun 21, 2024
23 checks passed
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
* Minor changes

* Minor changes

* Re-introduce group by expression check
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
* Minor changes

* Minor changes

* Re-introduce group by expression check
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* Minor changes

* Minor changes

* Re-introduce group by expression check
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants