Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve grouping performance via better vectorization in accumulate functions #6954

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,30 +565,33 @@ where
let sums = std::mem::take(&mut self.sums);
let nulls = self.null_state.build();

assert_eq!(nulls.len(), sums.len());
assert_eq!(counts.len(), sums.len());

// don't evaluate averages with null inputs to avoid errors on null values

let array: PrimitiveArray<T> = if nulls.null_count() > 0 {
let mut builder = PrimitiveBuilder::<T>::with_capacity(nulls.len());
let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter());

for ((sum, count), is_valid) in iter {
if is_valid {
builder.append_value((self.avg_fn)(sum, count)?)
} else {
builder.append_null();
let array: PrimitiveArray<T> = match nulls {
Some(nulls) if nulls.null_count() > 0 => {
assert_eq!(nulls.len(), sums.len());
let mut builder = PrimitiveBuilder::<T>::with_capacity(nulls.len());
let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter());

for ((sum, count), is_valid) in iter {
if is_valid {
builder.append_value((self.avg_fn)(sum, count)?)
} else {
builder.append_null();
}
}
builder.finish()
}
_ => {
let averages: Vec<T::Native> = sums
.into_iter()
.zip(counts.into_iter())
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), nulls) // no copy
}
builder.finish()
} else {
let averages: Vec<T::Native> = sums
.into_iter()
.zip(counts.into_iter())
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), Some(nulls)) // no copy
};

// fix up decimal precision and scale for decimals
Expand All @@ -599,7 +602,7 @@ where

// return arrays for sums and counts
fn state(&mut self) -> Result<Vec<ArrayRef>> {
let nulls = Some(self.null_state.build());
let nulls = self.null_state.build();
let counts = std::mem::take(&mut self.counts);
let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,21 @@ pub struct NullState {
///
/// If `seen_values[i]` is false, have not seen any values that
/// pass the filter yet for group `i`
seen_values: BooleanBufferBuilder,
seen_values: Option<BooleanBufferBuilder>,
}

impl NullState {
pub fn new() -> Self {
Self {
seen_values: BooleanBufferBuilder::new(0),
}
Self { seen_values: None }
}

/// return the size of all buffers allocated by this null state, not including self
pub fn size(&self) -> usize {
// capacity is in bits, so convert to bytes
self.seen_values.capacity() / 8
self.seen_values
.as_ref()
.map(|seen_values| seen_values.capacity() / 8)
.unwrap_or(0)
}

/// Invokes `value_fn(group_index, value)` for each non null, non
Expand Down Expand Up @@ -130,8 +131,27 @@ impl NullState {
let data: &[T::Native] = values.values();
assert_eq!(data.len(), group_indices.len());

// ensure the seen_values is big enough (start everything at
// "not seen" valid)
let input_has_nulls = values.null_count() > 0;

// avoid tracking null state if possible
if !input_has_nulls &&
self.seen_values.is_none()&& // we have seen values for all previous groups
opt_filter.is_none()
// we will be looking at all values
{
// since we know all groups have at least one non-null
// value, there is no need to track seen_values
// individually
let iter = group_indices.iter().zip(data.iter());

for (&group_index, &new_value) in iter {
value_fn(group_index, new_value);
}
return;
}

// have been tracking seen values previously, so we still need
// to track them here
let seen_values =
initialize_builder(&mut self.seen_values, total_num_groups, false);

Expand Down Expand Up @@ -321,9 +341,20 @@ impl NullState {
/// group_indices should have null values (because they never saw
/// any values)
///
/// If all groups are valid, returns None (no NullBuffer)
///
/// resets the internal state to empty
pub fn build(&mut self) -> NullBuffer {
NullBuffer::new(self.seen_values.finish())
pub fn build(&mut self) -> Option<NullBuffer> {
let Some(seen_values) = self.seen_values.as_mut() else {
return None;
};

let nulls = NullBuffer::new(seen_values.finish());
if nulls.null_count() > 0 {
Some(nulls)
} else {
None
}
}
}

Expand Down Expand Up @@ -423,10 +454,15 @@ pub fn accumulate_indices<F>(
///
/// All new entries are initialized to `default_value`
fn initialize_builder(
builder: &mut BooleanBufferBuilder,
builder: &mut Option<BooleanBufferBuilder>,
total_num_groups: usize,
default_value: bool,
) -> &mut BooleanBufferBuilder {
if builder.is_none() {
*builder = Some(BooleanBufferBuilder::new(total_num_groups));
}
let builder = builder.as_mut().unwrap();

if builder.len() < total_num_groups {
let new_groups = total_num_groups - builder.len();
builder.append_n(new_groups, default_value);
Expand Down Expand Up @@ -683,11 +719,21 @@ mod test {

assert_eq!(accumulated_values, expected_values,
"\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");
let seen_values = null_state.seen_values.finish_cloned();
mock.validate_seen_values(&seen_values);

if let Some(seen_values) = null_state.seen_values.as_ref() {
let seen_values = seen_values.finish_cloned();
mock.validate_seen_values(&seen_values);
}

// Validate the final buffer (one value per group)
let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
let expected_null_buffer = if values.null_count() > 0 || opt_filter.is_some() {
mock.expected_null_buffer(total_num_groups)
} else {
// the test data doesn't always pass all group indices
// unlike the real hash grouper, so only build a null
// buffer if it would have made one
None
};

let null_buffer = null_state.build();

Expand Down Expand Up @@ -800,8 +846,10 @@ mod test {
assert_eq!(accumulated_values, expected_values,
"\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");

let seen_values = null_state.seen_values.finish_cloned();
mock.validate_seen_values(&seen_values);
if let Some(seen_values) = null_state.seen_values.as_ref() {
let seen_values = seen_values.finish_cloned();
mock.validate_seen_values(&seen_values);
}

// Validate the final buffer (one value per group)
let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
Expand Down Expand Up @@ -845,10 +893,16 @@ mod test {
}

/// Create the expected null buffer based on if the input had nulls and a filter
fn expected_null_buffer(&self, total_num_groups: usize) -> NullBuffer {
(0..total_num_groups)
fn expected_null_buffer(&self, total_num_groups: usize) -> Option<NullBuffer> {
let nulls: NullBuffer = (0..total_num_groups)
.map(|group_index| self.expected_seen(group_index))
.collect()
.collect();

if nulls.null_count() > 0 {
Some(nulls)
} else {
None
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
fn evaluate(&mut self) -> Result<ArrayRef> {
let values = self.values.finish();
let nulls = self.null_state.build();
let values = BooleanArray::new(values, Some(nulls));
let values = BooleanArray::new(values, nulls);
Ok(Arc::new(values))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ where
fn evaluate(&mut self) -> Result<ArrayRef> {
let values = std::mem::take(&mut self.values);
let nulls = self.null_state.build();
let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); // no copy
let values = PrimitiveArray::<T>::new(values.into(), nulls); // no copy

adjust_output_array(&self.data_type, Arc::new(values))
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ where
let min_max = std::mem::take(&mut self.min_max);
let nulls = self.null_state.build();

let min_max = PrimitiveArray::<T>::new(min_max.into(), Some(nulls)); // no copy
let min_max = PrimitiveArray::<T>::new(min_max.into(), nulls); // no copy
let min_max = adjust_output_array(&self.data_type, Arc::new(min_max))?;

Ok(Arc::new(min_max))
Expand All @@ -1418,7 +1418,7 @@ where
let nulls = self.null_state.build();

let min_max = std::mem::take(&mut self.min_max);
let min_max = PrimitiveArray::<T>::new(min_max.into(), Some(nulls)); // zero copy
let min_max = PrimitiveArray::<T>::new(min_max.into(), nulls); // zero copy

let min_max = adjust_output_array(&self.data_type, Arc::new(min_max))?;

Expand Down