-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: coalesce multiple reads together and don't block on io if there's values available #1466
Conversation
vortex-file/src/read/coalescer.rs
Outdated
Ready(V), | ||
} | ||
|
||
pub struct Coalescer<R, S, V, RM> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Majority of interesting logic in this pr is this object. It performs multiple reads on underlying layout readers and dispatches them together. It makes sure there's ALWAYS one io request in flight. Importantly if the consumer of the stream is slow this might lead to using a lot of memory
vortex-file/src/read/coalescer.rs
Outdated
} | ||
} | ||
|
||
while read_more_count < NUM_TO_COALESCE { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are using arbitrary number of read requests to decide how much to coalesce
} | ||
} | ||
|
||
impl<R: VortexReadAt> VortexReadRanges<R> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is closely based on arrow-rs logic but we dispatch each request on our io loop regardless of underlying reader. We shold decide if we require ranges to be sorted by start or will the reader sort them (current implementation)
} | ||
|
||
impl<R> VortexReadRanges<R> { | ||
pub fn new(read: R, dispatcher: Arc<IoDispatcher>, max_gap: usize) -> VortexReadRanges<R> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should infer max_gap between ranges based on observed latency. This is just the simplest implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataFusion
Benchmark suite | Current: 5765d64 | Previous: 9a78dd4 | Ratio |
---|---|---|---|
arrow/planning |
803529.5470639965 ns (1513.8784078244935 ) |
801423.9872182958 ns (1739.9992692695814 ) |
1.00 |
arrow/exec |
1761550.6289470783 ns (4178.714768674923 ) |
1744912.1256691439 ns (5450.191860002698 ) |
1.01 |
vortex-pushdown-compressed/planning |
506521.09741919144 ns (1683.2973217130348 ) |
502998.3069606766 ns (1054.260114510631 ) |
1.01 |
vortex-pushdown-compressed/exec |
2735008.493157895 ns (8147.9907631571405 ) |
2644543.9484999995 ns (7279.016524999868 ) |
1.03 |
vortex-pushdown-uncompressed/planning |
506935.2166308298 ns (1264.8891288558953 ) |
505314.2837393992 ns (1611.46218455286 ) |
1.00 |
vortex-pushdown-uncompressed/exec |
1499414.163228365 ns (3342.161811748636 ) |
1492578.4550211476 ns (11940.616207785322 ) |
1.00 |
vortex-nopushdown-compressed/planning |
843882.488879961 ns (2808.2969598661875 ) |
834747.2996389676 ns (1601.7257579634315 ) |
1.01 |
vortex-nopushdown-compressed/exec |
3527389.4866666673 ns (15889.894524999894 ) |
3767030.5461538453 ns (42895.72665384621 ) |
0.94 |
vortex-nopushdown-uncompressed/planning |
823847.2391253163 ns (3165.497450991592 ) |
816355.3218891028 ns (1438.9727693437599 ) |
1.01 |
vortex-nopushdown-uncompressed/exec |
4923540.69 ns (25434.240587500855 ) |
6115103.597999999 ns (130569.92457500147 ) |
0.81 |
This comment was automatically generated by workflow using github-action-benchmark.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vortex bytes_at
Benchmark suite | Current: ca43ad6 | Previous: 630835b | Ratio |
---|---|---|---|
bytes_at/array_data |
989.4444216191108 ns (1.468313466370546 ) |
1013.087252857783 ns (1.2250948645466906 ) |
0.98 |
bytes_at/array_view |
807.8051330292327 ns (1.1016654686893617 ) |
833.3318471922553 ns (2.877201239355145 ) |
0.97 |
This comment was automatically generated by workflow using github-action-benchmark.
@@ -50,22 +47,40 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> { | |||
row_count: u64, | |||
row_mask: Option<RowMask>, | |||
dispatcher: Arc<IoDispatcher>, | |||
) -> Self { | |||
) -> VortexResult<Self> { | |||
let mut reader_splits = BTreeSet::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we can construct all the readers eagerly now it's simplest to register all splits on creation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Random Access
Benchmark suite | Current: 5765d64 | Previous: 9a78dd4 | Ratio |
---|---|---|---|
random-access/vortex-tokio-local-disk |
2085155.6512000004 ns (16989.021400000085 ) |
2195755.5808333326 ns (15662.755822917214 ) |
0.95 |
random-access/vortex-local-fs |
2340551.6308695655 ns (29031.732521740254 ) |
2565400.8185 ns (16462.406056250213 ) |
0.91 |
random-access/parquet-tokio-local-disk |
208613158.93333334 ns (6262785.266666681 ) |
222710229.3666667 ns (4273431.919999972 ) |
0.94 |
This comment was automatically generated by workflow using github-action-benchmark.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vortex Compression
Benchmark suite | Current: 5765d64 | Previous: 9a78dd4 | Ratio |
---|---|---|---|
compress time/taxi |
1271072421.3 ns (2982704.5500000715 ) |
1269445267.4 ns (4733656.099999905 ) |
1.00 |
compress time/taxi throughput |
470808924 bytes |
470808924 bytes |
1 |
parquet_rs-zstd compress time/taxi |
1694401904.7 ns (4125773.8537498713 ) |
1697338041.6 ns (3550074.4000000954 ) |
1.00 |
parquet_rs-zstd compress time/taxi throughput |
470808924 bytes |
470808924 bytes |
1 |
decompress time/taxi |
325375094.8 ns (1103821.800000012 ) |
348153889.8 ns (1155950.150000006 ) |
0.93 |
decompress time/taxi throughput |
470808924 bytes |
470808924 bytes |
1 |
parquet_rs-zstd decompress time/taxi |
307016764 ns (846525.275000006 ) |
305111131.8 ns (1247659.150000006 ) |
1.01 |
parquet_rs-zstd decompress time/taxi throughput |
470808924 bytes |
470808924 bytes |
1 |
vortex:parquet-zstd size/taxi |
1.0302258312558379 ratio |
1.0302258312558379 ratio |
1 |
vortex:raw size/taxi |
0.12245365170690775 ratio |
0.12245365170690775 ratio |
1 |
vortex size/taxi |
57652272 bytes |
57652272 bytes |
1 |
compress time/AirlineSentiment |
1010777.8740811289 ns (3132.673215674644 ) |
1004485.4158568813 ns (1872.3175927525153 ) |
1.01 |
compress time/AirlineSentiment throughput |
2020 bytes |
2020 bytes |
1 |
parquet_rs-zstd compress time/AirlineSentiment |
55454.639310368126 ns (123.79506030901393 ) |
55147.57865030914 ns (200.5675341977294 ) |
1.01 |
parquet_rs-zstd compress time/AirlineSentiment throughput |
2020 bytes |
2020 bytes |
1 |
decompress time/AirlineSentiment |
123383.59035848528 ns (803.2464865044458 ) |
122183.20885000534 ns (655.6976499893499 ) |
1.01 |
decompress time/AirlineSentiment throughput |
2020 bytes |
2020 bytes |
1 |
parquet_rs-zstd decompress time/AirlineSentiment |
32348.285899991002 ns (37.44539933946544 ) |
32108.762590615213 ns (54.21731287058719 ) |
1.01 |
parquet_rs-zstd decompress time/AirlineSentiment throughput |
2020 bytes |
2020 bytes |
1 |
vortex:parquet-zstd size/AirlineSentiment |
11.544984488107549 ratio |
11.544984488107549 ratio |
1 |
vortex:raw size/AirlineSentiment |
5.526732673267326 ratio |
5.526732673267326 ratio |
1 |
vortex size/AirlineSentiment |
11164 bytes |
11164 bytes |
1 |
compress time/Arade |
2263282205.1 ns (3880277.338749647 ) |
2262984461.5 ns (3054974.283749819 ) |
1.00 |
compress time/Arade throughput |
787023760 bytes |
787023760 bytes |
1 |
parquet_rs-zstd compress time/Arade |
2877936378.9 ns (6535717.082499981 ) |
2883886061.2 ns (6605046.006249905 ) |
1.00 |
parquet_rs-zstd compress time/Arade throughput |
787023760 bytes |
787023760 bytes |
1 |
decompress time/Arade |
530487175.5 ns (3200717.2400000095 ) |
612946839.4 ns (2073186.7237499952 ) |
0.87 |
decompress time/Arade throughput |
787023760 bytes |
787023760 bytes |
1 |
parquet_rs-zstd decompress time/Arade |
662426596.7 ns (2746238.413749993 ) |
666112346.3 ns (1662576.1987499595 ) |
0.99 |
parquet_rs-zstd decompress time/Arade throughput |
787023760 bytes |
787023760 bytes |
1 |
vortex:parquet-zstd size/Arade |
0.4938228181453686 ratio |
0.4938228181453686 ratio |
1 |
vortex:raw size/Arade |
0.1916207764807507 ratio |
0.1916207764807507 ratio |
1 |
vortex size/Arade |
150810104 bytes |
150810104 bytes |
1 |
compress time/Bimbo |
10552974824 ns (9359263.322500229 ) |
10625715309.1 ns (11647554.183750153 ) |
0.99 |
compress time/Bimbo throughput |
7121333608 bytes |
7121333608 bytes |
1 |
parquet_rs-zstd compress time/Bimbo |
19213943926 ns (25489515.799999237 ) |
19282797379.3 ns (27073806.89999962 ) |
1.00 |
parquet_rs-zstd compress time/Bimbo throughput |
7121333608 bytes |
7121333608 bytes |
1 |
decompress time/Bimbo |
3548922820.2 ns (13816854.642499924 ) |
3804088021.4 ns (5468660 ) |
0.93 |
decompress time/Bimbo throughput |
7121333608 bytes |
7121333608 bytes |
1 |
parquet_rs-zstd decompress time/Bimbo |
2669191958.6 ns (6180695.410000086 ) |
2644857581.5 ns (7736382.481249809 ) |
1.01 |
parquet_rs-zstd decompress time/Bimbo throughput |
7121333608 bytes |
7121333608 bytes |
1 |
vortex:parquet-zstd size/Bimbo |
1.8549111341909053 ratio |
1.8277250112243697 ratio |
1.01 |
vortex:raw size/Bimbo |
0.1011026298769628 ratio |
0.0996208400071348 ratio |
1.01 |
vortex size/Bimbo |
719985556 bytes |
709433236 bytes |
1.01 |
compress time/CMSprovider |
12290578359.1 ns (8256593.667500496 ) |
12443627418.6 ns (12329184.007499695 ) |
0.99 |
compress time/CMSprovider throughput |
5149123964 bytes |
5149123964 bytes |
1 |
parquet_rs-zstd compress time/CMSprovider |
18317449282.3 ns (23794919.650001526 ) |
18398049817.7 ns (27697597.472499847 ) |
1.00 |
parquet_rs-zstd compress time/CMSprovider throughput |
5149123964 bytes |
5149123964 bytes |
1 |
decompress time/CMSprovider |
3698701527.3 ns (42097315.192500114 ) |
4233342991.3 ns (190265949.14750004 ) |
0.87 |
decompress time/CMSprovider throughput |
5149123964 bytes |
5149123964 bytes |
1 |
parquet_rs-zstd decompress time/CMSprovider |
5271578095.7 ns (9654621.849999905 ) |
5208739741.3 ns (5184162.31125021 ) |
1.01 |
parquet_rs-zstd decompress time/CMSprovider throughput |
5149123964 bytes |
5149123964 bytes |
1 |
vortex:parquet-zstd size/CMSprovider |
1.3078211079973756 ratio |
1.3284211533594883 ratio |
0.98 |
vortex:raw size/CMSprovider |
0.19544823761015204 ratio |
0.19852682575656863 ratio |
0.98 |
vortex size/CMSprovider |
1006387204 bytes |
1022239236 bytes |
0.98 |
compress time/Euro2016 |
2651535571.4 ns (3239248.8175001144 ) |
2638095584.5 ns (5461625.349999905 ) |
1.01 |
compress time/Euro2016 throughput |
393253221 bytes |
393253221 bytes |
1 |
parquet_rs-zstd compress time/Euro2016 |
1525128961.2 ns (2459583.2000000477 ) |
1531851684.5 ns (3158763.2000000477 ) |
1.00 |
parquet_rs-zstd compress time/Euro2016 throughput |
393253221 bytes |
393253221 bytes |
1 |
decompress time/Euro2016 |
213629788.2 ns (1310728.0775000006 ) |
296110309.4 ns (977301.650000006 ) |
0.72 |
decompress time/Euro2016 throughput |
393253221 bytes |
393253221 bytes |
1 |
parquet_rs-zstd decompress time/Euro2016 |
481265423.1 ns (1800948.088749975 ) |
479987155.1 ns (1589234.0637499988 ) |
1.00 |
parquet_rs-zstd decompress time/Euro2016 throughput |
393253221 bytes |
393253221 bytes |
1 |
vortex:parquet-zstd size/Euro2016 |
1.4824675342043792 ratio |
1.4824675342043792 ratio |
1 |
vortex:raw size/Euro2016 |
0.4481855420072961 ratio |
0.4481855420072961 ratio |
1 |
vortex size/Euro2016 |
176250408 bytes |
176250408 bytes |
1 |
compress time/Food |
956430932.6 ns (2239010.9587500095 ) |
960252187.7 ns (2207226.3499999642 ) |
1.00 |
compress time/Food throughput |
332718229 bytes |
332718229 bytes |
1 |
parquet_rs-zstd compress time/Food |
1023058282.1 ns (1660065.5999999642 ) |
1022722144.2 ns (1401163.6500000358 ) |
1.00 |
parquet_rs-zstd compress time/Food throughput |
332718229 bytes |
332718229 bytes |
1 |
decompress time/Food |
95340914.89555557 ns (330476.2674027756 ) |
122841846.06150794 ns (740251.1264841333 ) |
0.78 |
decompress time/Food throughput |
332718229 bytes |
332718229 bytes |
1 |
parquet_rs-zstd decompress time/Food |
217951499.5 ns (421339.003124997 ) |
220311141.1 ns (347985.97499999404 ) |
0.99 |
parquet_rs-zstd decompress time/Food throughput |
332718229 bytes |
332718229 bytes |
1 |
vortex:parquet-zstd size/Food |
1.4170195301889623 ratio |
1.4170195301889623 ratio |
1 |
vortex:raw size/Food |
0.15430064097870633 ratio |
0.15430064097870633 ratio |
1 |
vortex size/Food |
51338636 bytes |
51338636 bytes |
1 |
compress time/HashTags |
2466194796 ns (3659862.223749876 ) |
2462343618.7 ns (3412817.041250229 ) |
1.00 |
compress time/HashTags throughput |
804495592 bytes |
804495592 bytes |
1 |
parquet_rs-zstd compress time/HashTags |
2414483300.7 ns (4141901.5500001907 ) |
2421168053.6 ns (4580471.918750048 ) |
1.00 |
parquet_rs-zstd compress time/HashTags throughput |
804495592 bytes |
804495592 bytes |
1 |
decompress time/HashTags |
457473107.8 ns (2428760.887500018 ) |
456047317.3 ns (1328739.349999994 ) |
1.00 |
decompress time/HashTags throughput |
804495592 bytes |
804495592 bytes |
1 |
parquet_rs-zstd decompress time/HashTags |
770853645.5 ns (2948753.460000038 ) |
770295605.2 ns (4081381.616250038 ) |
1.00 |
parquet_rs-zstd decompress time/HashTags throughput |
804495592 bytes |
804495592 bytes |
1 |
vortex:parquet-zstd size/HashTags |
1.696770313204076 ratio |
1.6964612186937864 ratio |
1.00 |
vortex:raw size/HashTags |
0.28254807392406445 ratio |
0.2824966031634888 ratio |
1.00 |
vortex size/HashTags |
227308680 bytes |
227267272 bytes |
1.00 |
compress time/TPC-H l_comment chunked without fsst |
3259820257.4 ns (10362013.359999895 ) |
3278240147 ns (7835423.432499886 ) |
0.99 |
compress time/TPC-H l_comment chunked without fsst throughput |
249197098 bytes |
249197098 bytes |
1 |
parquet_rs-zstd compress time/TPC-H l_comment chunked without fsst |
904800758.3 ns (2014573.9737499356 ) |
907767925.5 ns (2196599.9487499595 ) |
1.00 |
parquet_rs-zstd compress time/TPC-H l_comment chunked without fsst throughput |
249197098 bytes |
249197098 bytes |
1 |
decompress time/TPC-H l_comment chunked without fsst |
113586399.8 ns (744321.2224999964 ) |
134174052.3 ns (376966.7012500018 ) |
0.85 |
decompress time/TPC-H l_comment chunked without fsst throughput |
249197098 bytes |
249197098 bytes |
1 |
parquet_rs-zstd decompress time/TPC-H l_comment chunked without fsst |
249970883.5 ns (380097.00499999523 ) |
252262861.4 ns (506957.57062499225 ) |
0.99 |
parquet_rs-zstd decompress time/TPC-H l_comment chunked without fsst throughput |
249197098 bytes |
249197098 bytes |
1 |
vortex:parquet-zstd size/TPC-H l_comment chunked without fsst |
4.6094725032883765 ratio |
4.60935871253353 ratio |
1.00 |
vortex:raw size/TPC-H l_comment chunked without fsst |
1.0531928104556016 ratio |
1.0531189412165627 ratio |
1.00 |
vortex size/TPC-H l_comment chunked without fsst |
262452592 bytes |
262434184 bytes |
1.00 |
compress time/TPC-H l_comment chunked |
985837911.9 ns (5737870.012499988 ) |
975606460.4 ns (1664221.8487499952 ) |
1.01 |
compress time/TPC-H l_comment chunked throughput |
249197098 bytes |
249197098 bytes |
1 |
parquet_rs-zstd compress time/TPC-H l_comment chunked |
906636148.2 ns (2225076.668749988 ) |
907923897.6 ns (1443434.800000012 ) |
1.00 |
parquet_rs-zstd compress time/TPC-H l_comment chunked throughput |
249197098 bytes |
249197098 bytes |
1 |
decompress time/TPC-H l_comment chunked |
94422093.54380952 ns (420113.7501190528 ) |
136989908.63333336 ns (426294.9795833528 ) |
0.69 |
decompress time/TPC-H l_comment chunked throughput |
249197098 bytes |
249197098 bytes |
1 |
parquet_rs-zstd decompress time/TPC-H l_comment chunked |
251154696.1 ns (859517.5481250137 ) |
252624906.4 ns (395806.4537499994 ) |
0.99 |
parquet_rs-zstd decompress time/TPC-H l_comment chunked throughput |
249197098 bytes |
249197098 bytes |
1 |
vortex:parquet-zstd size/TPC-H l_comment chunked |
1.3522625963766726 ratio |
1.352326733564503 ratio |
1.00 |
vortex:raw size/TPC-H l_comment chunked |
0.30897098167652015 ratio |
0.3089715916354692 ratio |
1.00 |
vortex size/TPC-H l_comment chunked |
76994672 bytes |
76994824 bytes |
1.00 |
compress time/TPC-H l_comment canonical |
983229741.55 ns (2012924.1012499928 ) |
981620716.05 ns (1932302.9150000215 ) |
1.00 |
compress time/TPC-H l_comment canonical throughput |
249197114 bytes |
249197114 bytes |
1 |
parquet_rs-zstd compress time/TPC-H l_comment canonical |
913855027.55 ns (2188628.6256249547 ) |
912662377.4 ns (1387070.9662499428 ) |
1.00 |
parquet_rs-zstd compress time/TPC-H l_comment canonical throughput |
249197114 bytes |
249197114 bytes |
1 |
decompress time/TPC-H l_comment canonical |
93713107.86022487 ns (232994.53101024032 ) |
138811240.23117062 ns (449824.3771031648 ) |
0.68 |
decompress time/TPC-H l_comment canonical throughput |
249197114 bytes |
249197114 bytes |
1 |
parquet_rs-zstd decompress time/TPC-H l_comment canonical |
249488661.99146825 ns (392238.420548141 ) |
254271176.72571427 ns (2553442.0268303454 ) |
0.98 |
parquet_rs-zstd decompress time/TPC-H l_comment canonical throughput |
249197114 bytes |
249197114 bytes |
1 |
vortex:parquet-zstd size/TPC-H l_comment canonical |
1.3523605242182377 ratio |
1.352364167886467 ratio |
1.00 |
vortex:raw size/TPC-H l_comment canonical |
0.3089709618386672 ratio |
0.30897157179757706 ratio |
1.00 |
vortex size/TPC-H l_comment canonical |
76994672 bytes |
76994824 bytes |
1.00 |
This comment was automatically generated by workflow using github-action-benchmark.
Seems like ci hits a deadlock |
I must have introduced deadlock in 4ffd42b since everything was running before that commit. I will figure it out tonight |
vortex-file/src/read/coalescer.rs
Outdated
self.queued.push_front(RowMaskState::Pending(m)); | ||
Poll::Pending |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this pending needs some way to know how to resume, I think reads should follow and this would resume as ready but I don't think it does
vortex-file/src/read/coalescer.rs
Outdated
cache: Arc<RwLock<LayoutMessageCache>>, | ||
} | ||
|
||
impl<R, S, V, RM> Coalescer<R, S, V, RM> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name is a bit off since the coalescing is actually happening in the VortexReadRanges
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, this is the thing that buffers values so they can be coalesced
vortex-file/src/read/coalescer.rs
Outdated
values_iter, | ||
in_flight: None, | ||
queued: VecDeque::new(), | ||
io_read: VortexReadRanges::new(read, dispatcher.clone(), 1 << 20), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol @ magic 1<<20
(presumably 1MiB?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we should get rid of it and instead messure the value in the reader
d58d3e8
to
655f575
Compare
vortex-file/src/read/coalescer.rs
Outdated
self.store_messages( | ||
msgs.map_err(|e| vortex_err!("Cancelled in flight read {e}"))?, | ||
); | ||
let messages = ready!(self.gather_read_messages(cx))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this returns pending we get into an error state where we poll a done future which in rust is a bug. Need this to not propagate pending
Implement a simple size-limited queue. Similar to `FuturesUnordered`, except it caps the total amount bytes that outstanding tasks will materialize into memory. The goal is to use this in #1466 to give us a backpressure mechanism that relies on memory rather than an arbitrary # of requests. There are explanatory doc comments on the `SizeLimitedStream` and `SizedFut` types.
Implement a simple size-limited queue. Similar to `FuturesUnordered`, except it caps the total amount bytes that outstanding tasks will materialize into memory. The goal is to use this in #1466 to give us a backpressure mechanism that relies on memory rather than an arbitrary # of requests. There are explanatory doc comments on the `SizeLimitedStream` and `SizedFut` types.
vortex-array/src/compute/filter.rs
Outdated
@@ -202,6 +204,10 @@ impl FilterMask { | |||
self.array.len() - self.true_count | |||
} | |||
|
|||
pub fn true_range(&self) -> (usize, usize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we got rid of true_range in #1468, prob want to remove here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I’m working on fixing the last issue today and have it rebased locally
e5a2f38
to
30a795e
Compare
vortex-file/src/read/metadata.rs
Outdated
let metadata_reader = Coalescer::new( | ||
input, | ||
dispatcher, | ||
root_layout, | ||
Box::new(stream::iter(iter::once(Ok(RowMask::new_valid_between( | ||
0, 1, | ||
))))) as _, | ||
MetadataMaskReader::new(root_layout), | ||
layout_cache, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This suggests broken abstraction. We require some initial value always and we produce this empty range
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, it's just simpler to delegate to the existing logic and not have another type. We should factor out common logic from Coalesce for single element operation
/// Layout has no metadata | ||
None, | ||
/// Additional IO is required | ||
pub enum MessageRead<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add None
as a variant and change and remove nested value from Option in all return types
@@ -71,7 +71,7 @@ pub struct VortexReadBuilder<R> { | |||
io_dispatcher: Option<Arc<IoDispatcher>>, | |||
} | |||
|
|||
impl<R: VortexReadAt> VortexReadBuilder<R> { | |||
impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be so Unpin
and instead use pin_project
more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TPC-H
Benchmark suite | Current: 5765d64 | Previous: 9a78dd4 | Ratio |
---|---|---|---|
tpch_q1/arrow |
567900919 ns |
599969079 ns |
0.95 |
tpch_q1/parquet |
788947137 ns |
812101003 ns |
0.97 |
tpch_q1/vortex-file-compressed |
490728192 ns |
471210219 ns |
1.04 |
tpch_q2/arrow |
144073286 ns |
150580712 ns |
0.96 |
tpch_q2/parquet |
181732717 ns |
184801972 ns |
0.98 |
tpch_q2/vortex-file-compressed |
151686964 ns |
153853516 ns |
0.99 |
tpch_q3/arrow |
172658279 ns |
180714000 ns |
0.96 |
tpch_q3/parquet |
382958256 ns |
387446970 ns |
0.99 |
tpch_q3/vortex-file-compressed |
223565003 ns |
226890559 ns |
0.99 |
tpch_q4/arrow |
182105286 ns |
183098082 ns |
0.99 |
tpch_q4/parquet |
217059840 ns |
224240346 ns |
0.97 |
tpch_q4/vortex-file-compressed |
139390861 ns |
144548472 ns |
0.96 |
tpch_q5/arrow |
325963155 ns |
337835232 ns |
0.96 |
tpch_q5/parquet |
506360030 ns |
532208637 ns |
0.95 |
tpch_q5/vortex-file-compressed |
350196821 ns |
347788477 ns |
1.01 |
tpch_q6/arrow |
26576819 ns |
27156954 ns |
0.98 |
tpch_q6/parquet |
154693421 ns |
155810987 ns |
0.99 |
tpch_q6/vortex-file-compressed |
10969201 ns |
14305312 ns |
0.77 |
tpch_q7/arrow |
638895377 ns |
653344791 ns |
0.98 |
tpch_q7/parquet |
785190082 ns |
806188050 ns |
0.97 |
tpch_q7/vortex-file-compressed |
644748081 ns |
652369907 ns |
0.99 |
tpch_q8/arrow |
262407977 ns |
274365324 ns |
0.96 |
tpch_q8/parquet |
547053746 ns |
570258950 ns |
0.96 |
tpch_q8/vortex-file-compressed |
324701937 ns |
293108271 ns |
1.11 |
tpch_q9/arrow |
492288201 ns |
502223545 ns |
0.98 |
tpch_q9/parquet |
802484776 ns |
819845414 ns |
0.98 |
tpch_q9/vortex-file-compressed |
566444846 ns |
538455717 ns |
1.05 |
tpch_q10/arrow |
277959931 ns |
283893865 ns |
0.98 |
tpch_q10/parquet |
528601482 ns |
534291677 ns |
0.99 |
tpch_q10/vortex-file-compressed |
270284023 ns |
287659842 ns |
0.94 |
tpch_q11/arrow |
148273751 ns |
150706425 ns |
0.98 |
tpch_q11/parquet |
154260144 ns |
159484822 ns |
0.97 |
tpch_q11/vortex-file-compressed |
133671513 ns |
132127232 ns |
1.01 |
tpch_q12/arrow |
180988214 ns |
187491269 ns |
0.97 |
tpch_q12/parquet |
330434811 ns |
337769772 ns |
0.98 |
tpch_q12/vortex-file-compressed |
189196287 ns |
205815548 ns |
0.92 |
tpch_q13/arrow |
181890879 ns |
188037303 ns |
0.97 |
tpch_q13/parquet |
323492461 ns |
327774170 ns |
0.99 |
tpch_q13/vortex-file-compressed |
195336737 ns |
195406257 ns |
1.00 |
tpch_q14/arrow |
39968356 ns |
41804498 ns |
0.96 |
tpch_q14/parquet |
238462170 ns |
241705800 ns |
0.99 |
tpch_q14/vortex-file-compressed |
64744548 ns |
76975757 ns |
0.84 |
tpch_q15/arrow |
70001669 ns |
74166362 ns |
0.94 |
tpch_q15/parquet |
333868421 ns |
337719987 ns |
0.99 |
tpch_q15/vortex-file-compressed |
107229172 ns |
138216055 ns |
0.78 |
tpch_q16/arrow |
106315719 ns |
110497638 ns |
0.96 |
tpch_q16/parquet |
120182066 ns |
126202843 ns |
0.95 |
tpch_q16/vortex-file-compressed |
107984332 ns |
111599228 ns |
0.97 |
tpch_q17/arrow |
635815002 ns |
697587401 ns |
0.91 |
tpch_q17/parquet |
687858189 ns |
713217542 ns |
0.96 |
tpch_q17/vortex-file-compressed |
580874975 ns |
577980797 ns |
1.01 |
tpch_q18/arrow |
1195468252 ns |
1267565877 ns |
0.94 |
tpch_q18/parquet |
1401974025 ns |
1490731842 ns |
0.94 |
tpch_q18/vortex-file-compressed |
1207508186 ns |
1219487250 ns |
0.99 |
tpch_q19/arrow |
151756371 ns |
156862084 ns |
0.97 |
tpch_q19/parquet |
425602502 ns |
436763094 ns |
0.97 |
tpch_q19/vortex-file-compressed |
125108383 ns |
141979143 ns |
0.88 |
tpch_q20/arrow |
230114219 ns |
246229079 ns |
0.93 |
tpch_q20/parquet |
360694360 ns |
376654516 ns |
0.96 |
tpch_q20/vortex-file-compressed |
250440244 ns |
292541894 ns |
0.86 |
tpch_q21/arrow |
1004524639 ns |
1065216442 ns |
0.94 |
tpch_q21/parquet |
1127491941 ns |
1196096174 ns |
0.94 |
tpch_q21/vortex-file-compressed |
904297497 ns |
951183752 ns |
0.95 |
tpch_q22/arrow |
79354962 ns |
84392398 ns |
0.94 |
tpch_q22/parquet |
111432582 ns |
119525981 ns |
0.93 |
tpch_q22/vortex-file-compressed |
89530089 ns |
92752525 ns |
0.97 |
This comment was automatically generated by workflow using github-action-benchmark.
37dadcc
to
5765d64
Compare
1231e09
to
7a77716
Compare
waker: Waker, | ||
) -> BoxFuture<'static, io::Result<Vec<Message>>> { | ||
let reader = self.io_read.clone(); | ||
self.dispatcher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need this outer dispatcher.dispatch
, since read_byte_ranges
is also going through the Dispatcher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a deadlock without it. The await is never resumed as we propagate the pending to the stream but if stream returns pending it needs to wake itself up so I need something to poll the read and then call wake
vortex-io/src/read_ranges.rs
Outdated
pub fn read_byte_ranges( | ||
&self, | ||
ranges: Vec<Range<usize>>, | ||
) -> impl Future<Output = io::Result<Vec<Bytes>>> + 'static { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably update the signature to reflect that the returned future is Send
, or perhaps just return a BoxFuture
directly
input, | ||
dispatcher, | ||
root_layout, | ||
stream::iter(iter::once(Ok(RowMask::new_valid_between(0, 1)))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why [0,1]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is bad abstraction. This rowmask is never used but I need to produce a stream of row masks to call the metadata read… need to think this one over
vortex-file/src/read/buffered.rs
Outdated
pub struct MaskLayoutReader { | ||
layout: Box<dyn LayoutReader>, | ||
} | ||
|
||
impl MaskLayoutReader { | ||
pub fn new(layout: Box<dyn LayoutReader>) -> Self { | ||
Self { layout } | ||
} | ||
} | ||
|
||
impl RowMaskReader<ArrayData> for MaskLayoutReader { | ||
/// Read given mask out of the reader | ||
fn read_mask(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<ArrayData>>> { | ||
self.layout.read_selection(mask) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can kill this and just replace it with a blanket impl
impl<T: LayoutReader> RowMaskReader<ArrayData> for T {
fn read_mask(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<ArrayData>>> {
self.read_selection(mask)
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm
ReadMore(Vec<MessageLocator>), | ||
Batches(Vec<Option<ArrayData>>), | ||
} | ||
pub type BatchRead = MessageRead<ArrayData>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally find these type aliases more confusing than using the underlying type, though I think that is controversial
vortex-file/src/read/buffered.rs
Outdated
|
||
const NUM_TO_COALESCE: usize = 8; | ||
|
||
pub trait RowMaskReader<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all of the types in this file should probably either be non-pub
or at most pub(crate)
to avoid leaking impl things into public API, with some more imminent usage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a PR to clean up some naming and add some extra comments here: #1520
I think the structure of the reader has gotten really complex, with a lot of mutable state, lots of .expect
I'd like to remove, some awkward names. I think we can work on that after this merges though so we capture the perf improvements
I merged the renames. I think there's still something funky with |
No description provided.