
Improve the performance of Aggregator, grouping, aggregation #4973

Closed
4 tasks done
Tracked by #1570
alamb opened this issue Jan 18, 2023 · 57 comments · Fixed by #6904

@alamb
Contributor

alamb commented Jan 18, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Performance is a very important feature of DataFusion, and aggregation is one of its core algorithms.

For example, it features prominently in the ClickBench queries (#5404).

This ticket covers making it even faster.

Background

Prior Art

After #4924 we have a single GroupByHash implementation that uses the Arrow row format for group keys (good!) -- thank you @ozankabak and @mustafasrepo

However, the aggregator (i.e. the thing storing partial sums, counts, etc.) is not ideal. Depending on the type of aggregate, it is either:

  1. Box<dyn RowAccumulator> per
  2. Box<dyn Accumulator>

The GroupByHash operator manages the mapping from group key to state and passes a RowAccessor to the RowAccumulator if needed.

Groups are calculated once per batch, then a take kernel reshuffles (aka copies) everything and slices are passed to the RowAccumulator / Accumulator.

Proposal

Ditch the word-aligned row format for the state management and change the aggregator to:

trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;
    ...
}

Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from the [row_]hash.rs to the aggregators. We will provide tooling (macros or generics) to simplify the implementation and to avoid boilerplate code as much as possible.
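
To make the shape of this concrete, here is a minimal, self-contained sketch (not DataFusion code) of an aggregator that owns its own key-to-state map and is updated once per batch; it uses plain Vec<u8> keys and f64 values as stand-ins for the arrow Rows / RecordBatch types in the trait above:

use std::collections::HashMap;

/// Minimal sketch of the proposal: a SUM aggregator that owns its own
/// group-key -> state map (the map that previously lived in [row_]hash.rs).
struct SumAggregator {
    states: HashMap<Vec<u8>, f64>,
}

impl SumAggregator {
    fn new() -> Self {
        Self { states: HashMap::new() }
    }

    /// Called (and dyn-dispatched, if boxed) once per input batch.
    fn update_batch(&mut self, keys: &[Vec<u8>], values: &[f64]) {
        for (key, value) in keys.iter().zip(values) {
            *self.states.entry(key.clone()).or_insert(0.0) += value;
        }
    }
}

fn main() {
    let mut agg = SumAggregator::new();
    let keys = vec![b"a".to_vec(), b"b".to_vec(), b"a".to_vec()];
    let values = [1.0, 10.0, 2.0];
    agg.update_batch(&keys, &values);
    assert_eq!(agg.states[b"a".as_slice()], 3.0);
    assert_eq!(agg.states[b"b".as_slice()], 10.0);
}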

Note that this also removes the take kernel, since we think it doesn't provide any performance improvement over iterating over the hashable rows and performing the aggregation row by row. We may bring the take handling back (as a "perform take if at least one aggregator wants it" option) if we can prove (via benchmarks) that this is desirable for certain aggregators, but we leave this out for now.

This also moves the memory consumer handling into the aggregator since the aggregator knows best how to split states and spill data.

Implementation Plan

TBD

Additional Context

Note this was broken out of #2723 and is based on the wonderful writeup from @crepererum and @tustvold on #2723 (comment)

@alamb alamb added the enhancement New feature or request label Jan 18, 2023
@alamb alamb changed the title from Consolidate and improve the performance of Aggregators to Consolidate and improve the performance of Aggregator Jan 18, 2023
@alamb alamb changed the title from Consolidate and improve the performance of Aggregator to Improve the performance of Aggregator, grouping, aggregation Mar 3, 2023
@yjshen
Member

yjshen commented Mar 4, 2023

I'm curious about the proposed new Aggregator API. Do you know if it needs a hash table in each aggregator? I'm wondering because I'm a bit concerned about memory usage, especially for high cardinality aggregations.

Suppose keys are duplicated n times during execution (where n is the number of aggregators in the query). In that case, this could potentially lead to a significant increase in memory consumption, which might not be acceptable. What do you think about this?

@crepererum
Contributor

Do you know if it needs a hash table in each aggregator? I'm wondering because I'm a bit concerned about memory usage, especially for high cardinality aggregations.

Yes, see #2723 (comment)

We're trading speed for slightly more memory usage here. We could probably avoid storing the actual key in the per-aggregator hash tables by either interning the key using another hash table (key -> ID) or by hashing the keys to a "wide enough" ID (like 128bit, also assuming that the hash function is actually proper).
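
For illustration, a tiny sketch of the interning idea (hypothetical helper, not actual DataFusion code): a shared map assigns each distinct group key a small integer ID once, and the per-aggregator tables then key on that cheap ID instead of duplicating the full key bytes:

use std::collections::HashMap;

/// Assigns stable integer IDs to group keys so per-aggregator state tables
/// can use the small ID as their key instead of the full key bytes.
#[derive(Default)]
struct KeyInterner {
    ids: HashMap<Vec<u8>, u64>,
}

impl KeyInterner {
    fn intern(&mut self, key: &[u8]) -> u64 {
        let next_id = self.ids.len() as u64;
        *self.ids.entry(key.to_vec()).or_insert(next_id)
    }
}

fn main() {
    let mut interner = KeyInterner::default();
    let a = interner.intern(b"group-a");
    let b = interner.intern(b"group-b");
    assert_eq!(a, interner.intern(b"group-a")); // same key, same ID
    assert_ne!(a, b);
}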

@yjshen
Member

yjshen commented Mar 6, 2023

So there will also be n times the hash probes and rehashes, besides n times the memory overhead. Won't this hinder performance? How is the prototype benchmarked?

@crepererum
Contributor

So there will also be n times the hash probes and rehashes, besides n times the memory overhead. Won't this hinder performance?

These are good questions.

Hashing a u128 ID should be very cheap since we don't need all this secure stuff anymore. So it's probably close to upper_word(id) ^ lower_word(id). The random access pattern could be a problem, but I think this will be hard to avoid in general, at least if you want a maintainable, safe solution (in contrast to the untyped aligned row format that is currently used).
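
A literal reading of that expression, as a sketch (assuming the IDs are plain u128 values):

/// Fold a 128-bit ID down to a 64-bit hash by XOR-ing its upper and lower words.
fn hash_u128(id: u128) -> u64 {
    ((id >> 64) as u64) ^ (id as u64)
}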

How is the prototype benchmarked?

There is no prototype yet, but a PR must run the groupby benchmarks.


That said, I am open to other suggestions. The issue is that the current system needs a redesign:

  • It should be easy to add types and aggregations
  • It should ideally use a single aggregator type
  • It should work w/ memory accounting
  • Data accesses should be properly typed (in contrast to the aligned row format)
  • The solution should consider variable-length types. These cannot be added as an afterthought.

@yjshen
Member

yjshen commented Mar 6, 2023

While I agree that we may need to revisit the current Box<dyn Accumulator> approach when creating a fallback or default strategy, I do not believe we should combine all hash aggregate methods into one universal approach. Given that we are constructing an essential operator for a database execution engine, it is important to consider specialized implementations that offer performance benefits for handling different use cases.

I view row hash as a fast lane for certain queries rather than a dead end. By efficiently executing queries through row hash, we can avoid slower or more general approaches. Furthermore, I am open to exploring other specialized aggregate implementations that offer performance benefits to handle more use cases quickly.

For example, Spark provides multiple hash aggregate implementations, including vectorized hash, hash, and object hash. Similarly, ClickHouse offers different implementation options for its hash table, based on key types. Utilizing these specialized implementations can optimize query performance without relying on a one-size-fits-all approach.

It should be easy to add types and aggregations

Regarding the proposed criteria for a fallback strategy, I believe that while it should be easy to add types and aggregations, not all internal implementations should be treated equally, or else we risk the entire engine running at the speed of UDAFs.

It should work w/ memory accounting

While it would be beneficial for the fallback strategy to work with memory accounting, it is not strictly necessary. Users can take risks when defining UDAFs, or we could even risk the less-used built-in functions.

Data accesses should be properly typed (in contrast to the aligned row format)

I'm not convinced that data accesses need to be typed for aggregate implementations, which are entirely internal to database engines.

The solution should consider variable-length types. These cannot be added as an afterthought.

I understand that you were referring to the aligned row format, but it is possible to handle variable length fields using 'pointer swizzling.' Additionally, I don't believe that median is a performance-critical aggregate function that needs to be considered a first-class citizen for all aggregate strategies.

@tustvold
Contributor

tustvold commented Mar 6, 2023

What do people think about implementing the strategy described in #2723 (comment) and then revisiting the question of whether we keep additional fallback approaches, based on empirical evidence about their performance characteristics?

If this proves to outperform all the existing implementations, there is no harm in removing the other approaches; if it does not, we can keep them.

I view row hash as a fast lane for certain queries rather than a dead end.

I think it is important to highlight that the proposal in #2723 (comment) is still a row hash approach, just using a different row encoding. Early benchmarks show it to be significantly faster.

@alamb
Contributor Author

alamb commented Mar 6, 2023

I agree this should be benchmark results driven and the outcome should be as maintainable as possible. Given how performance critical this code is, I think performance is of higher priority than requiring strong typing (though of course it would be great if the resulting solution is strongly typed as well as high performance)

If we can't handle memory accounting for some reason, providing the user a choice between potentially unlimited memory use and faster performance seems acceptable. I would be surprised, however, if we can't work a memory-limit check in somehow and amortize its cost into the noise.

@yjshen
Member

yjshen commented Mar 7, 2023

I think it is important to highlight that the proposal in #2723 (comment) is still a row hash approach, just using a different row encoding. Early benchmarks show it to be significantly faster.

@tustvold There appears to be a discrepancy between your proposal and my understanding of the current situation. Can you clarify how I should interpret this?

Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from the [row_]hash.rs to the aggregators.

Do you know if it needs a hash table in each aggregator? What does the internal state mean? Can you elaborate?

If yes, what do you mean by "still a row hash approach"? Keys are rows, but aggregator states are not?
If not, why did @crepererum say "We're trading speed for slightly more memory usage here...", and why "in the per-aggregator hash tables by either interning..."?

If the proposal is to remove all (word-aligned/compact) row formats inside DataFusion, use the row format from arrow-rs, and achieve a significant performance improvement, I completely vote for the change.

@crepererum
Contributor

To clarify the situation around the row format:

@tustvold and I think:

  • group key: that it should be sufficient to use the arrow row format for the group key. This is already done and has proved to be a perf win.
  • group state: that the group state should be implemented by the aggregator (hence you get an internal hash table per aggregator) instead of using the compact DF row format. This is what we're discussing here, i.e. what keeps the state for a group (there are multiple groups and potentially multiple aggregators).

The current compact aggregation looks roughly like the following:

  • you have two dyn RowAccumulator and the untyped aggregation_buffer
  • first you calculate the group key based on the arrow row format, then you reshuffle the input so that groups are put together (sort by group key)
  • then you feed every group record batch and a view into the untyped buffer into RowAccumulator::update_batch (this is a dyn dispatch per group per accumulator)

Our proposal would be:

  • calculate group key
  • feed input columns + group key cols into Accumulator. This is one dyn dispatch per accumulator

Now in our proposal you CAN still use reshuffle + flat state buffer, but in many cases it might be cheaper to just iterate over the input and do the aggregation directly (reshuffle VS hash multiple times).

So technically NOTHING would change when this proposal is implemented, just that the aggregator operator no longer needs to care about "is this a compact aggregator or a non-compact one" and that the buffers are owned by the aggregators instead of flying around. However the change provides a base to simplify aggregators or to make them even faster w/ more hacks (if we want).

@yjshen
Member

yjshen commented Mar 7, 2023

So technically NOTHING would change when this proposal is implemented

I take it differently here; changing one single hash table to per-aggregator hash tables is a fundamental change. After this change, we will theoretically:

  1. do more hash table operations (hash, matching, resize&rehash when the threshold is reached)
  2. introduce much more random memory access during state update (no matter whether the state is the current word-aligned row or changed to using the unified row in arrow-rs)
    • for a contiguous row state in the current approach, we can load sequential words of it into cache lines, update all fields at once, and proceed to the next group-by key.
    • With the per-aggregator state, the cache line will always be filled with adjacent but irrelevant state data.

If the current inefficiency comes from too much dyn dispatch, I would like to try JIT.

@tustvold
Contributor

tustvold commented Mar 7, 2023

introduce much more random memory access during state update

The cache locality is actually likely better as the aggregated values can be stored inline within the hash table as they are known at compile time. Similarly the row format stores consecutive rows contiguously in memory, and so you have very good locality from that perspective as well.
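
As a sketch of "stored inline within the hash table" (simplified types, not the actual implementation): because the state layout is known at compile time, it can live directly in the table entry rather than behind a pointer, so probing and updating a group touch one contiguous allocation:

use std::collections::HashMap;

/// Per-group state for an AVG aggregate; stored by value inside the map entry.
#[derive(Default, Clone, Copy)]
struct AvgState {
    count: u64,
    sum: f64,
}

fn update(states: &mut HashMap<u64, AvgState>, group_id: u64, value: f64) {
    let state = states.entry(group_id).or_default(); // no Box, no dyn: state bytes are inline
    state.count += 1;
    state.sum += value;
}

fn main() {
    let mut states = HashMap::new();
    update(&mut states, 7, 2.0);
    update(&mut states, 7, 4.0);
    assert_eq!(states[&7].count, 2);
    assert_eq!(states[&7].sum, 6.0);
}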

I would like to try JIT

I think if we can get competitive performance without having to reach for a JIT, that is a massive win for maintainability. I originally reached for a JIT when creating the row format, and came to the conclusion that it is a huge additional complexity that needs to yield order-of-magnitude performance improvements to justify itself. In the case of the row format, the JIT approach was actually slower than a correctly vectorised version. That's not to say we couldn't use a JIT, just that it should definitely not be the first thing we try.

is a fundamental change

I see no reason to be fundamentalist here; we can support multiple approaches so long as they provide benefits to certain workloads. I suggest we move ahead with a POC, unless you object, and get some real performance data.

@yjshen
Member

yjshen commented Mar 7, 2023

Thank you @tustvold and @crepererum for your patient and detailed explanation.

aggregated values can be stored inline within the hash table

I understand your main idea now and look forward to learning more about it.

I agree that JIT is complex, and we should definitely try the low-hanging fruit first.

we can support multiple approaches so long as they provide benefits to certain workloads.

I agree to add this proposal as an alternative code path for aggregation. It would be great to have performance data in the future.

@alamb
Contributor Author

alamb commented Mar 7, 2023

It is great to see the discussion on this ticket -- it is a really good read.

@sunchao
Member

sunchao commented Mar 8, 2023

I'm actually working on a POC to improve hash aggregation performance, following a very similar approach. The only difference is that I'm not using Rows in the update_batch API, but rather the row format defined in DF: it seems the Rows format in arrow-rs incurs extra cost because it is designed for sorting and requires order preservation, and the cost is especially high for dictionary-encoded arrays.

The approach requires quite a few API changes. I was able to see a big improvement for simple cases at least - haven't done comprehensive benchmarks.

@tustvold
Contributor

tustvold commented Mar 8, 2023

the Rows format in arrow-rs incurs extra cost because it is designed for sorting and requires order preservation

Have you confirmed this? In my benchmarks I've found it significantly faster to encode than the DF row format. The order-preserving part is basically free for most types; it is just a couple of bit twiddles.
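
For example, the "bit twiddle" for a signed integer is just flipping the sign bit and storing big-endian, so that unsigned byte-wise comparison of the encoding matches the numeric order (a sketch of the general technique, not the exact arrow-rs code):

/// Order-preserving encoding of an i32: flip the sign bit, store big-endian.
fn encode_i32(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

fn main() {
    assert!(encode_i32(-5) < encode_i32(3));
    assert!(encode_i32(3) < encode_i32(42));
}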

@sunchao
Member

sunchao commented Mar 8, 2023

For dictionary arrays at least, it stands out in the profiling result:

[profiling screenshot]

I feel that for aggregate buffers it's just simpler to use the native format of the data types (with the null bits kept separately).

@tustvold
Contributor

tustvold commented Mar 8, 2023

For dictionary arrays at least

Aah, yeah, dictionaries are expensive as the converter has to compute a combined dictionary. I'm curious what the DF format is doing here -- is it just hydrating all the values?

@sunchao
Member

sunchao commented Mar 8, 2023

I don't think the DF format considers dictionaries 😂. In the POC, for group-by columns we basically "unpack" the dictionary arrays into plain rows while doing the columnar-to-row conversion. For aggregate columns, we initialize the buffer (using the DF format) based on the schema of the aggregates. If a dictionary type shows up in any of the aggregates, we remove it and replace it with its value type.

@tustvold
Contributor

tustvold commented Mar 8, 2023

Hmm... Perhaps I could add an option to RowConverter to hydrate dictionaries to allow trading off space in this way 🤔

The main advantage of the arrow format, aside from reducing the number of formats we need to maintain, is that aggregators can easily spill sorted state, which can then be cheaply recombined.

Either way, very cool to see this operator getting some love, and encouraging that we're thinking along the same lines 👍

@ozankabak
Contributor

The main advantage of the arrow format, aside from reducing the number of formats we need to maintain, is that aggregators can easily spill sorted state, which can then be cheaply recombined.

Agreed 👍

Hmm... Perhaps I could add an option to RowConverter to hydrate dictionaries to allow trading off space in this way 🤔

This sounds reasonable (at least at first thought)

@sunchao
Member

sunchao commented Jun 29, 2023

I did some further experiment in the bucketing branch to see if this is still the case today. This is very close to the description in the above MonetDB paper and written about here https://www.cockroachlabs.com/blog/vectorized-hash-joiner/ but doesn't really improve join performance, even when creating buckets to avoid collisions

@Dandandan does DF use SIMD in the hot paths of hash join? From my experience, LLVM auto-vectorization is pretty fragile and oftentimes doesn't get triggered if the logic is a bit complex. I think even create_hashes in aggregation doesn't use SIMD (I could be wrong there).

@Dandandan
Contributor

I did some further experiment in the bucketing branch to see if this is still the case today. This is very close to the description in the above MonetDB paper and written about here https://www.cockroachlabs.com/blog/vectorized-hash-joiner/ but doesn't really improve join performance, even when creating buckets to avoid collisions

@Dandandan does DF use SIMD in the hot paths of hash join? From my experience, LLVM auto-vectorization is pretty fragile and oftentimes doesn't get triggered if the logic is a bit complex. I think even create_hashes in aggregation doesn't use SIMD (I could be wrong there).

@sunchao TBH I am not sure whether everything generates optimal SIMD instructions. However, in the hash join, vectorization of create_hashes and vectorized equality checking (using plain arrow kernels) were definitely a big performance improvement compared to the earlier scalar-based approaches; this would probably be the case even without much use of SIMD. There are probably some possible improvements here and there wrt. the generated code. create_hashes uses ahash internally, which is one of the faster algorithms -- the hashing itself is usually not very expensive nowadays compared to other operations, though it used to be. It just does the hashing + rehashing column-wise like in other systems.

The above experiment is only about converting from a RawTable<(u64, u64)> to storing buckets in a Vec<u64> and having collision checking handled at the probing phase, as discussed in the MonetDB paper. Maybe I'm a bit wrong in my implementation (and the Photon whitepaper unfortunately doesn't include enough details), or something else could be optimized to make it faster. Or it may turn out that keeping a plain hashtable for the values only is not so bad after all(?) At least with the current approach I'm getting mixed results.

@sunchao
Member

sunchao commented Jun 29, 2023

@Dandandan thanks, I agree that calculating the hashes itself is probably not so critical to performance compared to other hash table operations. We are doing some experiments to switch to a vectorized hash table and see if it can deliver better performance. So far, without SIMD, it's roughly the same as the hashbrown table used by DF, and I'm really hoping that vectorizing each of the steps can get us to the next level. Will share the results once I have some numbers.

@Dandandan
Contributor

Cool @sunchao, very interested in what comes out of those experiments :)

@mingmwang
Contributor

mingmwang commented Jun 30, 2023

@alamb @tustvold @Dandandan

I did another POC based on the changes in #6657. The basic idea is to reduce the memory size of GroupState and avoid using Arc, Box, Vec, dyn Trait, etc. And the result is very exciting!!!
Compared to the main branch, there is about 50% improvement, and for the high cardinality aggregation itself, the improvement is about 100%.

Test result:

Q17

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 10, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false }
(each iteration also printed debug output: null_width 2, values_width 24)
Query 17 iteration 0 took 818.2 ms and returned 1 rows
Query 17 iteration 1 took 774.3 ms and returned 1 rows
Query 17 iteration 2 took 765.0 ms and returned 1 rows
Query 17 iteration 3 took 772.8 ms and returned 1 rows
Query 17 iteration 4 took 770.9 ms and returned 1 rows
Query 17 iteration 5 took 765.2 ms and returned 1 rows
Query 17 iteration 6 took 765.9 ms and returned 1 rows
Query 17 iteration 7 took 760.3 ms and returned 1 rows
Query 17 iteration 8 took 758.3 ms and returned 1 rows
Query 17 iteration 9 took 763.8 ms and returned 1 rows
Query 17 avg time: 771.46 ms

Q18

(each iteration also printed debug output: null_width 2, values_width 24)
Query 18 iteration 4 took 768.1 ms and returned 57 rows
Query 18 iteration 5 took 776.4 ms and returned 57 rows
Query 18 iteration 6 took 767.0 ms and returned 57 rows
Query 18 iteration 7 took 775.9 ms and returned 57 rows
Query 18 iteration 8 took 773.7 ms and returned 57 rows
Query 18 iteration 9 took 771.2 ms and returned 57 rows
Query 18 avg time: 778.98 ms

/// for non fixed length group keys
pub(crate) struct NonFixedSizeGroupState {
    /// Group data
    pub group_data: Vec<u8>,
    /// Accumulator data
    pub acc_data: Vec<u8>,
}

impl NonFixedSizeGroupState {
    #[inline(always)]
    fn group_data(&self) -> &[u8] {
        &self.group_data
    }

    #[inline(always)]
    fn agg_data(&self) -> &[u8] {
        &self.acc_data
    }
}

/// for fixed length group keys
pub(crate) struct FixedSizeGroupState {
    /// Group data and Accumulator state data, stored sequentially
    pub group_states: Vec<u8>,
}

impl FixedSizeGroupState {
    #[inline(always)]
    fn group_data(&self, data_width: usize) -> &[u8] {
        &self.group_states[0..data_width]
    }

    #[inline(always)]
    fn agg_data(&self, data_width: usize) -> &[u8] {
        &self.group_states[data_width..]
    }
}


fn update_one_accumulator_with_native_value<T1>(
        &mut self,
        groups_addresses: &[usize],
        agg_input_array: &T1,
        acc_idx: usize,
        filter_bool_array: &[Option<&BooleanArray>],
        row_layout: Arc<RowLayout>,
    ) -> Result<()>
    where
        T1: ArrowArrayReader,
    {
        let acc = &self.row_accumulators[acc_idx];
        let filter_array = &filter_bool_array[acc_idx];
        let mut state_accessor = RowAccessor::new_from_layout(row_layout);
        if filter_array.is_none() && agg_input_array.null_count() == 0 {
            for idx in 0..groups_addresses.len() {
                unsafe {
                    let group_state_ptr = &mut *(&self.aggr_state.group_states
                        [groups_addresses[idx]]
                        as *const FixedSizeGroupState
                        as *mut FixedSizeGroupState);
                    state_accessor.point_to(
                        0,
                        group_state_ptr.group_states[self.data_part_width..].as_mut(),
                    );
                    acc.update_value::<T1::Item>(
                        Some(agg_input_array.value_at_unchecked(idx)),
                        &mut state_accessor,
                    );
                }
            }
        } else {
            for idx in 0..groups_addresses.len() {
                unsafe {
                    let group_state_ptr = &mut *(&self.aggr_state.group_states
                        [groups_addresses[idx]]
                        as *const FixedSizeGroupState
                        as *mut FixedSizeGroupState);
                    state_accessor.point_to(
                        0,
                        group_state_ptr.group_states[self.data_part_width..].as_mut(),
                    );
                    let value = col_to_value(agg_input_array, filter_array, idx);
                    acc.update_value::<T1::Item>(value, &mut state_accessor);
                }
            }
        }

        Ok(())
    }

And now the update-group-stats method (hash, comparison, and matching related operations) becomes the bottleneck:

Total evaluate agg input elapsed: 185
Total update group stats elapsed: 290376
Total update accumulators elapsed: 61192
Query 17 iteration 9 took 776.5 ms and returned 1 rows
Query 17 avg time: 776.81 ms

@mingmwang
Contributor

Based on the test result, I would like to do another POC and implement a new Accumulator framework. The Accumulator framework will follow your suggestions in the new Accumulator API proposal and will combine the normal Accumulator and the RowAccumulator into a unified Accumulator framework.

  1. Unify the Accumulator framework, remove the current RowAccumulator API.
  2. A new RowLayout to combine the grouping data and accumulator state together
  3. Make the new Accumulator state generic.
  4. There will be one NewAccumulator instance per aggregate in the query (NOT one per group)
  5. NewAccumulator API has update_batch that is invoked once per input batch, per aggregate (NOT PER GROUP) -- see the sketch below.
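
A rough sketch of what items 4 and 5 could look like (simplified scalar types and hypothetical trait names, not the actual DataFusion API): one accumulator instance per aggregate expression, whose update_batch receives the group index of every input row so all group states live in one flat buffer instead of one allocation per group:

/// Hypothetical per-aggregate accumulator: one instance per aggregate in the
/// query, with update_batch invoked once per input batch (not per group).
trait BatchAccumulator {
    fn update_batch(&mut self, values: &[f64], group_indices: &[usize], total_num_groups: usize);
}

/// SUM example: all group states kept in one flat Vec indexed by group id.
struct SumBatchAccumulator {
    sums: Vec<f64>,
}

impl BatchAccumulator for SumBatchAccumulator {
    fn update_batch(&mut self, values: &[f64], group_indices: &[usize], total_num_groups: usize) {
        self.sums.resize(total_num_groups, 0.0);
        for (value, &group) in values.iter().zip(group_indices) {
            self.sums[group] += value;
        }
    }
}

fn main() {
    let mut acc = SumBatchAccumulator { sums: Vec::new() };
    // rows 0..4 belong to groups [0, 1, 0, 1]; two groups in total
    acc.update_batch(&[1.0, 10.0, 2.0, 20.0], &[0, 1, 0, 1], 2);
    assert_eq!(acc.sums, vec![3.0, 30.0]);
}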

@alamb
Contributor Author

alamb commented Jun 30, 2023

Based on the test result, I would like to do another POC and implement a new Accumulator framework. The Accumulator framework will follow your suggestions in the new Accumulator API proposal and will combine the normal Accumulator

Sounds like a great idea @mingmwang ❤️ -- note that I also got super excited about this project and am in the middle of implementing a POC for the proposed approach -- you can see the structure I have so far in #6800. I am hoping to have some numbers ready to report today.

If you also do a POC, that will be great, and we can compare the results and pick the one that shows the greatest promise.

@alamb
Contributor Author

alamb commented Jun 30, 2023

@mingmwang @Dandandan and @tustvold (and everyone else following along) here is a POC #6800

It follows a similar pattern to the one you describe in #4973 (comment), and it also makes q17 go more than 2x faster (and has no unsafe code or row format)!

This branch: Query 17 avg time: 766.31 ms
main: Query 17 avg time: 1789.73 ms

I would love feedback and thoughts on the approach

@jhorstmann
Contributor

ClickBench query Q32 is also a very good benchmark of high-cardinality grouping. It's also the only query that did not finish successfully when the benchmark results were last updated.

@liurenjie1024
Contributor

We have implemented a hash key to further reduce allocations for fixed-size group keys/state, inspired by ClickHouse:
https://github.com/risingwavelabs/risingwave/blob/main/src/common/src/hash/key.rs

cc @mingmwang
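
The gist of that technique, as a simplified sketch (not the linked RisingWave code): when all group-by columns are small fixed-size types, pack them into a single primitive integer and use that directly as the hash-table key, avoiding any per-key allocation:

use std::collections::HashMap;

/// Pack two small fixed-size key columns into one u64 used directly as the map key.
fn pack_key(a: u32, b: u16) -> u64 {
    ((a as u64) << 16) | b as u64
}

fn main() {
    let mut counts: HashMap<u64, u64> = HashMap::new();
    for (a, b) in [(1u32, 2u16), (1, 2), (3, 4)] {
        *counts.entry(pack_key(a, b)).or_insert(0) += 1;
    }
    assert_eq!(counts[&pack_key(1, 2)], 2);
}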

@vincev
Contributor

vincev commented Jul 3, 2023

I am adding this comment here instead of opening a new issue as it looks like the above PRs may fix/improve this, but please let me know if you would like a new issue.

I am experimenting with using DataFusion instead of Polars as the query engine for dply-rs, and I have noticed that group-by median is more than ~20x slower (I ran these a few times and the timing is consistent):

The Polars version:

$ time ./target/release/polars-median
shape: (6, 3)
┌──────────────┬──────────────┬──────────┐
│ payment_type ┆ total_amount ┆ n        │
│ ---          ┆ ---          ┆ ---      │
│ i64          ┆ f64          ┆ u32      │
╞══════════════╪══════════════╪══════════╡
│ 5            ┆ 5.275        ┆ 6        │
│ 3            ┆ 7.7          ┆ 194323   │
│ 4            ┆ -6.8         ┆ 244364   │
│ 0            ┆ 23.0         ┆ 1368303  │
│ 2            ┆ 13.3         ┆ 7763339  │
│ 1            ┆ 16.56        ┆ 30085763 │
└──────────────┴──────────────┴──────────┘

real    0m0.551s
user    0m1.654s
sys     0m0.253s

The DataFusion version:

$ time ./target/release/datafusion-median
+--------------+--------------+----------+
| payment_type | total_amount | n        |
+--------------+--------------+----------+
| 5            | 5.275        | 6        |
| 3            | 7.7          | 194323   |
| 4            | -6.8         | 244364   |
| 0            | 23.0         | 1368303  |
| 2            | 13.3         | 7763339  |
| 1            | 16.56        | 30085763 |
+--------------+--------------+----------+

real    0m22.681s
user    1m54.036s
sys     0m3.001s

This is using one year of data from the nyctaxi dataset.

The DataFusion code:

use anyhow::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet("./data/*.parquet", Default::default())
        .await?;
    df.aggregate(
        vec![col("payment_type")],
        vec![
            median(col("total_amount")).alias("total_amount"),
            count(lit(1)).alias("n"),
        ],
    )?
    .sort(vec![col("n").sort(true, false)])?
    .show()
    .await?;
    Ok(())
}

The Polars code:

use anyhow::Result;
use polars::prelude::*;

fn main() -> Result<()> {
    let lazy = LazyFrame::scan_parquet("./data/*.parquet", ScanArgsParquet::default())?;
    let df = lazy
        .groupby([col("payment_type")])
        .agg([col("total_amount").median(), col("total_amount").count().alias("n")])
        .sort("n", Default::default())
        .collect()?;
    println!("{}", df);

    Ok(())
}

@alamb
Contributor Author

alamb commented Jul 3, 2023

I am adding this comment here instead of opening a new issue as it looks like the above PRs may fix/improve this, but please let me know if you would like a new issue.

@vincev -- thank you. Yes, I expect DataFusion will get significantly faster with the new grouping. However, the overall performance of the median aggregator will still have room to improve (someone will need to write a specialized vectorized implementation of median).

@vincev
Contributor

vincev commented Jul 4, 2023

@alamb I spent a bit of time looking at the median aggregator and was able to reduce the time from ~22 secs to ~2 secs by reducing the number of allocations:

$ time ./target/release/datafusion-median
+--------------+--------------+----------+
| payment_type | total_amount | n        |
+--------------+--------------+----------+
| 5            | 5.275        | 6        |
| 3            | 7.7          | 194323   |
| 4            | -6.8         | 244364   |
| 0            | 23.0         | 1368303  |
| 2            | 13.3         | 7763339  |
| 1            | 16.56        | 30085763 |
+--------------+--------------+----------+

real    0m2.514s
user    0m4.725s
sys     0m1.765s

I have created #6837.
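
For context on what "reducing the number of allocations" can mean for a median accumulator, here is a rough sketch of the general pattern (an illustration only, not necessarily what #6837 does): buffer the raw native values in one growable buffer and sort once at the end, instead of allocating a boxed scalar per input row:

/// Sketch: buffer raw f64 values and compute the median once, at the end.
struct MedianAccumulator {
    values: Vec<f64>, // one growable buffer, no per-row allocations
}

impl MedianAccumulator {
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }

    /// Takes the upper-middle element for even counts, for simplicity.
    fn evaluate(mut self) -> Option<f64> {
        if self.values.is_empty() {
            return None;
        }
        self.values.sort_by(|a, b| a.partial_cmp(b).unwrap());
        Some(self.values[self.values.len() / 2])
    }
}

fn main() {
    let mut acc = MedianAccumulator { values: Vec::new() };
    acc.update_batch(&[3.0, 1.0, 2.0]);
    assert_eq!(acc.evaluate(), Some(2.0));
}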

@mingmwang
Contributor

mingmwang commented Jul 5, 2023

@mingmwang @Dandandan and @tustvold (and everyone else following along) here is a POC #6800

It follows a similar pattern to the one you describe in #4973 (comment), and it also makes q17 go more than 2x faster (and has no unsafe code or row format)!

This branch: Query 17 avg time: 766.31 ms
main: Query 17 avg time: 1789.73 ms

I would love feedback and thoughts on the approach

@alamb
I like your approach!! The POC result is very promising, and the performance is better than my best version.
