Use "union" in partial aggregation output #12155
I ran the tpch/tpcds benchmarks on unpartitioned data and again on partitioned data, using the same code. Results: orc sf1000 unpartitioned, orc sf1000 partitioned.
c5b8233
to
e9952f1
Compare
comments addressed
Some comments
e9952f1
to
38276d9
Compare
comments addressed
38276d9
to
e92d670
Compare
Please rebase.
e92d670
to
27b85fd
Compare
27b85fd
to
dc431b7
Compare
Comments addressed, rebased on master, plus some cleanup in `PartialAggregationOutputProcessor`.
I am so glad someone finally had the time to address this!
some comments
requireNonNull(groupingSets, "groupingSets is null");
groupIdSymbol.ifPresent(symbol -> checkArgument(groupingSets.getGroupingKeys().contains(symbol), "Grouping columns does not contain groupId column"));
this.groupingSets = groupingSets;

this.groupIdSymbol = requireNonNull(groupIdSymbol);

this.rawInputMaskSymbol = requireNonNull(rawInputMaskSymbol, "rawInputMaskSymbol is null");
checkArgument(rawInputMaskSymbol.isPresent() || step == SINGLE);
`rawInputMaskSymbol` is also not needed when `groupingSets.getGroupingKeys().isEmpty()`. Is it added anyway? If so, it shouldn't be.
Refactored to not use `rawInputMaskSymbol` for global aggregations.
@@ -104,6 +112,23 @@ public AggregationNode(
outputs.addAll(groupingSets.getGroupingKeys());
hashSymbol.ifPresent(outputs::add);
outputs.addAll(aggregations.keySet());
if (step.isOutputPartial() && !groupingSets.getGroupingKeys().isEmpty()) {
    // add mask channels
There are more symbols that might be propagated. Add state checks:
- that `Aggregation#distinct` is `false` for `step.isOutputPartial()` (see `DistinctGroupedAccumulator`/`DistinctAccumulator`)
- that `Aggregation#orderingScheme` is empty for `step.isOutputPartial()` (see `OrderingGroupedAccumulator`/`OrderedAccumulator`)
- that `Aggregation#filter` is empty when `step.isOutputPartial()`
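The three checks requested above could be sketched roughly as follows. This is only an illustration under stated assumptions: `AggregationSketch`, `PartialOutputChecks`, and `verifyPartialOutput` are hypothetical names, the fields are simplified stand-ins for the real `AggregationNode.Aggregation` members, and plain Java is used in place of Guava's `checkState`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for AggregationNode.Aggregation (not Trino's real class).
final class AggregationSketch
{
    final boolean distinct;
    final Optional<String> orderingScheme; // assumption: modelled as a plain string
    final Optional<String> filter;         // assumption: filter symbol name

    AggregationSketch(boolean distinct, Optional<String> orderingScheme, Optional<String> filter)
    {
        this.distinct = distinct;
        this.orderingScheme = orderingScheme;
        this.filter = filter;
    }
}

final class PartialOutputChecks
{
    private PartialOutputChecks() {}

    // Rejects aggregations that would need symbols the partial output does not carry.
    static void verifyPartialOutput(boolean outputPartial, List<AggregationSketch> aggregations)
    {
        if (!outputPartial) {
            return; // nothing to verify for steps that produce final output
        }
        for (AggregationSketch aggregation : aggregations) {
            checkState(!aggregation.distinct, "distinct aggregation cannot produce partial output");
            checkState(aggregation.orderingScheme.isEmpty(), "ordered aggregation cannot produce partial output");
            checkState(aggregation.filter.isEmpty(), "filtered aggregation cannot produce partial output");
        }
    }

    private static void checkState(boolean condition, String message)
    {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

Failing fast in the constructor would surface a planner bug at plan time instead of as wrong results in the final step.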
How is this related to this change?
> How is this related to this change?
Because you only output aggregation input symbols and the mask as part of PA. You don't output `orderingScheme` symbols or the `filter` symbol. Hence you need checks that these are not needed.
I guess we either need to skip the aggregations with `orderingScheme` and `filter` (with validations on top) or support them in the final step.
Look at the classes I pointed to. They don't support intermediate aggregation.
So basically those aggregation modes do not support partial aggregation and never did, right? So the adaptation here doesn't matter, as it would fail on the intermediate state anyway.
Discussed offline.
@@ -444,6 +475,13 @@ public Optional<Symbol> getMask()
    return mask;
}

public Stream<Symbol> getInputs()
The name `getInputs` is too generic. What generates the values for lambda arguments?
Renamed to `getRawInputs`. Lambdas are the functions used in `reduce_agg`.
@@ -16,25 +16,15 @@
import com.google.common.util.concurrent.ListenableFuture;
I think this is the wrong package. Other builders are in the `builder` package. I don't think we need a `partial` package.
This keeps the classes related to partial aggregation in one place. It's better this way than splitting them by function.
`InMemoryHash` is also used in partial aggregation. It also uses `PartialAggregationOutputProcessor`. This package layout seems artificial.
It is used there, but it's not specific to partial aggregation. I still think that keeping these classes in a separate package, rather than in the big `io.trino.operator.aggregation`, is better for readability, as they are all closely related. This package is actually specific to adaptive PA, so maybe a better name would be `adaptive`?
    channel++;
}
outputMappings.put(rawInputMaskSymbol.orElseThrow(), channel);
channel++;
Why don't you just take `AggregationNode#getOutputSymbols` and assign channel mappings to them? They should already have the correct layout. There is no need to repeat the logic from the `AggregationNode` constructor and map channels for each symbol section individually.
There is no `AggregationNode` available. Also, why was it not done like that before?
I see some solutions:
- extract `static List<Symbol> AggregationNode#createOutputSymbols(...)` to wrap the symbol creation logic in a utility function. It was simple previously, but now it's complex enough to have a dedicated method
- add `List<Symbol> outputSymbols` to `createHashAggregationOperatorFactory`
I refactored this to use the `List<Symbol> outputSymbol`. Seems better, good hint!
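As a rough illustration of the approach settled on in this thread, channels can be derived purely from the position of each symbol in the node's output list. The class and method names below are hypothetical, and symbols are modelled as plain strings rather than Trino's `Symbol` type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: assigns channel i to the i-th output symbol.
final class OutputChannelMapping
{
    private OutputChannelMapping() {}

    static Map<String, Integer> channelsFor(List<String> outputSymbols)
    {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (int channel = 0; channel < outputSymbols.size(); channel++) {
            String symbol = outputSymbols.get(channel);
            // Each symbol must map to exactly one channel.
            if (mapping.put(symbol, channel) != null) {
                throw new IllegalArgumentException("duplicate output symbol: " + symbol);
            }
        }
        return mapping;
    }
}
```

With this shape the operator factory does not need to know which section (grouping keys, hash, aggregations, mask, raw inputs) a symbol belongs to; the layout is decided once, where the output symbols are built.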
@@ -31,4 +32,16 @@ protected QueryRunner createQueryRunner()
        "task.max-partial-aggregation-memory", "0B"))
        .build();
}

@Test
public void testAggregationWithLambda()
Move to `AbstractTestAggregations`. Plus add a comment on what it tests in general.
There is already `io.trino.testing.AbstractTestEngineOnlyQueries#testReduceAgg` that tests `reduce_agg` with PA. IMO this test makes sense only here, as it specifically tests `reduce_agg` (lambda arg) with partial aggregation disabled by adaptation.
> IMO this test makes sense only here as it specifically tests reduce_agg (lambda arg) with partial aggregation disabled by adaptation.
These tests are engine aggregation tests with different execution modes. They are not as targeted as UTs. They are meant to test different, sometimes unexpected execution flows. Please move the test to `testReduceAgg`.
This test only makes sense with the adaptation kicking in (low `adaptive-partial-aggregation.min-rows` and `task.max-partial-aggregation-memory`), which is not that easy to set up in `testReduceAgg`, since `task.max-partial-aggregation-memory` is a config property only. It is also an unusual execution flow (lambda + PA adaptation), and it makes sense to keep the tests for adaptation in one place.
            .build();

    assertOperatorEquals(operatorFactory, operator2Input, operator2Expected);
}

@Test
public void testFinalAggregation()
`testFinalAggregationWithoutRawInput`?
This actually tests all cases: raw, aggregated, and mixed.
}

@Test
public void testFinalAggregationWithMask()
`testFinalAggregationWithRawInput`?
This tests cases with raw, aggregated, and mixed input, but also with a mask block (from the distinct operator, if I'm not mistaken).
createLongsBlock(0, 0, 2, 2), // hash channels
nullRle(BIGINT, 4), // accumulator state
booleanRle(true, 4), // mask channel
createLongsBlock(1, 2, -1, -2), // aggregation input
I don't see separate blocks for aggregation input and intermediate state. I would use values that are easy to validate (e.g. `10xx` for intermediate state and `xx` for raw input). Consider using an aggregation whose intermediate state type differs from its raw input type. That would give better test coverage.
The blocks commented as accumulator state are the intermediate state. I changed the comment. Does this look better now? I don't think the type matters for coverage here. The `sum` is also easy to follow.
It's better. Please change the values though, so that intermediate state and raw values have different orders of magnitude and don't accidentally match if the aggregation operator consumes the wrong channel.
I think I either answered or addressed in the code all of the comments, but I'm going to look through them again in a few hours to double-check (they were coming in parts).
    this.rawInputChannels = ImmutableList.copyOf(inputChannels);
}
else {
    intermediateStateChannel = OptionalInt.of(inputChannels.get(0));
This is how it worked before my change, i.e. it was assumed that the first channel from `inputChannels` was the `intermediateStateChannel` in the case of final aggregation. I can move this logic a little further up, but to fix it completely `AggregationNode.Aggregation` would have to change. See also `io.trino.sql.planner.iterative.rule.PushPartialAggregationThroughExchange#split`.
this.maskChannel = requireNonNull(maskChannel, "maskChannel is null");
this.spillable = spillable;
this.lambdaProviders = ImmutableList.copyOf(requireNonNull(lambdaProviders, "lambdaProviders is null"));

checkArgument(step.isInputRaw() || inputChannels.size() == 1, "expected 1 input channel for intermediate aggregation");
checkArgument(step.isInputRaw() || intermediateStateChannel.isPresent(), "expected intermediateStateChannel for intermediate aggregation but got %s ", intermediateStateChannel);
> This check is redundant since you set intermediateStateChannel on step.isInputRaw() few lines above.
I dropped the validation.
> Would it be possible to validate rawInputChannels count?
I think not.
@@ -66,7 +78,8 @@ public Aggregator createAggregator()
else {
    accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders);
}
return new Aggregator(accumulator, step, intermediateType, finalType, inputChannels, maskChannel);
List<Integer> aggregatorInputChannels = intermediateStateChannel.isEmpty() ? rawInputChannels : ImmutableList.of(intermediateStateChannel.getAsInt());
Which params do you want to be explicit? Here I join two explicit fields into one list to keep the current `Aggregator` API intact. By the way, I renamed the local variable `aggregatorInputChannels` to `inputChannels` to show what is changing here and what is not.
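The channel handling discussed in this thread can be summarised in a small standalone sketch. It mirrors the quoted diff fragments only loosely: `AggregatorChannels` is a hypothetical class, a boolean replaces `step.isInputRaw()`, and JDK collections replace Guava's `ImmutableList`.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical simplification of the channel bookkeeping in the quoted AggregatorFactory diff.
final class AggregatorChannels
{
    final List<Integer> rawInputChannels;
    final OptionalInt intermediateStateChannel;

    AggregatorChannels(boolean inputRaw, List<Integer> inputChannels)
    {
        if (inputRaw) {
            // Raw input: every channel is an argument of the aggregation function.
            intermediateStateChannel = OptionalInt.empty();
            rawInputChannels = List.copyOf(inputChannels);
        }
        else {
            // Intermediate input: by convention the single channel at index 0 holds the state.
            if (inputChannels.size() != 1) {
                throw new IllegalArgumentException("expected 1 input channel for intermediate aggregation");
            }
            intermediateStateChannel = OptionalInt.of(inputChannels.get(0));
            rawInputChannels = List.of();
        }
    }

    // Joins the two explicit fields back into the single list that the Aggregator consumes.
    List<Integer> aggregatorInputChannels()
    {
        return intermediateStateChannel.isEmpty()
                ? rawInputChannels
                : List.of(intermediateStateChannel.getAsInt());
    }
}
```

Keeping the two fields separate inside the factory makes the "index 0 is the intermediate state" convention visible in one place, while the aggregator still receives a single list.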
dc431b7
to
f92ea04
Compare
Just 2 comments for now
    this.rawInputChannels = ImmutableList.copyOf(inputChannels);
}
else {
    intermediateStateChannel = OptionalInt.of(inputChannels.get(0));
Not really. What we are talking about here is the arguments to the aggregation function. The list of arguments is translated from symbols to channel indexes, but the first argument is still the intermediate state for final aggregation, so index 0 in this list is the "magic" number. To avoid that, we would need a separate field for the intermediate state symbol in the `Aggregation` class (the `intermediateSymbol` local variable in `io.trino.sql.planner.iterative.rule.PushPartialAggregationThroughExchange#split`). That may be a good idea anyway, but it's not related to this change.
126523f
to
3c42707
Compare
I addressed most of the comments.
A big chunk of the work was adding support for global aggregations. I'm not sure this is needed, given the complexity it brings. I could disable the adaptation in `TestPushPartialAggregationThroughJoin` instead.
I decided that, for now, it is better not to support adaptation for CTAS (`TableWriterNode`, `TableFinishNode`), as it involves more changes in this already sizable PR and is not that critical. It may be a follow-up PR; I created #12697 for this.
    this.rawInputChannels = ImmutableList.copyOf(inputChannels);
}
else {
    intermediateStateChannel = OptionalInt.of(inputChannels.get(0));
> I don't see a need for that. Intermediate symbol is represented by AggregationNode#aggregations.key
I don't think `AggregationNode#aggregations.key` is the intermediate symbol for the final step (it is for partial).
3c42707
to
eed7bba
Compare
eed7bba
to
5f5bf3f
Compare
Rebased on master.
latest comments answered
        .flatMap(Optional::stream)
        .distinct()
        .forEach(outputsBuilder::add);
// add raw inputs to the aggregations to be used by adaptive partial aggregation.
Do we run tests with adaptive PA on/off and intermediate step?
outputs.addAll(groupingSets.getGroupingKeys());
hashSymbol.ifPresent(outputs::add);
outputs.addAll(aggregations.keySet());
ImmutableList.Builder<Symbol> outputsBuilder = ImmutableList.builder();
Could you benchmark this and Push partial aggregation through join?
I ran tpch/tpcds benchmark on the current version (recently rebased on the master) + with
5f5bf3f
to
0177bd4
Compare
I ran tpch/tpcds sf1k orc part with this (paa union) on top of #13573 (decimal serde optimization) and
What about
see row 1 in the summary
Since partial aggregation can be disabled at runtime, both the aggregated and the input data need to be passed through to the final step. This change extends the partial aggregation output with additional columns to use for raw input and uses those columns to send raw input in case partial aggregation is disabled. An additional column indicates which set of channels the final step should use.
0177bd4
to
90f5848
Compare
I ran another orc part sf1k for this, rebased on master with #13573. One surprising thing is that the baseline does not show the same improvement seen in #13573, and it should, as it contains that change.
👋 @lukasz-stec - this PR has become inactive. If you're still interested in working on it, please let us know. We're working on closing out old and inactive PRs, so if you're too busy or this has too many merge conflicts to be worth picking back up, we'll be making another pass to close it out in a few weeks.
Closing as there is no interest in merging this.
Description
Since partial aggregation can be disabled at runtime,
both aggregated and the input data need to be passed
through to the final step.
This change extends the partial aggregation output
with additional columns to use for raw input
and uses those columns to send raw input in case
partial aggregation is disabled.
An additional column indicates which
set of channels the final step should use.
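To make the "union" layout concrete, here is a minimal sketch of how a final `sum` step might consume such rows. Everything in it is an assumption made for illustration: the class names, the single-key and single-aggregation row shape, and the flag polarity (`true` meaning "use the raw input channel") are not taken from the PR's actual code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row of partial aggregation output: either the state or the raw input is populated.
final class UnionRow
{
    final long groupKey;
    final Long intermediateState; // null when partial aggregation was disabled
    final boolean useRawInput;    // assumed polarity: true selects the raw input channel
    final Long rawInput;          // null when partial aggregation was enabled

    private UnionRow(long groupKey, Long intermediateState, boolean useRawInput, Long rawInput)
    {
        this.groupKey = groupKey;
        this.intermediateState = intermediateState;
        this.useRawInput = useRawInput;
        this.rawInput = rawInput;
    }

    static UnionRow aggregated(long groupKey, long state)
    {
        return new UnionRow(groupKey, state, false, null);
    }

    static UnionRow raw(long groupKey, long value)
    {
        return new UnionRow(groupKey, null, true, value);
    }
}

final class FinalSumSketch
{
    private FinalSumSketch() {}

    // Final step for sum: merges intermediate states and raw values, picking the channel per row.
    static Map<Long, Long> finalSum(List<UnionRow> rows)
    {
        Map<Long, Long> sums = new TreeMap<>();
        for (UnionRow row : rows) {
            Long value = row.useRawInput ? row.rawInput : row.intermediateState;
            if (value != null) {
                sums.merge(row.groupKey, value, Long::sum);
            }
        }
        return sums;
    }
}
```

For example, one aggregated row with state 1000 for key 0 followed by raw values 1 and 2 for the same key yields 1003 for that key, so a final step reading the wrong channel would be easy to spot.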
This is rebased on top of #12101. Only last commit matters here.
This is an extension of #11011 and limited implementation of prestodb/presto#10305.
It does not work on a per-group basis but rather per operator factory instance (i.e. all operators for a given plan node id on one worker node).
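One way to picture a per-operator-factory decision is a single switch object shared by all operators created from the same factory. The sketch below is hypothetical: the class, its fields, and the ratio-based rule are assumptions for illustration and do not reproduce the controller introduced in #11011.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical switch shared by every operator created from one operator factory.
final class SharedPartialAggregationSwitch
{
    private final long minRows;                    // assumed: rows to observe before deciding
    private final double uniqueRowsRatioThreshold; // assumed: output/input ratio that disables PA
    private final AtomicLong inputRows = new AtomicLong();
    private final AtomicLong outputRows = new AtomicLong();
    private volatile boolean disabled;

    SharedPartialAggregationSwitch(long minRows, double uniqueRowsRatioThreshold)
    {
        this.minRows = minRows;
        this.uniqueRowsRatioThreshold = uniqueRowsRatioThreshold;
    }

    // Called by any operator after it flushes its partial aggregation results.
    void onFlush(long flushedInputRows, long flushedOutputRows)
    {
        long totalInput = inputRows.addAndGet(flushedInputRows);
        long totalOutput = outputRows.addAndGet(flushedOutputRows);
        if (totalInput >= minRows && (double) totalOutput / totalInput > uniqueRowsRatioThreshold) {
            disabled = true; // all operators sharing this switch now pass raw input through
        }
    }

    boolean isPartialAggregationDisabled()
    {
        return disabled;
    }
}
```

Because the state is shared rather than tracked per group, a few highly selective groups cannot keep partial aggregation on for themselves; the decision applies to the whole plan node on that worker.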
It does not use `row type` because there is currently no support for operating on `row type` fields during planning.
Benchmarks
tpch/tpcds benchmarks on partitioned orc sf1000, with and without #11289 (that PR adds RLE support for partitioned exchange, which makes this PR more efficient)
adaptive-partial-aggregation-union-rle-hasNonNullValue-var-row.pdf
Summary
There are significant gains for TPCH, especially with partitioned exchange RLE support. Duration drops even more than CPU because of a significant drop in the data sent over the network.
For TPCDS, the improvement is only visible when combined with partitioned exchange RLE support.
improvement
core query engine
Improve performance of GROUP BY queries that produce many distinct values
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text: