-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: File row splits #1709
base: develop
Are you sure you want to change the base?
feat: File row splits #1709
Changes from all commits
0bf8b45
330550a
7c53dca
1157e76
40d1ae1
3a2f090
db1a869
107f9e7
0872d69
231791d
f1606e7
7f62a87
7bff7ac
f31dc0e
b45e8e1
97fb493
8cf4f2d
f6888a5
f5a773d
f932627
8b1e871
d57d36f
2203f35
3ddfa71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
use std::collections::BTreeSet; | ||
use std::sync::{Arc, RwLock}; | ||
|
||
use futures::stream; | ||
use itertools::Itertools; | ||
use vortex_dtype::DType; | ||
use vortex_error::{VortexResult, VortexUnwrap as _}; | ||
use vortex_io::{IoDispatcher, VortexReadAt}; | ||
|
||
use super::splits::SplitsAccumulator; | ||
use super::{LayoutMessageCache, LayoutReader, LazyDType, RowMask, VortexReadArrayStream}; | ||
use crate::read::buffered::{BufferedLayoutReader, ReadArray}; | ||
use crate::read::splits::ReadRowMask; | ||
|
||
#[derive(Clone)] | ||
pub struct VortexReadHandle<R> { | ||
input: R, | ||
dtype: Arc<LazyDType>, | ||
row_count: u64, | ||
splits: Vec<(usize, usize)>, | ||
layout_reader: Arc<dyn LayoutReader>, | ||
filter_reader: Option<Arc<dyn LayoutReader>>, | ||
messages_cache: Arc<RwLock<LayoutMessageCache>>, | ||
row_mask: Option<RowMask>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it necessary to provide a handle-level row-mask? This sort of feels like a parameter you pass when you build a scan from the handle |
||
io_dispatcher: Arc<IoDispatcher>, | ||
} | ||
|
||
impl<R: VortexReadAt + Unpin> VortexReadHandle<R> { | ||
#[allow(clippy::too_many_arguments)] | ||
pub(crate) fn try_new( | ||
input: R, | ||
layout_reader: Arc<dyn LayoutReader>, | ||
filter_reader: Option<Arc<dyn LayoutReader>>, | ||
messages_cache: Arc<RwLock<LayoutMessageCache>>, | ||
dtype: Arc<LazyDType>, | ||
row_count: u64, | ||
row_mask: Option<RowMask>, | ||
io_dispatcher: Arc<IoDispatcher>, | ||
) -> VortexResult<Self> { | ||
let mut reader_splits = BTreeSet::new(); | ||
layout_reader.add_splits(0, &mut reader_splits)?; | ||
if let Some(ref fr) = filter_reader { | ||
fr.add_splits(0, &mut reader_splits)?; | ||
} | ||
|
||
reader_splits.insert(row_count as usize); | ||
|
||
let splits = reader_splits.into_iter().tuple_windows().collect(); | ||
|
||
Ok(Self { | ||
input, | ||
dtype, | ||
row_count, | ||
splits, | ||
layout_reader, | ||
filter_reader, | ||
messages_cache, | ||
row_mask, | ||
io_dispatcher, | ||
}) | ||
} | ||
|
||
/// Returns the type of the file's top-level array. | ||
pub fn dtype(&self) -> &DType { | ||
// FIXME(ngates): why is this allowed to unwrap? | ||
self.dtype.value().vortex_unwrap() | ||
} | ||
|
||
/// Returns the total row count of the Vortex file, before any filtering. | ||
pub fn row_count(&self) -> u64 { | ||
self.row_count | ||
} | ||
|
||
/// Returns a set of row splits in the file, that can be used to inform on how to split it horizontally. | ||
pub fn splits(&self) -> &[(usize, usize)] { | ||
&self.splits | ||
} | ||
|
||
/// Create a stream over all data from the handle | ||
pub fn into_stream(self) -> VortexReadArrayStream<R> { | ||
let splits_vec = Vec::from_iter(self.splits().iter().copied()); | ||
let split_accumulator = SplitsAccumulator::new(splits_vec.into_iter(), self.row_mask); | ||
|
||
let splits_stream = stream::iter(split_accumulator); | ||
|
||
// Set up a stream of RowMask that result from applying a filter expression over the file. | ||
let mask_iterator = if let Some(fr) = &self.filter_reader { | ||
Box::new(BufferedLayoutReader::new( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally we pull the splits_stream out of this thing and just Or is that not possible? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Honestly worried about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Buffered is a bit too naive. You want more control over coalescing |
||
self.input.clone(), | ||
self.io_dispatcher.clone(), | ||
splits_stream, | ||
ReadRowMask::new(fr.clone()), | ||
self.messages_cache.clone(), | ||
)) as _ | ||
} else { | ||
Box::new(splits_stream) as _ | ||
}; | ||
|
||
// Set up a stream of result ArrayData that result from applying the filter and projection | ||
// expressions over the file. | ||
let array_reader = BufferedLayoutReader::new( | ||
self.input, | ||
self.io_dispatcher, | ||
mask_iterator, | ||
ReadArray::new(self.layout_reader), | ||
self.messages_cache, | ||
); | ||
|
||
VortexReadArrayStream::new(self.dtype, self.row_count, array_reader) | ||
} | ||
|
||
/// Create a stream over a specific row range from the handle | ||
pub fn stream_range( | ||
mut self, | ||
begin: usize, | ||
end: usize, | ||
) -> VortexResult<VortexReadArrayStream<R>> { | ||
self.row_mask = match self.row_mask { | ||
Some(mask) => Some(mask.and_rowmask(RowMask::new_valid_between(begin, end))?), | ||
None => Some(RowMask::new_valid_between(begin, end)), | ||
}; | ||
|
||
Ok(self.into_stream()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So LayoutReaders are no longer stateful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like I overlooked some layers of of state/locks in the readers, that will take some thinking and probably additional refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok after a more careful reading and a small test, as far as I can tell while the
LayoutReader
implementations are stateful (with interior mutability), they can read independently from each other as theLayoutBufferedReader
is the part that actually controls the overall flow and each stream initializes a new one.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There can be multiple concurrent reads being tracked in the layout reader but are they reusable?
I thought something encoded assumptions that splits were accessed monotonically but maybe that was in the BufferedReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought so too, but that does seem to be in the
BufferedLayoutReader
. TheLayoutReader
implementation read throughpoll_read
which takes aRowMask
to read try and read a specific range on each call, and theBufferedReader
tracks that.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be able to access the layoutreaders out of order so we can read multiple at the same time irrespective of their children.