
RFC: Demonstrate new GroupHashAggregate stream approach (runs more than 2x faster!) #6800

Closed
wants to merge 98 commits

Conversation


@alamb alamb commented Jun 29, 2023

TLDR

This branch executes Q17 (which has a high cardinality grouping on a Decimal128) in less than half (44%) of the time taken by main. 🥳

Which issue does this PR close?

Related to #4973

This PR contains a technical spike / proof of concept showing that the hash aggregate approach described in #4973 (comment) will improve performance dramatically.

I do not intend to merge this PR as is; rather, if it proves promising, I will break it up and merge it incrementally into the existing code (steps TBD).

Rationale for this change

We want faster grouping behavior, especially when there are large numbers of distinct groups

What changes are included in this PR?

  • A new GroupedHashAggregateStream2 operator that implements vectorized / multi-group updates
  • A new GroupsAccumulator trait with a proposed vectorized API for managing and updating group state (see the sketch after this list)
  • A generic implementation of GroupsAccumulator for AVG for PrimitiveArray (including decimal)
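
To make the proposed API concrete, here is a minimal sketch of the general shape of such a trait (the method names and signatures are illustrative and may not match the branch exactly):

use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;

/// Illustrative sketch: a vectorized accumulator that updates the state of
/// many groups from a single input batch, rather than once per group
pub trait GroupsAccumulator: Send {
    /// Update the state of the groups listed in `group_indices` (one entry
    /// per input row in `values`), optionally filtered by `opt_filter`
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()>;

    /// Produce the final output, one value per group
    fn evaluate(&mut self) -> Result<ArrayRef>;

    /// Produce the intermediate aggregate state (e.g. sums and counts for
    /// AVG), one array per state field, one row per group
    fn state(&mut self) -> Result<Vec<ArrayRef>>;
}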

Stuff I plan to complete in this PR

  • Complete fuzz testing of accumulate function
  • Implement opt_filter in accumulate functions
  • An adapter that implements GroupsAccumulator in terms of Accumulator (for slower, but simpler accumulators)

I am very pleased :bowtie: with how the code looks

Things not done:

  1. Filtering (though I don't expect it would change performance at all without filters)
  2. Null handling for counts

Performance Results:

This branch runs Q17 in less than half (44%) of the time taken by main. 🥳

  • This branch: Query 17 avg time: 766.31 ms
  • main: Query 17 avg time: 1789.73 ms
Details

Correctness

Both main and this branch produce the same answer

    +-------------------+
    | avg_yearly        |
    +-------------------+
    | 348406.0542857143 |
    +-------------------+

This branch

Query 17 iteration 0 took 876.5 ms and returned 1 rows
Query 17 iteration 1 took 757.5 ms and returned 1 rows
Query 17 iteration 2 took 737.6 ms and returned 1 rows
Query 17 iteration 3 took 728.6 ms and returned 1 rows
Query 17 iteration 4 took 731.3 ms and returned 1 rows
Query 17 avg time: 766.31 ms

Main

Query 17 iteration 0 took 1794.5 ms and returned 1 rows
Query 17 iteration 1 took 1825.9 ms and returned 1 rows
Query 17 iteration 2 took 1799.1 ms and returned 1 rows
Query 17 iteration 3 took 1793.4 ms and returned 1 rows
Query 17 iteration 4 took 1735.7 ms and returned 1 rows
Query 17 avg time: 1789.73 ms


Methodology

Run this command

cargo run --profile release-nonlto --bin tpch -- benchmark datafusion --iterations 5 -m --format parquet -q 17 --path /Users/alamb/Software/arrow-datafusion/benchmarks/data/

Query:

select
        sum(l_extendedprice) / 7.0 as avg_yearly
from
    lineitem,
    part
where
        p_partkey = l_partkey
  and p_brand = 'Brand#23'
  and p_container = 'MED BOX'
  and l_quantity < (
    select
            0.2 * avg(l_quantity)
    from
        lineitem
    where
            l_partkey = p_partkey
    );

Here is the original plan:

[2023-06-29T13:26:41Z DEBUG datafusion::physical_planner] Optimized physical plan:
    ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]
      AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]
        CoalescePartitionsExec
          AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]
            ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice]
              CoalesceBatchesExec: target_batch_size=8192
                HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1
                  CoalesceBatchesExec: target_batch_size=8192
                    RepartitionExec: partitioning=Hash([p_partkey@2], 2), input_partitions=2
                      ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, p_partkey@3 as p_partkey]
                        CoalesceBatchesExec: target_batch_size=8192
                          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)]
                            CoalesceBatchesExec: target_batch_size=8192
                              RepartitionExec: partitioning=Hash([l_partkey@0], 2), input_partitions=2
                                MemoryExec: partitions=2, partition_sizes=[367, 366]
                            CoalesceBatchesExec: target_batch_size=8192
                              RepartitionExec: partitioning=Hash([p_partkey@0], 2), input_partitions=2
                                ProjectionExec: expr=[p_partkey@0 as p_partkey]
                                  CoalesceBatchesExec: target_batch_size=8192
                                    FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX
                                      MemoryExec: partitions=2, partition_sizes=[13, 12]
                  ProjectionExec: expr=[CAST(0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * AVG(lineitem.l_quantity), l_partkey@0 as l_partkey]
                    AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]   <-- want to use the new stream for here
                      CoalesceBatchesExec: target_batch_size=8192
                        RepartitionExec: partitioning=Hash([l_partkey@0], 2), input_partitions=2
                          AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
                            MemoryExec: partitions=2, partition_sizes=[367, 366]

Next Steps

Stuff I would do after the above is done:

Here is the list of RowAccumulators (aka accumulators that have
specialized implementations). I think Avg is the trickiest to
implement (and it is already done)

  • CountRowAccumulator
  • MaxRowAccumulator
  • MinRowAccumulator
  • AvgRowAccumulator
  • SumRowAccumulator
  • BitAndRowAccumulator
  • BitOrRowAccumulator
  • BitXorRowAccumulator
  • BoolAndRowAccumulator
  • BoolOrRowAccumulator

@alamb alamb changed the title to "(NOT READY FOR REVIEW YET) POC: Demonstrate new GroupHashAggregate stream approach" Jun 29, 2023
@github-actions github-actions bot added the core (Core DataFusion crate) label Jun 29, 2023

/// The actual group by values, stored in arrow Row format
/// the index of group_by_values is the index
/// https://github.com/apache/arrow-rs/issues/4466
group_by_values: Vec<OwnedRow>,

@Dandandan Dandandan Jun 29, 2023

This should probably be a buffer of some sort? OwnedRow has a copy of the RowConfig per value. If we want to keep using rows(?), something like the following would do:

pub struct AppendableRows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}

Contributor Author

Thanks @Dandandan -- that is an excellent point. That is what I was trying to get at with apache/arrow-rs#4466

Note that I don't think the formulation in this PR is any worse than what is on master (which also has an OwnedRow per group)

https://github.com/apache/arrow-datafusion/blob/e91af991c5ae4a6b4afab2cb1b0c9307a69e4046/datafusion/core/src/physical_plan/aggregates/utils.rs#L40

Contributor

Ah I saw you mentioned the need for it in the feature request

Contributor

And it's interesting that the current code already does this with an OwnedRow per group -- I didn't realize that

Contributor Author

(I am feeling very good about the ability to make the code faster 🚀 )

Contributor Author

(BTW @tustvold is being a hero. Here is a PR to help apache/arrow-rs#4470)

Contributor

I like this change. It is important to reduce the memory size of group rows/keys.
One further optimization: when the group keys are fixed length, we can avoid the offsets vec as well.

create_hashes(group_values, &self.random_state, &mut batch_hashes)?;

for (row, hash) in batch_hashes.into_iter().enumerate() {
    let entry = self.map.get_mut(hash, |(_hash, group_idx)| {

@Dandandan Dandandan Jun 29, 2023

I wonder if we could get this more in line with the hash join, with the following steps (a rough sketch follows the list):

  1. Create candidates (possible matches) based on hash-equality
  2. Compare keys (column-wise) in a vectorized fashion (take + eq + and)
  3. Filter candidates based on filter (filter).
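
A rough sketch of what steps 1-3 could look like with arrow kernels (illustrative only; it assumes arrow's take, eq_dyn and and kernels, and is not code from this branch):

use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array};
use arrow::compute::{and, eq_dyn, take};
use arrow::error::Result;

/// Illustrative: verify hash-equality candidates by comparing the group key
/// columns in a vectorized fashion, like the hash join probe side does
fn verify_candidates(
    stored_key_cols: &[ArrayRef],   // key columns of the groups seen so far
    probe_key_cols: &[ArrayRef],    // key columns of the incoming batch
    candidate_groups: &UInt32Array, // group indices whose hash matched
    candidate_rows: &UInt32Array,   // probe row indices whose hash matched
) -> Result<BooleanArray> {
    // step 1 already happened: candidates were produced from hash equality
    let mut matches = BooleanArray::from(vec![true; candidate_rows.len()]);
    for (stored, probe) in stored_key_cols.iter().zip(probe_key_cols) {
        // step 2: gather both sides (take) and compare element-wise (eq + and)
        let lhs = take(stored.as_ref(), candidate_groups, None)?;
        let rhs = take(probe.as_ref(), candidate_rows, None)?;
        matches = and(&matches, &eq_dyn(lhs.as_ref(), rhs.as_ref())?)?;
    }
    // step 3: `matches` can then be fed to the filter kernel to keep only the
    // candidates whose keys actually match
    Ok(matches)
}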

Contributor Author

That is interesting 🤔 This is basically what the existing grouping operator does. I'll try to check out the join code at some point and see if I can transfer any of the learnings over here.

Member

From our experience, convert_columns is also quite expensive. It may be worth considering comparing column by column directly, and only doing the row conversion when spilling is required.

Contributor

I wonder if we can special case single column grouping like we do for SortPreservingMerge, the row format is only really beneficial when dealing with multiple columns as it avoids per-field type dispatch.

FWIW row conversion should have similar performance to the take kernel, with the exception of dictionaries. I would be interested if this is not the case, as that would be a bug.

Contributor Author

I wonder if we can special case single column grouping like we do for SortPreservingMerge, the row format is only really beneficial when dealing with multiple columns as it avoids per-field type dispatch.

Yes, I think this would be an excellent idea.

Basically @sunchao I think we have seen that for single column sorting (in this case grouping) keeping the native representation is better than converting to row format. However, once there are multiple sort (or group) columns involved, the dynamic dispatch logic for comparisons quickly dominates the row conversion costs.

I am a bit concerned about "boiling the ocean" when improving grouping. Any work will take a significant amount of time, so keeping the scope down is important to make the change practical.

That being said, if we go with the formulation in this PR, we'll be in a much better place to try specializing group storage -- it may not be obvious, but the actual operator / stream code in this PR is quite a bit simpler than the existing row_hash even though it has all the same features. This difference is largely due to not tracking parallel sets of aggregators (row-based and Accumulator-based).

Contributor

100% agree, I think we should focus on getting a consistent accumulator representation and interface before undertaking additional optimisation work on the hash table machinery

Member

Yes, I totally agree with the approach. Getting the other changes ironed out is definitely more important for now.

Contributor

Also agree we should finish the other changes first as it will get too big otherwise 👍

I might do some experiments in the future with a similar approach as I mentioned above. I think the conversion might be relatively fast, but it will make other operations (e.g. equality) slower as it is not specialized on fixed size types and not as well vectorized.

Contributor

From our experience, convert_columns is also quite expensive. It may be worth considering comparing column by column directly, and only doing the row conversion when spilling is required.

I think the encoder implemented by @tustvold is very efficient. In the past I did some tests on this code path and it took almost no time.

@github-actions github-actions bot added the physical-expr (Physical Expressions) label Jun 29, 2023
@alamb alamb force-pushed the alamb/hash_agg_spike branch from 31335b4 to e02c35d June 30, 2023 13:46

use super::AggregateExec;

/// Grouping aggregate

Contributor Author

This code follows the basic structure of row_hash but the aggregate state management is different


match self.mode {
    AggregateMode::Partial | AggregateMode::Single => {
        acc.update_batch(

Contributor Author

Here is one key difference -- each accumulator is called once per input batch (not once per group)
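
As a toy illustration of the difference (not the code in this branch): a vectorized COUNT keeps one entry per group and is updated with a single call per input batch, for all groups at once.

/// Toy example: per-group counts updated once per batch rather than per group
struct VecCount {
    /// one count per group, indexed by group index
    counts: Vec<u64>,
}

impl VecCount {
    /// `group_indices[i]` is the group that row `i` of the batch belongs to
    fn update_batch(&mut self, group_indices: &[usize], total_num_groups: usize) {
        // make room for any groups first seen in this batch
        self.counts.resize(total_num_groups, 0);
        for &group_index in group_indices {
            self.counts[group_index] += 1;
        }
    }
}

fn main() {
    // one call covers rows belonging to three different groups
    let mut acc = VecCount { counts: vec![] };
    acc.update_batch(&[0, 1, 0, 2, 1], 3);
    assert_eq!(acc.counts, vec![2, 2, 1]);
}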

let avg_fn =
    move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);

Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(

Contributor Author

Here is a specialized accumulator -- it will be instantiated once per native type (or other type) we need to support, resulting in a specialized accumulator for each native type. 👨‍🍳 👌

Contributor

This also serves the purpose of allowing us to eventually deprecate the ScalarValue binary operations - #6842

// TODO combine the null mask from values and opt_filter
let valids = values.nulls();

// This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum

Contributor Author

This particular code is likely to be very common across most accumulators so I would hope to find some way to generalize it into its own function / macro
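
A simplified sketch of what such a shared helper could look like (hypothetical names, ignoring opt_filter, and without the bit-chunked null-mask iteration the arrow kernel uses):

use arrow_array::types::ArrowPrimitiveType;
use arrow_array::{Array, PrimitiveArray};

/// Hypothetical helper: call `value_fn(group_index, value)` for every non-null
/// input row, so each accumulator only supplies its per-value update
fn accumulate_valid_values<T, F>(
    group_indices: &[usize],
    values: &PrimitiveArray<T>,
    mut value_fn: F,
) where
    T: ArrowPrimitiveType,
    F: FnMut(usize, T::Native),
{
    if values.null_count() == 0 {
        // fast path: no validity checks needed
        for (&group_index, &value) in group_indices.iter().zip(values.values().iter()) {
            value_fn(group_index, value);
        }
    } else {
        // slow path: skip null input values
        for (i, &group_index) in group_indices.iter().enumerate() {
            if values.is_valid(i) {
                value_fn(group_index, values.value(i));
            }
        }
    }
}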

use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;

/// An implementation of GroupAccumulator is for a single aggregate

Contributor Author

Here is the new GroupsAccumulator trait that all accumulators would have to implement.

I would also plan to create a struct that implements this trait for aggregates based on Accumulators

struct GroupsAdapter {
  groups: Vec<Box<dyn Accumulator>>
}

impl GroupsAccumulator for GroupsAdapter {
...
}

So in that way we can start with simpler (but slower) Accumulator implementations for aggregates, and provide a fast GroupsAccumulator for the aggregates / types that need the specialization

@alamb alamb changed the title from "(NOT READY FOR REVIEW YET) POC: Demonstrate new GroupHashAggregate stream approach" to "Demonstrate new GroupHashAggregate stream approach (runs more than 2x faster!)" Jun 30, 2023
@alamb alamb changed the title from "Demonstrate new GroupHashAggregate stream approach (runs more than 2x faster!)" to "RFC: Demonstrate new GroupHashAggregate stream approach (runs more than 2x faster!)" Jun 30, 2023
@alamb alamb requested a review from mingmwang June 30, 2023 17:23

Dandandan commented Jul 2, 2023

I did some profiling on the current version on query 17: it seems that a portion (at least 10%, but possibly more) of the time is now spent around Row/OwnedRow -- it would be interesting to see how much it improves after using apache/arrow-rs#4470

@Dandandan

@alamb do you plan to continue this PR on your own, or would some form of assistance help? E.g. writing some of those accumulators?

group_indicies,
values,
opt_filter,
|group_index, _new_value| {

Contributor

I wonder if this compiles into the same code as with only iterating over group_indicies

@alamb alamb Jul 2, 2023

It would be super helpful if you could test that / figure out if it is worth specializing -- the original version didn't handle input nulls correctly


if values.null_count() == 0 {
    accumulate_all(
        group_indicies,

Contributor

Suggested change:
- group_indicies,
+ group_indices,

?

Contributor Author

🤦

git commit -a -m 'fix spelling of indices'
[alamb/hash_agg_spike d760a5f115] fix spelling of indices
 4 files changed, 24 insertions(+), 24 deletions(-)


alamb commented Jul 7, 2023

I just have the last two accumulators:

  • BoolAndRowAccumulator
  • BoolOrRowAccumulator

to complete, and then I think I'll be ready to create a PR for review

@github-actions github-actions bot added the logical-expr (Logical plan and expressions) label Jul 8, 2023
@Dandandan

Found time for a small optimization (to reuse the buffer to create the hashes).

@github-actions github-actions bot removed the logical-expr (Logical plan and expressions) label Jul 8, 2023
@@ -111,6 +111,8 @@ pub(crate) struct GroupedHashAggregateStream {
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
// buffer to be reused to store hashes
hashes_buffer: Vec<u64>,

Contributor Author

❤️ this is a good change -- thanks @Dandandan . Pretty soon there will be no allocations while processing each batch (aka the hot loop) 🥳 -- I think with #6888 we can get rid of the counts in the sum accumulator

Contributor Author

Note that this change was made to the existing row_hash (not the new one). I will port the change to the new one as part of #6904
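
For reference, the reuse pattern is roughly the following (a sketch only; create_hashes is DataFusion's existing hashing helper and its module path has moved between versions):

use ahash::RandomState;
use arrow_array::{Array, ArrayRef};
use datafusion_common::Result;
// assumed import path for this sketch; adjust to wherever create_hashes lives
use datafusion_physical_expr::hash_utils::create_hashes;

/// Sketch: hash the group-by columns of one batch into a caller-owned buffer,
/// so the per-batch hot loop performs no new allocation
fn hash_group_values(
    group_values: &[ArrayRef],
    random_state: &RandomState,
    hashes_buffer: &mut Vec<u64>,
) -> Result<()> {
    let num_rows = group_values[0].len();
    // reuse the allocation from previous batches
    hashes_buffer.clear();
    hashes_buffer.resize(num_rows, 0);
    // fill the buffer with one hash per input row
    create_hashes(group_values, random_state, hashes_buffer)?;
    Ok(())
}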


alamb commented Jul 8, 2023

Ok, here are some numbers (TPCH SF1). I am quite pleased

My next plan is to turn this into a PR

--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ alamb_hash_agg_spike ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  789.36ms │             768.82ms │     no change │
│ QQuery 2     │  292.62ms │             219.58ms │ +1.33x faster │
│ QQuery 3     │  408.23ms │             388.36ms │     no change │
│ QQuery 4     │  239.14ms │             236.48ms │     no change │
│ QQuery 5     │  512.51ms │             516.96ms │     no change │
│ QQuery 6     │  208.24ms │             211.47ms │     no change │
│ QQuery 7     │  869.70ms │             896.97ms │     no change │
│ QQuery 8     │  574.60ms │             591.00ms │     no change │
│ QQuery 9     │  893.77ms │             908.34ms │     no change │
│ QQuery 10    │  650.66ms │             621.45ms │     no change │
│ QQuery 11    │  204.09ms │             178.99ms │ +1.14x faster │
│ QQuery 12    │  334.17ms │             327.36ms │     no change │
│ QQuery 13    │  744.82ms │             634.29ms │ +1.17x faster │
│ QQuery 14    │  292.05ms │             281.81ms │     no change │
│ QQuery 15    │  247.06ms │             218.11ms │ +1.13x faster │
│ QQuery 16    │  247.45ms │             209.87ms │ +1.18x faster │
│ QQuery 17    │ 2534.68ms │            1135.75ms │ +2.23x faster │
│ QQuery 18    │ 2630.03ms │            1751.31ms │ +1.50x faster │
│ QQuery 19    │  521.75ms │             528.30ms │     no change │
│ QQuery 20    │  926.76ms │             440.71ms │ +2.10x faster │
│ QQuery 21    │ 1278.07ms │            1275.54ms │     no change │
│ QQuery 22    │  150.15ms │             150.67ms │     no change │
└──────────────┴───────────┴──────────────────────┴───────────────┘
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ alamb_hash_agg_spike ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  489.23ms │             455.08ms │ +1.08x faster │
│ QQuery 2     │  243.33ms │             134.34ms │ +1.81x faster │
│ QQuery 3     │  166.61ms │             158.30ms │     no change │
│ QQuery 4     │  112.69ms │             109.91ms │     no change │
│ QQuery 5     │  371.31ms │             367.26ms │     no change │
│ QQuery 6     │   38.85ms │              39.05ms │     no change │
│ QQuery 7     │  857.14ms │             848.70ms │     no change │
│ QQuery 8     │  228.76ms │             226.56ms │     no change │
│ QQuery 9     │  525.80ms │             507.89ms │     no change │
│ QQuery 10    │  322.86ms │             304.78ms │ +1.06x faster │
│ QQuery 11    │  185.13ms │             157.05ms │ +1.18x faster │
│ QQuery 12    │  158.53ms │             152.98ms │     no change │
│ QQuery 13    │  511.26ms │             254.26ms │ +2.01x faster │
│ QQuery 14    │   44.26ms │              43.50ms │     no change │
│ QQuery 15    │   75.39ms │              45.33ms │ +1.66x faster │
│ QQuery 16    │  196.56ms │             158.71ms │ +1.24x faster │
│ QQuery 17    │ 2260.88ms │             788.95ms │ +2.87x faster │
│ QQuery 18    │ 2375.63ms │            1416.96ms │ +1.68x faster │
│ QQuery 19    │  158.64ms │             150.11ms │ +1.06x faster │
│ QQuery 20    │  830.32ms │             305.56ms │ +2.72x faster │
│ QQuery 21    │  995.44ms │             978.06ms │     no change │
│ QQuery 22    │   84.62ms │              79.60ms │ +1.06x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘


alamb commented Jul 8, 2023

Tracking my plan in #6889


alamb commented Jul 8, 2023

I also tried some ClickBench queries and I got a similar speedup (3x) -- I am feeling good about this one

SELECT COUNT(DISTINCT "UserID") FROM 'hits.parquet';

Main:

1 row in set. Query took 8.231 seconds.

This branch

1 row in set. Query took 3.879 seconds.

🚀

@Dandandan

I also tried some ClickBench queries and I got a similar speedup (3x) -- I am feeling good about this one

Amazing 🚀 I think for this query we should also consider avoiding the conversion to the row-format as this likely will be one of the more expensive things now.


alamb commented Jul 9, 2023

Amazing 🚀 I think for this query we should also consider avoiding the conversion to the row-format as this likely will be one of the more expensive things now.

That is a good idea -- it worked well for sorting as well. I put a note on #6889 to track writing up a real ticket

counts: Vec<u64>,

/// Sums per group, stored as the native type
sums: Vec<T::Native>,

@yahoNanJing yahoNanJing Jul 11, 2023

Is it possible to combine the counts and sums into one property, like avg_states: Vec<(T::Native, u64)>? Since one sum and the related count are always used together, I think it's better to put them together for better cache locality.

@Dandandan Dandandan Jul 11, 2023

FYI @alamb sounds like a useful suggestion

Contributor Author

Is it possible to combine the counts and sums into one property, like avg_states: Vec<(T::Native, u64)>? Since one sum and the related count are always used together, I think it's better to put them together for better cache locality.

Thank you for the comment @yahoNanJing

The reason the sums and counts are stored separately is to minimize copying when forming the final output -- since the final output is columnar (two columns) keeping the data as two Vecs allows the final ArrayRefs to be created directly from that data.

It would be an interesting experiment to see if keeping them together and improving cache locality outweighed the extra copy.
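
To illustrate the point (a sketch, not the exact code in this branch): each Vec can become an Arrow array directly, reusing its allocation, so emitting the two state columns needs no per-row copying.

use std::sync::Arc;

use arrow_array::types::ArrowPrimitiveType;
use arrow_array::{ArrayRef, PrimitiveArray, UInt64Array};
use arrow_buffer::ScalarBuffer;

/// Sketch: build the AVG state columns (sums, counts) directly from the
/// accumulator's Vecs; `ScalarBuffer::from(Vec<_>)` reuses the allocation.
/// (Null handling and decimal precision/scale are omitted for brevity.)
fn avg_state_columns<T: ArrowPrimitiveType>(
    sums: Vec<T::Native>,
    counts: Vec<u64>,
) -> Vec<ArrayRef> {
    let sums: ArrayRef = Arc::new(PrimitiveArray::<T>::new(ScalarBuffer::from(sums), None));
    let counts: ArrayRef = Arc::new(UInt64Array::from(counts));
    vec![sums, counts]
}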

BTW if people are looking to optimize the inner loops more, I think removing the bounds checks with unsafe might also help (but I don't plan to pursue it until I find a need to optimize more).

So instead of

                let sum = &mut self.sums[group_index];
                *sum = sum.add_wrapping(new_value);

we could write

                unsafe {
                    let sum = self.sums.get_unchecked_mut(group_index);
                    *sum = sum.add_wrapping(new_value);
                }

@yahoNanJing yahoNanJing Jul 12, 2023

Is it possible to make a tuple (T::Native, u64) a primitive type on the arrow-rs side, so that we can create an array of tuples? Then we wouldn't need to return two arrays for the state()

Contributor Author

Ah -- I see what you are saying -- I think we could potentially use a StructArray for the state (which would be a single "column" in arrow) but the underlying storage is still two separate contiguous arrays.

Maybe we could use FixedSizeBinaryArray 🤔 and pack/unpack the tuples to the appropriate size

It would be an interesting experiment

Contributor

I'm afraid both StructArray and FixedSizeBinaryArray may have additional overhead.

If T::Native could be a tuple, then we could provide a new array type, say TupleArray, whose element type is a tuple (T::Native, T::Native). Such tuples could be arbitrarily nested, so this new TupleArray could cover any nested tuple case.

Contributor Author

It would definitely be a cool thing to try


alamb commented Jul 11, 2023

For anyone following along, I have created a proposed PR with these changes that is ready for review: #6904
