[EPIC] (Even More) Grouping / Group By / Aggregation Performance #7000

Open
10 of 17 tasks
Tracked by #11679
alamb opened this issue Jul 17, 2023 · 10 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jul 17, 2023

Is your feature request related to a problem or challenge?

Aggregation is a key operation of analytic engines. DataFusion has made great progress recently (e.g. #4973 and #6889).

This Epic gathers other potential ways we can improve the performance of aggregation.

Core Hash Grouping Algorithm:

Specialized Aggregators:

New features:

Improved partitioning:

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@alamb alamb added the enhancement New feature or request label Jul 17, 2023
@alamb alamb changed the title [EPIC] (Even More) Aggregation Performance [EPIC] (Even More) Grouping / Aggregation Performance Jan 8, 2024
@alamb alamb changed the title [EPIC] (Even More) Grouping / Aggregation Performance [EPIC] (Even More) Grouping / Group By / Aggregation Performance Feb 29, 2024
@karlovnv

karlovnv commented May 4, 2024

Hi! There is great work being done here!

I ran into an issue with CoalesceBatches: it seems there is a performance killer somewhere in CoalesceBatchesStream.
It spends too much time in arrow_select::concat::concat (especially in arrow_data::transform::variable_size::build_extend::_{{closure}}). I think the problem is the repeated expansion of the MutableArrayData (or non-vectorized copying).

In the query:

```sql
select count(*)
  from SomeTable as a
  join AnotherTable as b
    on a.c1 = b.c1
   and a.c2 = b.c2
   and a.c3 = b.c3
   and a.c4 = b.c4
```

SomeTable has ~200M rows and AnotherTable has ~15M rows.

17% of all execution time is spent in CoalesceBatchesStream:

```text
coalesce_batches::CoalesceBatchesStream::poll_next (55,950 samples, 17.03%)
|--arrow_select::concat::concat_batches (36,618 samples, 11.14%)
    |--arrow_select::concat::concat (30,947 samples, 9.42%)
```

(flamegraph screenshot)
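For reference, here is a minimal standalone sketch of the operation that dominates this trace: arrow's `concat_batches` copying many small batches into one larger batch. The schema, batch count, and sizes are made up for illustration; this is not DataFusion's actual CoalesceBatchesStream code.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, false)]));

    // Many small batches, e.g. the heavily filtered probe-side output of a join
    let small_batches: Vec<RecordBatch> = (0..1_000)
        .map(|i| {
            let col: ArrayRef = Arc::new(Int64Array::from(vec![i as i64; 8]));
            RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
        })
        .collect();

    // concat_batches copies every value into freshly allocated buffers; this
    // copy is the arrow_select::concat work that shows up in the profile above
    let coalesced = concat_batches(&schema, &small_batches)?;
    assert_eq!(coalesced.num_rows(), 8_000);
    Ok(())
}
```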

@karlovnv

karlovnv commented May 4, 2024

Another related issue is the performance of the RowConverter used for grouping.

More than 75% of GroupedHashAggregateStream's work is converting the composite aggregation key to rows, and approximately 50% of it is encoding (variable::encode):

```text
physical_plan::aggregates::row_hash::GroupedHashAggregateStream::poll_next (17,163 samples, 80.95%)
|-- arrow_row::RowConverter::convert_columns (10,684 samples, 50.39%)
     |--arrow_row::row_lengths (2,080 samples, 9.81%)
     |--arrow_row::variable::encode (7,267 samples, 34.28%)
```

(flamegraph screenshot)

The query is:

```sql
SELECT count(*), col1, col2, col3, col4
FROM SomeTable
GROUP BY col1, col2, col3, col4
ORDER BY col1, col2, col3, col4
```

SomeTable has ~150M rows.

I haven't figured out the root cause of the problem yet.
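For reference, a minimal sketch of the arrow `RowConverter` path that appears in the profile (column types and values are made up; this is not the aggregation code itself): each composite group key is serialized into a comparable byte row, and variable-length columns go through `variable::encode`.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Composite group-by key, e.g. (col1 Utf8, col2 Int64)
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Utf8),
        SortField::new(DataType::Int64),
    ])?;

    let col1: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));
    let col2: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 1]));

    // Every row is encoded into an owned, comparable byte string; for Utf8
    // columns this is the arrow_row::variable::encode work seen above
    let rows = converter.convert_columns(&[col1, col2])?;
    assert!(rows.row(0) == rows.row(2)); // equal keys encode to equal bytes
    Ok(())
}
```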

@alamb
Contributor Author

alamb commented May 5, 2024

Thanks for these profiles @karlovnv

> I ran into an issue with CoalesceBatches: it seems there is a performance killer somewhere in CoalesceBatchesStream.

Looking at the trace in #7000 (comment): while I agree that CoalesceBatchesStream is where the time is spent, I believe it is effectively part of the HashJoin.

Specifically, the HashJoin is basically acting like a filter on the probe-side input (and thus may emit many small batches where most rows were filtered out). The CoalesceBatchesStream then copies these small batches back together into bigger batches.

Thus, in my opinion, the way to speed up this query is not to look at CoalesceBatchesStream itself but instead at HashJoin -- specifically, maybe we could improve the code (especially after @korowa's work to encapsulate the output generation) so that the join itself handles creating the large output batches.

@alamb
Contributor Author

alamb commented May 5, 2024

> Another related issue is the performance of the RowConverter used for grouping.

Similarly, while most of the time is being spent in RowConverter, I think it would be hard to improve the performance of the RowConverter itself, as it is already quite optimized.

Instead, I think the key to improving a query such as the one in #7000 (comment) is to stop using the RowConverter (at least as much). Here is one idea for doing so: #9403 (I still harbor dreams of working on this sometime).

@karlovnv

karlovnv commented May 5, 2024

> Looking at the trace in

@alamb I'd like to mention that extending the mutable batch takes a lot of time (MutableArrayData::Extend, utils::extend_offsets), along with the related allocator work.

I suspect it would be much better to preallocate a bigger arrow buffer up front instead of extending it in small portions, and I believe that would have a noticeable effect (a rough sketch of the idea is below).
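To illustrate the idea, here is a rough sketch (with made-up sizes, not a patch to CoalesceBatchesStream): passing the full output capacity when constructing `MutableArrayData` means the destination buffers are sized once, rather than growing while each small batch is appended.

```rust
use arrow::array::{make_array, Array, ArrayData, Int64Array, MutableArrayData};

fn main() {
    // Many small filtered arrays that need to be coalesced into one array
    let smalls: Vec<Int64Array> = (0..100)
        .map(|i| Int64Array::from(vec![i as i64; 16]))
        .collect();
    let datas: Vec<ArrayData> = smalls.iter().map(|a| a.to_data()).collect();

    // Reserve the full output capacity up front so the destination buffers
    // are not repeatedly reallocated inside MutableArrayData::extend
    let total: usize = smalls.iter().map(|a| a.len()).sum();
    let mut mutable = MutableArrayData::new(datas.iter().collect(), false, total);
    for (i, a) in smalls.iter().enumerate() {
        mutable.extend(i, 0, a.len());
    }

    let coalesced = make_array(mutable.freeze());
    assert_eq!(coalesced.len(), total);
}
```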

Also I noticed that ~18% was spent in asm_exc_page_fault, which is probably an effect of transparent huge pages being enabled (which is bad for database workloads). I will investigate more on that and post some conclusions later.

@alamb
Contributor Author

alamb commented May 5, 2024

> Looking at the trace in
>
> @alamb I'd like to mention that extending the mutable batch takes a lot of time (MutableArrayData::Extend, utils::extend_offsets), along with the related allocator work.

I think those particular functions are the ones that actually copy data, so I am not sure how much more they can be optimized.

> I suspect it would be much better to preallocate a bigger arrow buffer up front instead of extending it in small portions, and I believe that would have a noticeable effect.

I agree it may well

> Also I noticed that ~18% was spent in asm_exc_page_fault, which is probably an effect of transparent huge pages being enabled (which is bad for database workloads). I will investigate more on that and post some conclusions later.

👍

@karlovnv

karlovnv commented May 5, 2024

> Here is one idea for doing so: #9403

I have been thinking about the join case where the left table may not be columnar.

For instance, consider the Events and Users tables: Events is a columnar table with 10^9 rows, while Users has only 10^6 rows.

In that case the Users table could be treated as a row-based table with a persistent (or in-memory only) hash (or B*-tree) index.

We could achieve a performance boost using several approaches:

1. Introduce a dictionary feature: treat the Users table as a dictionary (like in ClickHouse).

   > ClickHouse supports special functions for working with dictionaries that can be used in queries. It is easier and more efficient to use dictionaries with functions than a JOIN with reference tables.

   Now we are playing with UDFs like

   ```sql
   select timestamp,
     e.user_id,
     get_dict_utf8("Users", "Id", "Name", e.user_id) as user_name
   from events e
   ```

   But this is not a general solution, which leads us to the next approach.

2. Introduce a row-based table provider with its own special type of LookupRecordBatchStream.
   The main idea is to add the ability to provide data to HashJoinStream on request:
   get_items_from_table_by_ids(join_on: RecordBatch) → Result<SendableRecordBatchStream>
   (a rough sketch of such an interface follows after this list)

   This approach could also be useful in the future for joining columnar data with another relational source such as Postgres (by loading portions of the joined table's data on demand, by a list of ids).

3. Cache indices that have been built during JOIN execution, or use an external user-provided index.
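A rough Rust sketch of what approach 2 could look like as a trait; the trait name, method name, and use of `async_trait` are hypothetical, not an existing DataFusion API:

```rust
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Hypothetical lookup-style provider: given the join keys observed on the
/// probe side, return only the matching build-side rows instead of scanning
/// (and hashing) the whole table.
#[async_trait]
pub trait LookupTableProvider: Send + Sync {
    /// `join_on` carries one column per equi-join key; the provider answers
    /// with a stream of rows matching those keys, e.g. by consulting a
    /// persistent or in-memory index.
    async fn get_items_from_table_by_ids(
        &self,
        join_on: RecordBatch,
    ) -> Result<SendableRecordBatchStream>;
}
```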

@alamb
Contributor Author

alamb commented May 6, 2024

> Introduce a dictionary feature: treat the Users table as a dictionary (like in ClickHouse)

This is an excellent idea -- Arrow has something equivalent in DictionaryArray, which could minimize the copying required.

Something else that might help is to use StringViewArray when coalescing, which would avoid a copy. This might be quite effective if the next operation is GroupByHash or sort, where the data would be copied again 🤔 I think the next release of arrow-rs might have sufficient functionality to try it.

> Introduce a row-based table provider with its own special type of LookupRecordBatchStream

I wonder if we could combine this with something like #7955 🤔

@karlovnv

karlovnv commented May 6, 2024

> I wonder if we could combine this with something like #7955 🤔

It's quite a good idea!

But I think it is tricky to push the ON condition down. The main reason is that we only know the list of ids (in terms of the column index) at the JOIN stage, not when filtering and reading data from the source.

So the second approach (2, introduce a row-based table provider) is about adding the ability to get data directly from the hash stream by a list of ids, like so:

(diagram screenshot)

Or even better, to get only the offsets by ids (an arrow take index for the take_record_batch() kernel). This idea is very similar to indices in DuckDB.
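To make the "offsets by ids" idea concrete, here is a small sketch using arrow's `take` kernel (the table, offsets, and column-by-column gather are made up for illustration): once an index maps join ids to row positions, only those rows are gathered from the build side.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, StringArray, UInt32Array};
use arrow::compute::take;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build-side "Users" table kept as a plain batch
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let users = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef,
            Arc::new(StringArray::from(vec!["alice", "bob", "carol"])) as ArrayRef,
        ],
    )?;

    // Row offsets produced by some external index lookup (id -> row position)
    let offsets = UInt32Array::from(vec![2, 0, 2]);

    // take gathers only the requested rows; no hash table needs to be rebuilt
    let cols: Vec<ArrayRef> = users
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &offsets, None))
        .collect::<Result<_, _>>()?;
    let looked_up = RecordBatch::try_new(schema, cols)?;
    assert_eq!(looked_up.num_rows(), 3);
    Ok(())
}
```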

@karlovnv

karlovnv commented May 6, 2024

> DictionaryArray

DictionaryArray is something different. It is the best choice for low-cardinality columns (i.e. how to efficiently encode data in a single column to save space and speed up filtering).
ClickHouse offers a special DDL option for this -- LowCardinality.
UPD: https://clickhouse.com/docs/en/sql-reference/data-types/lowcardinality

> Changes the internal representation of other data types to be dictionary-encoded

But it would be great if we supported the arrow Dictionary-encoded type! We could also use a shared dictionary buffer for all the batches.

Since DictionaryArray has no index keyed by value, however, we cannot use it for fast O(1) lookups.
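For completeness, a tiny sketch of arrow's dictionary encoding (values are made up): the distinct strings are stored once and each row carries only an integer key, which is ideal for low-cardinality columns but, as noted, provides no value-to-row index for O(1) lookups.

```rust
use arrow::array::{Array, StringDictionaryBuilder};
use arrow::datatypes::Int32Type;

fn main() {
    // Low-cardinality column: many rows, few distinct values. Each distinct
    // string is stored once; rows only carry i32 keys into that dictionary.
    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    for user in ["alice", "bob", "alice", "alice", "bob"] {
        builder.append_value(user);
    }
    let dict = builder.finish();

    assert_eq!(dict.len(), 5);          // five rows
    assert_eq!(dict.values().len(), 2); // but only two distinct values stored
    // Lookup goes key -> value; there is no built-in value -> rows index,
    // so this alone does not replace a hash lookup for joins.
}
```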
