From 524e56d8e59932fd643e6a72c53d5089b7812db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Fri, 27 Sep 2024 04:33:10 +0800 Subject: [PATCH] Simplify `update_skip_aggregation_probe` method (#12332) * Simplify update_skip_aggregation_probe method * Add assert & doc --------- Co-authored-by: Andrew Lamb --- .../src/aggregate/groups_accumulator.rs | 2 +- .../physical-plan/src/aggregates/row_hash.rs | 22 ++++--------------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index 92dd91bd86bc..e60f68972074 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -258,7 +258,7 @@ impl GroupsAccumulatorAdapter { opt_filter.as_ref().map(|f| f.as_boolean()), offsets, )?; - (f)(state.accumulator.as_mut(), &values_to_accumulate)?; + f(state.accumulator.as_mut(), &values_to_accumulate)?; // clear out the state so they are empty for next // iteration diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 60efc7711216..d4dbdf0f029d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -187,13 +187,6 @@ impl SkipAggregationProbe { self.should_skip } - /// Provides an ability to externally set `should_skip` flag - /// to `false` and prohibit further state updates - fn forbid_skipping(&mut self) { - self.should_skip = false; - self.is_locked = true; - } - /// Record the number of rows that were output directly without aggregation fn record_skipped(&mut self, batch: &RecordBatch) { self.skipped_aggregation_rows.add(batch.num_rows()); @@ -1009,19 +1002,12 @@ impl GroupedHashAggregateStream { } /// Updates skip aggregation probe state. 
- /// - /// In case stream has any spills, the probe is forcefully set to - /// forbid aggregation skipping, and locked, since spilling resets - /// total number of unique groups. - /// - /// Note: currently spilling is not supported for Partial aggregation fn update_skip_aggregation_probe(&mut self, input_rows: usize) { if let Some(probe) = self.skip_aggregation_probe.as_mut() { - if !self.spill_state.spills.is_empty() { - probe.forbid_skipping(); - } else { - probe.update_state(input_rows, self.group_values.len()); - } + // Skip aggregation probe is not supported if stream has any spills; + // currently, spilling is not supported for Partial aggregation. + assert!(self.spill_state.spills.is_empty()); + probe.update_state(input_rows, self.group_values.len()); }; }