Use Arrow Row Format in SortExec to improve performance #5230

Closed
tustvold opened this issue Feb 9, 2023 · 54 comments · Fixed by #6163
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@tustvold
Contributor

tustvold commented Feb 9, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

SortPreservingMerge now makes use of the arrow row format and this has yielded significant performance improvements over the prior DynComparator based approach. We can likely significantly improve the performance of SortExec by modifying sort_batch to also make use of the row format when performing multi-column sorts, instead of lexsort_to_indices which internally uses DynComparator.

For single-column sorts lexsort_to_indices will call through to sort_to_indices, which will be faster than the row format; we should make sure to keep this special case.
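
For readers unfamiliar with the idea behind a row format, here is a minimal sketch using only the Rust standard library. It is not arrow's encoder: `encode_key` and `sort_indices_by_rows` are hypothetical helpers, the key is fixed to an `(i64, u32)` pair, and nulls, descending order and variable-length values are ignored. It only illustrates that once each row is encoded into bytes whose byte-wise order matches the tuple order, a multi-column sort reduces to comparing byte strings.

```rust
/// Encode an (i64, u32) sort key so that comparing the bytes gives the same
/// order as comparing the tuple. Hypothetical helper, not arrow's encoder.
fn encode_key(a: i64, b: u32) -> [u8; 12] {
    let mut out = [0u8; 12];
    // Flipping the sign bit maps signed order onto unsigned order, and
    // big-endian puts the most significant byte first, so a plain
    // byte-wise comparison orders the values correctly.
    out[..8].copy_from_slice(&((a as u64) ^ (1u64 << 63)).to_be_bytes());
    out[8..].copy_from_slice(&b.to_be_bytes());
    out
}

/// Return the permutation of row indices that sorts the two columns
/// lexicographically (first by `col_a`, then by `col_b`).
fn sort_indices_by_rows(col_a: &[i64], col_b: &[u32]) -> Vec<usize> {
    assert_eq!(col_a.len(), col_b.len());
    // Encode every row once, up front.
    let keys: Vec<[u8; 12]> = col_a
        .iter()
        .zip(col_b)
        .map(|(&a, &b)| encode_key(a, b))
        .collect();
    // Sort indices by comparing the encoded bytes only.
    let mut indices: Vec<usize> = (0..keys.len()).collect();
    indices.sort_unstable_by(|&i, &j| keys[i].cmp(&keys[j]));
    indices
}
```

The encoding pass is paid once per row, while every comparison during the sort becomes a cheap byte comparison with no per-column dispatch.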

Describe the solution you'd like

A first iteration could simply modify sort_batch to use the row format for multi-column sorts, as demonstrated here, falling back to sort_to_indices if only a single column.

A second iteration could then look to find a way to convert to the row format once, and preserve this encoding when feeding sorted batches into SortPreservingMerge.

Describe alternatives you've considered
We could not do this

Additional context

FYI @alamb @mustafasrepo @ozankabak

@tustvold tustvold added enhancement New feature or request good first issue Good for newcomers help wanted Extra attention is needed labels Feb 9, 2023
@jaylmiller
Contributor

Hi. I've never contributed before but I've been working with datafusion the past month or so and am loving the project.

I'd like to try my hand at this issue. But no worries if a more experienced contributor wants to take over this issue instead 😀

@ozankabak
Contributor

@jaylmiller, we will be happy to help with reviews + exchanging ideas about the details. Thank you, looking forward to collaborating!

@jaylmiller
Contributor

Hi there. I've created a draft PR which implements the suggested first iteration. I would appreciate any comments/suggestions, and in the meantime I'm going to start implementing the 2nd iteration.

Thanks!

@ozankabak
Contributor

Looks good so far with a cursory look 🚀 Our team can review in detail early next week. If you think you can finish both steps in the next few days, we can do an end to end review too.

@jaylmiller
Contributor

That would be great. Thank you!

@jaylmiller
Contributor

jaylmiller commented Feb 14, 2023

Hi there. I've got a rough draft (#5242) that works for the most part. I still have to get the metrics tracking working but I wanted to get all the sort logic working first (which it seems to be, the sql integration test suite is passing...) and potentially get some recommendations from experienced contributors on if this approach seems correct?

Additionally I still need to spill the row encoding data to disk. Right now, when sorted batches are spilled, it works like it did previously (i.e. the row encoding is just (re)created in the SortPreservingMerge)...

Any suggestions on the overall approach or recommendations on spilling the rows format to disk would be appreciated!

@alamb
Contributor

alamb commented Feb 14, 2023

Any suggestions on the overall approach or recommendations on spilling the rows format to disk would be appreciated!

I think @tustvold has some thoughts about spilling the row format to disk. Perhaps he can share here

@jaylmiller
Contributor

jaylmiller commented Feb 15, 2023

I've got the spill logic working now, just not sure what format to serialize it on disk to. I've got a temporary solution using arrow IPC--so I could test all the logic--but I'd imagine this is sub-optimal and would need to be changed.

Any suggestions on serialization format would be appreciated! Thanks

Other than the serialization format, everything else should now be ready to go. 🚀

@tustvold
Contributor Author

tustvold commented Feb 15, 2023

Any suggestions on serialization format would be appreciated! Thanks

You should be able to serialize the raw row bytes directly. For example a basic idea might be, write a 4 byte magic header, e.g. b"AROW" as a sanity check, then write a u32 as little endian containing the number of rows, then for each row write a u32 length, followed by the row bytes.

Parsing it should be a relatively straightforward case of reversing the framing above, and then feeding the parsed bytes into RowParser, which can be obtained from the RowConverter by calling RowConverter::parser.
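
A minimal sketch of that framing, using only the Rust standard library. The function names `write_rows` and `read_rows` are hypothetical, rows are modelled as plain `Vec<u8>` rather than arrow row types, and encoding the per-row length as little endian is an assumption (the description above only fixes the endianness of the row count). Feeding the parsed bytes into RowParser is left out.

```rust
use std::io::{self, Read, Write};

/// Magic header used as a sanity check, as suggested above.
const MAGIC: &[u8; 4] = b"AROW";

/// Convert a length to u32, failing instead of silently truncating.
fn to_u32(n: usize) -> io::Result<u32> {
    if n > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "length exceeds u32"));
    }
    Ok(n as u32)
}

/// Write: magic, row count (u32 LE), then for each row a u32 LE length
/// followed by the raw row bytes.
fn write_rows<W: Write>(mut out: W, rows: &[Vec<u8>]) -> io::Result<()> {
    out.write_all(MAGIC)?;
    out.write_all(&to_u32(rows.len())?.to_le_bytes())?;
    for row in rows {
        out.write_all(&to_u32(row.len())?.to_le_bytes())?;
        out.write_all(row)?;
    }
    Ok(())
}

/// Read one little-endian u32 from the input.
fn read_u32<R: Read>(input: &mut R) -> io::Result<u32> {
    let mut buf = [0u8; 4];
    input.read_exact(&mut buf)?;
    Ok(u32::from_le_bytes(buf))
}

/// Reverse the framing: check the magic, read the row count, then read
/// each length-prefixed row.
fn read_rows<R: Read>(mut input: R) -> io::Result<Vec<Vec<u8>>> {
    let mut magic = [0u8; 4];
    input.read_exact(&mut magic)?;
    if &magic != MAGIC {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad magic header"));
    }
    let num_rows = read_u32(&mut input)?;
    let mut rows = Vec::new();
    for _ in 0..num_rows {
        let len = read_u32(&mut input)? as usize;
        let mut row = vec![0u8; len];
        input.read_exact(&mut row)?;
        rows.push(row);
    }
    Ok(rows)
}
```

Writing into a `Vec<u8>` and reading back from `&buf[..]` round-trips the rows; a spill file would use a `File` (ideally wrapped in a `BufWriter`/`BufReader`) in the same way.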

@jaylmiller
Contributor

Any suggestions on serialization format would be appreciated! Thanks

You should be able to serialize the raw row bytes directly. For example a basic idea might be, write a 4 byte magic header, e.g. b"AROW" as a sanity check, then write a u32 as little endian containing the number of rows, then for each row write a u32 length, followed by the row bytes.

Parsing it should be a relatively straightforward case of reversing the framing above, and then feeding the parsed bytes into RowParser, which can be obtained from the RowConverter by calling RowConverter::parser.

Ok perfect--I figured it'd end up being something along those lines, but wanted to make sure I wasn't missing anything... Thanks!

@jaylmiller
Contributor

jaylmiller commented Feb 16, 2023

I'm having trouble getting actionable benchmark results on my laptop. Running the benchmark twice in a row on the exact same build will often show differences of 50% for the same case between runs, making it hard to tell if changes are actually improving anything or not 🙃. Any tips for that? I'm still kind of new to Rust and have never used the benchmarking stuff before...

Other than that, everything is done (at least I think so! 😅)

@alamb
Contributor

alamb commented Feb 17, 2023

Running the benchmark twice in a row on the exact same build will often have differences of 50% between the same case on each run

My laptop also shows wide variance -- I typically use a cloud server in this case. I can help run these benchmarks on such a machine if needed.

The other thing that might help could be to use a slightly larger input data set in the benchmarks 🤔

@Dandandan
Contributor

I'm having trouble getting actionable benchmark results on my laptop. Running the benchmark twice in a row on the exact same build will often show differences of 50% for the same case between runs, making it hard to tell if changes are actually improving anything or not 🙃. Any tips for that? I'm still kind of new to Rust and have never used the benchmarking stuff before...

Other than that, everything is done (at least I think so! 😅)

Yeah larger inputs/longer runs help as @alamb suggests, as well as keeping your laptop connected to the charger.
And make sure you run as little as possible in the background (close your IDE, browser, etc.)

@alamb
Contributor

alamb commented Feb 17, 2023

I am going to take a shot at getting some benchmark results and will post them here.

@alamb
Contributor

alamb commented Feb 17, 2023

Here are my performance results (I did not dig into this yet).

Methodology:

git checkout sort-preserve-row-encoding
git cherry-pick 322e92bea6e28ada9f8d57d9429748fb58b2a2a5
cargo bench -p datafusion --bench sort -- --save-baseline sort-preserve-row-encoding

Results

critcmp main sort-preserve-row-encoding
group                                                     main                                    sort-preserve-row-encoding
-----                                                     ----                                    --------------------------
sort f64                                                  1.02  655.8±233.27µs        ? ?/sec     1.00   640.9±24.78µs        ? ?/sec
sort f64 preserve partitioning                            1.00      5.1±0.09ms        ? ?/sec     1.02      5.2±0.10ms        ? ?/sec
sort i64                                                  1.05  599.4±420.69µs        ? ?/sec     1.00   571.7±11.11µs        ? ?/sec
sort i64 preserve partitioning                            1.00      4.5±0.08ms        ? ?/sec     1.02      4.6±0.09ms        ? ?/sec
sort mixed tuple                                          1.00   597.4±27.16µs        ? ?/sec     2.30  1376.1±69.26µs        ? ?/sec
sort mixed tuple preserve partitioning                    1.00      4.9±0.13ms        ? ?/sec     2.30     11.3±0.39ms        ? ?/sec
sort mixed utf8 dictionary tuple                          1.00  640.5±356.39µs        ? ?/sec     1.39   892.1±43.52µs        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning    1.00      5.1±0.12ms        ? ?/sec     1.42      7.3±0.15ms        ? ?/sec
sort utf8 dictionary                                      1.00    200.7±4.89µs        ? ?/sec     1.00    200.4±4.35µs        ? ?/sec
sort utf8 dictionary preserve partitioning                1.00  1767.3±218.98µs        ? ?/sec    1.00  1758.6±96.26µs        ? ?/sec
sort utf8 dictionary tuple                                1.00  683.6±1094.18µs        ? ?/sec    1.24  846.7±107.74µs        ? ?/sec
sort utf8 dictionary tuple preserve partitioning          1.00      4.8±0.24ms        ? ?/sec     1.33      6.4±0.19ms        ? ?/sec
sort utf8 high cardinality                                1.00      2.1±0.04ms        ? ?/sec     1.01      2.1±0.06ms        ? ?/sec
sort utf8 high cardinality preserve partitioning          1.01     17.0±0.34ms        ? ?/sec     1.00     16.9±0.39ms        ? ?/sec
sort utf8 low cardinality                                 1.00  1258.3±84.71µs        ? ?/sec     1.02  1280.3±30.47µs        ? ?/sec
sort utf8 low cardinality preserve partitioning           1.00  1264.9±119.28µs        ? ?/sec    1.01  1278.6±52.69µs        ? ?/sec
sort utf8 tuple                                           1.00  1127.2±41.81µs        ? ?/sec     1.53  1723.5±59.90µs        ? ?/sec
sort utf8 tuple preserve partitioning                     1.00      9.4±0.26ms        ? ?/sec     1.53     14.4±0.44ms        ? ?/sec

Next I plan to run a profile to see where time is being spent.

@alamb
Contributor

alamb commented Feb 17, 2023

Not sure if this is helpful but here is the perf output for running

perf record target/release/deps/sort-d963b058c1acc9f5 --bench "sort mixed tuple"
Samples: 106K of event 'cpu-clock:pppH', Event count (approx.): 26716250000
Overhead  Command          Shared Object          Symbol
  14.97%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_row::RowConverter::convert_columns
  14.32%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_select::take::take_bytes
  11.31%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] rayon::slice::quicksort::recurse
  10.09%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_row::variable::encode_one
   7.86%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] datafusion::physical_plan::sorts::sort::do_sort::{{closure}}
   6.48%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_row::variable::encode
   4.89%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a89ca
   2.52%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_select::take::take_no_nulls
   2.19%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] arrow_select::take::take_impl
   2.09%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a80ca
   1.87%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] criterion::stats::univariate::resamples::Resamples<A>::next
   1.42%  sort-d963b058c1  sort-d963b058c1acc9f5  [.] core::slice::sort::partial_insertion_sort
   1.26%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a7e25
   1.24%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a7e2f
   1.17%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a7dc0
   1.15%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a7e00
   1.14%  sort-d963b058c1  libc.so.6              [.] 0x00000000001a7e31

@jaylmiller
Contributor

jaylmiller commented Feb 17, 2023

Thanks for this! Any suggestions on how I can improve this code would be great--still kinda new to Rust. 😁

I have some of my own ideas I might try to implement as well. Might it be good to have a separate PR for only v1 of this issue (much less code is changed there)?

@ozankabak
Contributor

Interesting. I would expect to see much better results. There are some cases where the difference is quite significant.

I still think this will give us good results once we identify what is going on and fix the issues.

@jaylmiller
Contributor

Interesting. I would expect to see much better results. There are some cases where the difference is quite significant.

I still think this will give us good results once we identify what is going on and fix the issues.

Cool. I'm happy to keep working on it and look for the issues if you're confident that it will yield good results.

I'm going to set up a cloud machine so I can run the benchmarks myself and iterate faster, because I'm not having a good time with benchmarking on my laptop 😅

@jaylmiller
Contributor

jaylmiller commented Feb 18, 2023

Also, I just realized that the sort benchmark is kind of bad (my mistake 😬): when preserve_partitioning is false, it expects input with a single partition, which it was not getting. It was getting input with 8 partitions, so effectively it was only running on a fraction (1/8) of the input data. Additionally, this only represented a specific case where the sort runs for a single batch, which is only one branch in the sorting logic (this case is still represented in the 'preserve partitioning' cases btw).

I've added this fix in PR #5308.

Here are my benchmark results after that fix.

group                                                     main                                     sort-preserve-row-encoding
-----                                                     ----                                     --------------------------
sort f64                                                  1.02     11.4±1.52ms        ? ?/sec      1.00     11.2±0.49ms        ? ?/sec
sort f64 preserve partitioning                            1.02      4.0±0.19ms        ? ?/sec      1.00      4.0±0.08ms        ? ?/sec
sort i64                                                  1.04      9.8±0.70ms        ? ?/sec      1.00      9.5±0.45ms        ? ?/sec
sort i64 preserve partitioning                            1.09      3.6±0.32ms        ? ?/sec      1.00      3.3±0.09ms        ? ?/sec
sort mixed tuple                                          1.00    33.8±10.94ms        ? ?/sec      1.01     34.0±7.64ms        ? ?/sec
sort mixed tuple preserve partitioning                    1.00      3.8±0.21ms        ? ?/sec      1.92      7.3±0.60ms        ? ?/sec
sort mixed utf8 dictionary tuple                          2.34     55.6±3.61ms        ? ?/sec      1.00     23.8±1.19ms        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning    1.00      3.8±0.32ms        ? ?/sec      1.36      5.2±0.36ms        ? ?/sec
sort utf8 dictionary                                      1.00      3.7±0.13ms        ? ?/sec      1.02      3.8±0.17ms        ? ?/sec
sort utf8 dictionary preserve partitioning                1.15  1551.0±1627.64µs        ? ?/sec    1.00  1351.2±61.24µs        ? ?/sec
sort utf8 dictionary tuple                                2.21     52.6±3.25ms        ? ?/sec      1.00     23.7±3.10ms        ? ?/sec
sort utf8 dictionary tuple preserve partitioning          1.00      3.9±0.72ms        ? ?/sec      1.26      4.9±0.62ms        ? ?/sec
sort utf8 high cardinality                                1.05     27.3±5.33ms        ? ?/sec      1.00     26.1±1.36ms        ? ?/sec
sort utf8 high cardinality preserve partitioning          1.00     11.0±1.09ms        ? ?/sec      1.02     11.3±2.03ms        ? ?/sec
sort utf8 low cardinality                                 1.09     14.5±3.83ms        ? ?/sec      1.00     13.3±1.27ms        ? ?/sec
sort utf8 low cardinality preserve partitioning           1.94     14.5±2.22ms        ? ?/sec      1.00      7.4±0.89ms        ? ?/sec
sort utf8 tuple                                           1.69    60.2±13.20ms        ? ?/sec      1.00     35.6±1.59ms        ? ?/sec
sort utf8 tuple preserve partitioning                     1.00      7.7±1.99ms        ? ?/sec      1.16      8.9±0.69ms        ? ?/sec

@ozankabak
Contributor

ozankabak commented Feb 20, 2023

So we have sort mixed tuple preserve partitioning, sort mixed utf8 dictionary tuple preserve partitioning, sort utf8 dictionary tuple preserve partitioning and sort utf8 tuple preserve partitioning remaining as cases with regression.

Seems like these are the cases having tuple and preserve partitioning at the same time. Can you think of any reason why, @jaylmiller?

@jaylmiller
Contributor

jaylmiller commented Feb 20, 2023

@ozankabak I think this could be what's going on:

In the preserve partitioning cases, it's only sorting a single batch of data (each partition receives a batch--sorts are done per partition). In the non preserve partitioning case, it is sorting every single batch of data. The time cost of the row encoding should scale linearly with the number of rows (O(n)), while the time cost of sorting should be O(n*log(n)).

So I think for smaller amounts of data the upfront time cost of encoding the rows is greater than the time saved by having a more efficient comparison for sorting. But as the number of rows increases, the time cost of sorting grows faster than the encoding, making the faster comparisons more beneficial.
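
The argument above can be written as a rough cost model. This is only a sketch under stated assumptions: $c_e$ (per-row encoding cost), $c_r$ (cost of one comparison on encoded rows) and $c_d$ (cost of one DynComparator comparison, with $c_d > c_r$) are hypothetical constants that were not measured in this thread.

```latex
\begin{align}
T_{\text{row}}(n) &= c_e\,n + c_r\,n\log_2 n \\
T_{\text{lex}}(n) &= c_d\,n\log_2 n \\
T_{\text{row}}(n) < T_{\text{lex}}(n)
  &\iff c_e < (c_d - c_r)\log_2 n
  \iff n > 2^{\,c_e/(c_d - c_r)}
\end{align}
```

For example, if encoding one row cost as much as 12 times the per-comparison saving, i.e. $c_e = 12\,(c_d - c_r)$, the model would put the crossover at $n > 2^{12} = 4096$ rows. The real crossover depends on column types and on effects this model ignores.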

And the reason it's only the tuple cases is b/c row encoding is only used for multi column sort.

That being said, I'm not totally sure about how to approach this issue code-wise. Suggestions would be appreciated 😅

@ozankabak
Contributor

@tustvold, can you advise us on how to use the row conversion facility most efficiently? It seems both @jaylmiller and we are seeing the same behavior and I'd like to make sure we are using the tools at our disposal the right way. In summary, as batch sizes get smaller, row conversion seems to result in lower overall performance (probably due to gains not justifying the conversion cost for small batch sizes). If you can take a look at how @jaylmiller is using the facility and let us know whether it is used appropriately it'd be great.

If everything is done right yet this behavior still persists, maybe we can then think about how to identify a reasonable default crossover point (which could be overridden via config) and use different approaches for different batch sizes. I don't like these kinds of "impure" approaches in general, but sometimes they yield great performance. The history of sort algorithms is full of such "hacks", so maybe this is one of the places where it makes sense to do it 🙂

@tustvold
Contributor Author

tustvold commented Mar 3, 2023

All the kernels in arrow rely on large batch sizes to amortise dispatch overheads; if partitioning is producing small batches, imo that is a bug / limitation that should be fixed. Perhaps we could convert to the row format before partitioning or something?

In general I don't think regressing small inputs is necessarily an issue; arrow is optimised for batches of 1000s of rows, and small datasets are not really what it is designed for...

TLDR: the behaviour you describe is in line with my expectations for most arrow functionality; small batches severely hurt throughput. This is especially true for dictionary arrays, where dictionaries are per-array.

@ozankabak
Contributor

Great. @jaylmiller, I think if we tweak the gating logic to utilize the row-converted path only for tuples + large batch sizes (like > 512 or something), the practical performance will be very good in a wide variety of use cases. IIRC you already have gating logic, so this should be a micro change, right?

You can determine the crossover size experimentally. I expect a sane choice on an "average" computer will be good for many scenarios. We can also add an override mechanism through the config in a follow-on PR in the future.
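
A minimal sketch of what such gating could look like. The enum `SortPath`, the function `choose_sort_path` and its parameters are hypothetical names for illustration, not DataFusion's API; the threshold is passed in because the crossover point is meant to be determined experimentally and possibly overridden via config.

```rust
/// Which sort path to take for one batch. Hypothetical names.
#[derive(Debug, PartialEq)]
enum SortPath {
    /// Single sort column: keep the fast sort_to_indices special case.
    SingleColumn,
    /// Multi-column, small batch: lexsort_to_indices (DynComparator).
    LexSort,
    /// Multi-column, large batch: row encoding + byte comparison.
    RowFormat,
}

/// Pick a path from the number of sort columns and the batch size.
/// `min_rows_for_row_format` is the (assumed, tunable) crossover point.
fn choose_sort_path(
    num_sort_columns: usize,
    num_rows: usize,
    min_rows_for_row_format: usize,
) -> SortPath {
    if num_sort_columns <= 1 {
        SortPath::SingleColumn
    } else if num_rows > min_rows_for_row_format {
        SortPath::RowFormat
    } else {
        SortPath::LexSort
    }
}
```

With the 512 figure floated above as a placeholder, `choose_sort_path(2, 12500, 512)` selects the row format while `choose_sort_path(2, 100, 512)` falls back to the lexsort path.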

@tustvold
Contributor Author

tustvold commented Mar 3, 2023

like > 512

How common are such batches in practice? I guess I'm wondering if the added complexity is justified for what is effectively a degenerate case that will cause issues far beyond just for sort?

The main reason I ask is DynComparator, which underpins non-single-column lexsort, has known issues w.r.t sorting nulls, and I had hoped to eventually deprecate and remove it - apache/arrow-rs#2687

@jaylmiller
Contributor

jaylmiller commented Mar 3, 2023

Sounds good @ozankabak

IIRC you already have gating logic, so this should be a micro change, right?

Yes, we are already using the row format only when doing multi-column (tuple) sorting, so adding additional gating should not be an issue at all.

@jaylmiller
Contributor

like > 512

How common are such batches in practice? I guess I'm wondering if the added complexity is justified for what is effectively a degenerate case that will cause issues far beyond just for sort?

Btw DynComparator has known issues w.r.t sorting nulls, and I had hoped to eventually deprecate and remove it - apache/arrow-rs#2687

No, 512 is way too small @tustvold. For the sort bench, we are seeing a regression when the execute call is sorting a single batch of size 12500 (total benchmark input size is 100000, broken up into 8 partitions); this occurs when partitioning is preserved, since each partition is sorted separately. When partitioning is not preserved, and all batches are sorted together, we see significant perf improvements. Additionally, when partitioning is preserved but the input data is all skewed to a single partition, we see the same perf improvement (as expected). Here are the bench results for each of those scenarios:

group                                                                          main-sort                                rows-sort
-----                                                                          ---------                                ---------
sort mixed tuple                                                               1.00     29.5±2.83ms        ? ?/sec      1.04     30.5±3.23ms        ? ?/sec
sort mixed tuple preserve partitioning                                         1.00      4.7±0.94ms        ? ?/sec      1.52      7.1±0.64ms        ? ?/sec
sort mixed tuple preserve partitioning data skewed to first                    1.00     30.6±4.78ms        ? ?/sec      1.00     30.6±6.66ms        ? ?/sec
sort mixed utf8 dictionary tuple                                               2.60    60.8±13.04ms        ? ?/sec      1.00     23.4±0.93ms        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning                         1.00      4.5±1.27ms        ? ?/sec      1.11      5.1±0.40ms        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning data skewed to first    2.24     54.0±4.22ms        ? ?/sec      1.00     24.1±2.17ms        ? ?/sec
sort utf8 dictionary tuple                                                     2.32     54.7±7.35ms        ? ?/sec      1.00     23.6±3.48ms        ? ?/sec
sort utf8 dictionary tuple preserve partitioning                               1.00      3.7±0.37ms        ? ?/sec      1.24      4.6±0.38ms        ? ?/sec
sort utf8 dictionary tuple preserve partitioning data skewed to first          2.50     54.1±5.52ms        ? ?/sec      1.00     21.6±0.65ms        ? ?/sec
sort utf8 tuple                                                                1.79    62.5±13.08ms        ? ?/sec      1.00     35.0±1.62ms        ? ?/sec
sort utf8 tuple preserve partitioning                                          1.00      7.3±0.79ms        ? ?/sec      1.17      8.6±0.74ms        ? ?/sec
sort utf8 tuple preserve partitioning data skewed to first                     1.54     54.5±5.11ms        ? ?/sec      1.00     35.4±2.18ms        ? ?/sec

@ozankabak
Contributor

ozankabak commented Mar 3, 2023

How common are such batches in practice? I guess I'm wondering if the added complexity is justified for what is effectively a degenerate case that will cause issues far beyond just for sort?

Can't speak for the usages at large, but I've personally seen multiple use cases with relatively small batch sizes (in the high hundreds to low thousands) in my data pipelines at various jobs. At Synnada, we use this parameter to trade off throughput vs. latency; in some cases one is more important than the other depending on volumes etc. For this use case, this check adds no new complexity, so we are all good in that regard.

The main reason I ask is DynComparator, which underpins non-single-column lexsort, has known issues w.r.t sorting nulls, and I had hoped to eventually deprecate and remove it - apache/arrow-rs#2687

Good to know. I will think about this and discuss with my team; this will be on our radar for future work.

@tustvold
Contributor Author

tustvold commented Mar 3, 2023

we are seeing regression when the execute call is sorting a single batch of size 12500 (total benchmark input size is 100000, broken up into 8 partitions)

Do you see a similar regression in the single partition case, but if you instead reduce the size of the total benchmark down by a factor of 8? I could understand it if there were dictionaries involved, but the worst regression appears to be "sort mixed tuple preserve partitioning" which is just strings and primitives...

@jaylmiller
Contributor

jaylmiller commented Mar 3, 2023

group                                                                          main-sort                               rows-sort
-----                                                                          ---------                               ---------
sort mixed tuple                                                               1.02      3.7±0.72ms        ? ?/sec     1.00      3.6±0.76ms        ? ?/sec
sort mixed tuple preserve partitioning                                         1.00   608.0±82.67µs        ? ?/sec     1.53  931.3±108.10µs        ? ?/sec
sort mixed utf8 dictionary tuple                                               1.38      5.2±0.43ms        ? ?/sec     1.00      3.8±1.04ms        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning                         1.00  528.9±101.53µs        ? ?/sec     1.92  1016.8±191.17µs        ? ?/sec
sort utf8 dictionary tuple                                                     1.58      5.2±0.53ms        ? ?/sec     1.00      3.3±0.52ms        ? ?/sec
sort utf8 dictionary tuple preserve partitioning                               1.00   503.9±80.41µs        ? ?/sec     2.06  1040.1±240.80µs        ? ?/sec
sort utf8 tuple                                                                1.16      5.7±1.16ms        ? ?/sec     1.00      4.9±0.79ms        ? ?/sec
sort utf8 tuple preserve partitioning                                          1.00  895.1±177.11µs        ? ?/sec     1.29  1151.0±138.46µs        ? ?/sec

@tustvold here are the results after scaling the input row count down by a factor of 8. The "sort mixed tuple preserve partitioning" regression is approximately the same.

The performance gains from the row encoding decrease with fewer input rows, as we would expect @ozankabak

@tustvold
Contributor Author

tustvold commented Mar 3, 2023

I wonder if there is something else going on here then, I can take a look next week. Unless I'm missing something, this is not consistent with the issue being the batch size, as we would expect to now see a regression for even the non-preserving case.

@jaylmiller
Contributor

I agree it's not just the batch size. It seems to be some combination of the individual batch size and the total number of batches being sorted at once.

@ozankabak
Contributor

I wonder if there is something else going on here then, I can take a look next week. Unless I'm missing something, this is not consistent with the issue being the batch size, as we would expect to now see a regression for even the non-preserving case.

Thanks 👍 I agree that there may be something else going on. Bettering our understanding will be helpful to isolate the effect of batch size and make any decisions based on that.

@jaylmiller
Contributor

jaylmiller commented Mar 4, 2023

I ran some experiments investigating how batch size impacts performance when doing multi column sorts on a single record batch.

So the batch size theory seems wrong, but these results do demonstrate why the "preserve partitioning" cases are regressing. What's interesting is that while single batch sorting performance for the row format is actually worse, we're still getting a significant performance increase when more than one batch is being sorted 🤔. For example, the benchmark comparison for utf8-tuple:

group                                                                          main-sort                                rows-sort
-----                                                                          ---------                                ---------
sort utf8 tuple                                                                1.79    62.5±13.08ms        ? ?/sec      1.00     35.0±1.62ms        ? ?/sec
sort utf8 tuple preserve partitioning                                          1.00      7.3±0.79ms        ? ?/sec      1.17      8.6±0.74ms        ? ?/sec

Methodology: https://github.com/jaylmiller/inspect-arrow-sort. If someone could check out the actual sorting code for the experiments, it's just a few lines and pretty much entirely lifted from the PR. Perhaps I'm just using the row format incorrectly here?
https://github.com/jaylmiller/inspect-arrow-sort/blob/a7ed948eee9f67b983f9b981276a6a05de93c19f/src/lib.rs#L23-L75

My current idea is to buffer up a few record batches before sorting them (at the moment each batch is sorted immediately when it streams in) and use that to inform the decision of whether to gate the row encoding (in addition to the existing gating which checks that there's no limit and that it's multi-column).

This should be a fairly minimal change to the current PR's code, but determining when (that is, at what number of batches and/or size of those batches) to apply the row encoding might be more difficult. We know for sure that this change can give nice performance bumps (for example, when sorting 8 batches of size 12500 we see more than 2x increases for some cases), so I think this change is still worth making as long as we can minimize the perf regressions seen in some scenarios.

@ozankabak
Contributor

ozankabak commented Mar 4, 2023

@jaylmiller, I haven't studied the sort code yet, so I'd like to ask a few quick questions to further my understanding first. Let's say we have P partitions, each having N rows in total (across all batches). When we have preserve_partitioning, is it accurate to say we do the following?

  1. Coalesce batches (for every partition independently) since sort needs to operate on the whole data. If so, we would end up with P datasets of size N.
  2. Perform P row conversions and P sorts on these N-long datasets.

Is there a third output-related step I'm missing?

@jaylmiller
Contributor

jaylmiller commented Mar 4, 2023

@ozankabak Not quite, the batches are not coalesced until the very end of the process.

This is the process for a single partition, with M batches (i.e. each batch is sized N/M):

  1. For each batch that streams in, sort that batch individually and place it into a buffer. If row encoding was used, the row encoding is also buffered (alongside the RecordBatch).
  2. Once the input stream has completed, merge all the buffered batches (the row encodings from step 1 can be used here if they exist).

So we're actually performing M row conversions (and M sorts), and then doing a final merge/sort that performs no new row encoding (the buffered encodings are reused). So overall we're doing P*M row conversions.
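The two steps above can be sketched with the standard library alone. This is a simplified model, not the DataFusion code: a "batch" here is just a `Vec` of `(i64, String)` tuples standing in for multi-column rows, and `sort_batch`, `merge_sorted`, and `sort_partition` are illustrative names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Stand-in for a multi-column row: (integer key, string key).
type Row = (i64, String);

/// Step 1: sort one incoming batch on its own.
pub fn sort_batch(mut batch: Vec<Row>) -> Vec<Row> {
    batch.sort();
    batch
}

/// Step 2: k-way merge of batches that are each already sorted.
pub fn merge_sorted(batches: Vec<Vec<Row>>) -> Vec<Row> {
    let total: usize = batches.iter().map(Vec::len).sum();
    let mut iters: Vec<_> = batches.into_iter().map(Vec::into_iter).collect();
    // Min-heap holding the current head row of every batch.
    let mut heap = BinaryHeap::new();
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(row) = it.next() {
            heap.push(Reverse((row, i)));
        }
    }
    let mut out = Vec::with_capacity(total);
    while let Some(Reverse((row, i))) = heap.pop() {
        out.push(row);
        // Refill the heap from the batch the popped row came from.
        if let Some(next) = iters[i].next() {
            heap.push(Reverse((next, i)));
        }
    }
    out
}

/// One partition: M per-batch sorts, then a single merge at the end.
pub fn sort_partition(input: Vec<Vec<Row>>) -> Vec<Row> {
    let buffered: Vec<Vec<Row>> = input.into_iter().map(sort_batch).collect();
    merge_sorted(buffered)
}
```

Running `sort_partition` once per partition gives the P*M per-batch sorts counted above, with one merge per partition.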

The docs for the sort implementation on main also describe the algorithm:
https://github.com/apache/arrow-datafusion/blob/c37ddf72ec539bd39cce0dd4ff38db2e36ddb55f/datafusion/core/src/physical_plan/sorts/sort.rs#L64-L73

The preserve_partitioning flag only tells the DataFusion physical plan optimizer/execution how it is supposed to use the SortExec plan (changes the node's output_partitioning and required_input_distribution)--this is used a fair amount in the physical optimizer but I am not very familiar with that part of the codebase yet 😅. But in terms of the SortExec node, the flag doesn't alter its internal sorting logic.

@ozankabak
Contributor

During my cursory look at the comments, the wording "sort all buffered batches" made me think we were sorting some sort of a coalesced dataset if there is no memory issue. So the comment is somewhat misleading (at least one person got it wrong!).

Looking at the code more attentively, I see that it is doing what you are describing; i.e. buffering partially sorted batches. Given this, I am currently out of theories as to why we see the regression in the preserve_partitioning cases; i.e.

So we have sort mixed tuple preserve partitioning, sort mixed utf8 dictionary tuple preserve partitioning, sort utf8 dictionary tuple preserve partitioning and sort utf8 tuple preserve partitioning remaining as cases with regression.

Maybe we will get more ideas when @tustvold takes a look at whether row conversion is done properly. I will keep thinking about this in parallel as well. I will share here if I can think of anything.

@jaylmiller
Contributor

jaylmiller commented Mar 7, 2023

My current thinking is that since the single batch scenario is a special case with its own code path:

https://github.com/apache/arrow-datafusion/blob/928662bb12d915aef83abba1312392d25770f68f/datafusion/core/src/physical_plan/sorts/sort.rs#L286-L294

and based on findings that row format sorting can often perform worse on single batches, it seems that the performance benefits of the row encoding are gained during the step of the algorithm that merges the in-memory sorted batches. One possible reason that the row encoding helps when a merge is performed is that we can use a sorting algorithm that benefits from the data being concatenated sorted sequences (according to the Rust docs, Vec::sort is a mergesort and Vec::sort_unstable is a quicksort). Without the row encoding, we use lexsort_to_indices, which doesn't let us benefit from the data being sorted sequences.
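To make the theory above concrete, here is a toy sketch. It does not use the arrow row format: `encode_row` is a made-up fixed-width encoding for an `(i64, u32)` key whose byte-wise order matches the tuple order, and `sort_indices_by_rows` uses the stable slice sort, which the standard library docs describe as fast on input consisting of concatenated sorted sequences.

```rust
/// Encode (i64, u32) into 12 bytes whose byte-wise order matches tuple order.
/// Toy encoding for illustration only; not the arrow row format.
pub fn encode_row(a: i64, b: u32) -> [u8; 12] {
    let mut out = [0u8; 12];
    // Flip the sign bit so negative values compare below non-negative ones.
    let a_bits = (a as u64) ^ (1u64 << 63);
    // Big-endian puts the most significant byte first.
    out[..8].copy_from_slice(&a_bits.to_be_bytes());
    out[8..].copy_from_slice(&b.to_be_bytes());
    out
}

/// Sort row indices over rows that are concatenated, individually sorted runs.
pub fn sort_indices_by_rows(rows: &[[u8; 12]]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..rows.len()).collect();
    // Stable sort; each comparison is a plain byte comparison of two rows.
    idx.sort_by(|&l, &r| rows[l].cmp(&rows[r]));
    idx
}
```

The point of the sketch is only that, once rows are encoded, every comparison is a single byte-string comparison and the choice of sort algorithm is free, so a run-aware sort can be used on the concatenated batches.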

So I'm thinking if we gate row encoding usage based on whether or not that merge will happen, we can keep the perf advantages and remove these regressions. Here's my current bench results:

group                                                     main-sort                                rows-sort
-----                                                     ---------                                ---------
sort f64                                                  1.00     10.8±0.23ms        ? ?/sec      1.04     11.2±0.93ms        ? ?/sec
sort f64 preserve partitioning                            1.00      4.0±0.27ms        ? ?/sec      1.04      4.1±0.28ms        ? ?/sec
sort i64                                                  1.00      9.5±0.55ms        ? ?/sec      1.09     10.3±0.74ms        ? ?/sec
sort i64 preserve partitioning                            1.00      3.3±0.10ms        ? ?/sec      1.06      3.5±0.13ms        ? ?/sec
sort mixed tuple                                          1.28     28.3±3.35ms        ? ?/sec      1.00     22.2±1.60ms        ? ?/sec
sort mixed tuple preserve partitioning                    1.00      3.6±0.17ms        ? ?/sec      1.15      4.1±1.09ms        ? ?/sec
sort mixed utf8 dictionary tuple                          2.84     52.7±8.27ms        ? ?/sec      1.00     18.6±1.29ms        ? ?/sec
sort mixed utf8 dictionary tuple preserve partitioning    1.02      4.2±0.92ms        ? ?/sec      1.00      4.1±0.55ms        ? ?/sec
sort utf8 dictionary                                      1.00      3.7±0.21ms        ? ?/sec      1.04      3.9±0.33ms        ? ?/sec
sort utf8 dictionary preserve partitioning                1.00  1487.2±1444.67µs        ? ?/sec    1.01  1502.8±315.79µs        ? ?/sec
sort utf8 dictionary tuple                                3.26    57.0±11.35ms        ? ?/sec      1.00     17.5±2.08ms        ? ?/sec
sort utf8 dictionary tuple preserve partitioning          1.13      4.1±1.08ms        ? ?/sec      1.00      3.6±0.52ms        ? ?/sec
sort utf8 high cardinality                                1.01     28.0±3.70ms        ? ?/sec      1.00     27.6±3.81ms        ? ?/sec
sort utf8 high cardinality preserve partitioning          1.00     11.1±1.48ms        ? ?/sec      1.21     13.5±3.38ms        ? ?/sec
sort utf8 low cardinality                                 1.00     15.3±5.08ms        ? ?/sec      1.10     16.9±6.20ms        ? ?/sec
sort utf8 low cardinality preserve partitioning           1.03      8.1±2.21ms        ? ?/sec      1.00      7.8±1.75ms        ? ?/sec
sort utf8 tuple                                           1.96     56.8±8.36ms        ? ?/sec      1.00     29.0±4.82ms        ? ?/sec
sort utf8 tuple preserve partitioning                     1.02      6.7±0.95ms        ? ?/sec      1.00      6.5±0.46ms        ? ?/sec

@alamb
Contributor

alamb commented Mar 8, 2023

Thanks for all this work @jaylmiller -- we plan to assist / comment on this ticket in the next day or so. All your work so far has been very great.

@ozankabak
Contributor

Great work 💯 Your charts above support your theory.

So I'm thinking if we gate row encoding usage based on whether or not that merge will happen, we can keep the perf advantages and remove these regressions.

I agree that this should solve the regressions if the merge theory is correct.

@tustvold
Contributor Author

tustvold commented Mar 8, 2023

Apologies, I have been busy with the arrow-rs <-> arrow2 unification effort. I will try to find time to take a look this week, sorry for the delay.

@alamb alamb changed the title Use Arrow Row Format in SortExec Use Arrow Row Format in SortExec to improve performance Mar 10, 2023
@alamb alamb removed the good first issue Good for newcomers label Apr 7, 2023
@alamb
Contributor

alamb commented Apr 7, 2023

This turns out not to have been a good first issue 😢

@tustvold
Contributor Author

tustvold commented Apr 7, 2023

Apologies, I underestimated how complicated this one was, but thank you once again @jaylmiller for your efforts here, they've definitely helped and informed the ongoing work in this space 💪

@jaylmiller
Contributor

No worries! This was great to work on nonetheless: I learned a lot about DataFusion and made some other contributions in the process 😀
