diff --git a/bench-vortex/benches/compress.rs b/bench-vortex/benches/compress.rs index 91bd8c0f2..2fbb4fb8e 100644 --- a/bench-vortex/benches/compress.rs +++ b/bench-vortex/benches/compress.rs @@ -144,7 +144,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult VortexResult { .with_io_dispatcher(DISPATCHER.clone()) .build() .await? + .into_stream() .read_all() .await } @@ -127,6 +128,7 @@ async fn take_vortex( .with_indices(ArrayData::from(indices.to_vec())) .build() .await? + .into_stream() .read_all() .await // For equivalence.... we decompress to make sure we're not cheating too much. diff --git a/pyvortex/src/dataset.rs b/pyvortex/src/dataset.rs index aa5718636..d1a5292f8 100644 --- a/pyvortex/src/dataset.rs +++ b/pyvortex/src/dataset.rs @@ -12,7 +12,7 @@ use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::file::{ read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter, - VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader, + VortexReadArrayStream, VortexReadBuilder, VortexRecordBatchReader, }; use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt}; use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT; @@ -27,7 +27,7 @@ pub async fn layout_stream_from_reader( projection: Projection, row_filter: Option, indices: Option, -) -> VortexResult> { +) -> VortexResult> { let mut builder = VortexReadBuilder::new( reader, LayoutDeserializer::new( @@ -45,7 +45,7 @@ pub async fn layout_stream_from_reader( builder = builder.with_indices(indices); } - builder.build().await + Ok(builder.build().await?.into_stream()) } pub async fn read_array_from_reader( diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index cb0d43337..ac5e7054c 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -72,6 +72,7 @@ impl FileOpener for VortexFileOpener { builder .build() .await? + .into_stream() .map_ok(RecordBatch::try_from) .map(|r| r.and_then(|inner| inner)) .map_err(|e| e.into()), diff --git a/vortex-dtype/src/ptype.rs b/vortex-dtype/src/ptype.rs index d1ebf2789..301a2fdca 100644 --- a/vortex-dtype/src/ptype.rs +++ b/vortex-dtype/src/ptype.rs @@ -188,9 +188,7 @@ macro_rules! match_each_integer_ptype { PType::U16 => __with__! { u16 }, PType::U32 => __with__! { u32 }, PType::U64 => __with__! { u64 }, - PType::F16 => panic!("Unsupported ptype f16"), - PType::F32 => panic!("Unsupported ptype f32"), - PType::F64 => panic!("Unsupported ptype f64"), + other => panic!("Unsupported ptype {other}") } }) } @@ -206,7 +204,7 @@ macro_rules! match_each_unsigned_integer_ptype { PType::U16 => __with__! { u16 }, PType::U32 => __with__! { u32 }, PType::U64 => __with__! { u64 }, - _ => panic!("Unsupported ptype {}", $self), + other => panic!("Unsupported ptype {other}"), } }) } @@ -222,7 +220,7 @@ macro_rules! match_each_float_ptype { PType::F16 => __with__! { f16 }, PType::F32 => __with__! { f32 }, PType::F64 => __with__! { f64 }, - _ => panic!("Unsupported ptype {}", $self), + other => panic!("Unsupported ptype {other}"), } }) } diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 611bfbc80..e47fa6fdc 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -47,15 +47,15 @@ //! //! # Reading //! -//! Layout reading is implemented by [`VortexFileArrayStream`]. The [`VortexFileArrayStream`] should +//! Layout reading is implemented by [`VortexReadArrayStream`]. The [`VortexReadArrayStream`] should //! be constructed by a [`VortexReadBuilder`], which first uses an [InitialRead] to read the footer //! (schema, layout, postscript, version, and magic bytes). In most cases, these entire footer can //! be read by a single read of the suffix of the file. //! -//! A [`VortexFileArrayStream`] internally contains a [`LayoutMessageCache`] which is shared by its +//! A [`VortexReadArrayStream`] internally contains a [`LayoutMessageCache`] which is shared by its //! layout reader and the layout reader's descendants. The cache permits the reading system to //! "read" the bytes of a layout multiple times without triggering reads to the underlying storage. -//! For example, the [`VortexFileArrayStream`] reads an array, evaluates the row filter, and then +//! For example, the [`VortexReadArrayStream`] reads an array, evaluates the row filter, and then //! reads the array again with the filter mask. //! //! A [`LayoutReader`] then assembles one or more Vortex arrays by reading the serialized data and @@ -64,7 +64,7 @@ //! # Apache Arrow //! //! If you ultimately seek Arrow arrays, [`VortexRecordBatchReader`] converts a -//! [`VortexFileArrayStream`] into a [`RecordBatchReader`](arrow_array::RecordBatchReader). +//! [`VortexReadArrayStream`] into a [`RecordBatchReader`](arrow_array::RecordBatchReader). mod read; mod write; diff --git a/vortex-file/src/read/buffered.rs b/vortex-file/src/read/buffered.rs index f63ec6c38..a007fc9dc 100644 --- a/vortex-file/src/read/buffered.rs +++ b/vortex-file/src/read/buffered.rs @@ -26,11 +26,11 @@ pub(crate) trait ReadMasked { /// Read an array with a [`RowMask`]. pub(crate) struct ReadArray { - layout: Box, + layout: Arc, } impl ReadArray { - pub(crate) fn new(layout: Box) -> Self { + pub(crate) fn new(layout: Arc) -> Self { Self { layout } } } @@ -51,7 +51,8 @@ enum RowMaskState { } pub struct BufferedLayoutReader { - values: S, + /// Stream of row masks to read + read_masks: S, row_mask_reader: RM, in_flight: Option>>>, queued: VecDeque>, @@ -69,12 +70,12 @@ where pub fn new( read: R, dispatcher: Arc, - values: S, + read_masks: S, row_mask_reader: RM, cache: Arc>, ) -> Self { Self { - values, + read_masks, row_mask_reader, in_flight: None, queued: VecDeque::new(), @@ -126,7 +127,7 @@ where let mut exhausted = false; while read_more_count < NUM_TO_COALESCE { - match self.values.poll_next_unpin(cx) { + match self.read_masks.poll_next_unpin(cx) { Poll::Ready(Some(Ok(next_mask))) => { if let Some(read_result) = self.row_mask_reader.read_masked(&next_mask)? { match read_result { diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index 055994986..f042f7987 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -6,12 +6,12 @@ use vortex_error::VortexResult; use vortex_expr::Select; use vortex_io::{IoDispatcher, VortexReadAt}; +use super::handle::VortexReadHandle; use super::InitialRead; use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache}; use crate::read::context::LayoutDeserializer; use crate::read::filtering::RowFilter; use crate::read::projection::Projection; -use crate::read::stream::VortexFileArrayStream; use crate::read::{RowMask, Scan}; pub(crate) mod initial_read; @@ -122,12 +122,20 @@ impl VortexReadBuilder { self } - pub async fn build(self) -> VortexResult> { + pub async fn make_initial_read(&mut self) -> VortexResult { + if let Some(initial_read) = self.initial_read.as_ref() { + return Ok(initial_read.clone()); + } + + let new_read = read_initial_bytes(&self.read_at, self.file_size().await?).await?; + self.initial_read = Some(new_read.clone()); + + Ok(new_read) + } + + pub async fn build(mut self) -> VortexResult> { // we do a large enough initial read to get footer, layout, and schema - let initial_read = match self.initial_read { - Some(r) => r, - None => read_initial_bytes(&self.read_at, self.file_size().await?).await?, - }; + let initial_read = self.make_initial_read().await?; let layout = initial_read.fb_layout(); @@ -163,19 +171,13 @@ impl VortexReadBuilder { let row_mask = self .row_mask .as_ref() - .map(|row_mask| { - if row_mask.dtype().is_int() { - RowMask::from_index_array(row_mask, 0, row_count as usize) - } else { - RowMask::from_mask_array(row_mask, 0, row_count as usize) - } - }) + .map(|row_mask| RowMask::from_array(row_mask, 0, row_count as usize)) .transpose()?; // Default: fallback to single-threaded tokio dispatcher. let io_dispatcher = self.io_dispatcher.unwrap_or_default(); - VortexFileArrayStream::try_new( + VortexReadHandle::try_new( self.read_at, layout_reader, filter_reader, diff --git a/vortex-file/src/read/context.rs b/vortex-file/src/read/context.rs index 551ea8715..4d00fba3b 100644 --- a/vortex-file/src/read/context.rs +++ b/vortex-file/src/read/context.rs @@ -27,7 +27,7 @@ pub trait Layout: Debug + Send + Sync { scan: Scan, layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, - ) -> VortexResult>; + ) -> VortexResult>; } pub type LayoutRef = &'static dyn Layout; @@ -74,7 +74,7 @@ impl LayoutDeserializer { layout: fb::Layout, scan: Scan, message_cache: RelativeLayoutCache, - ) -> VortexResult> { + ) -> VortexResult> { let layout_id = LayoutId(layout.encoding()); self.layout_ctx .lookup_layout(&layout_id) diff --git a/vortex-file/src/read/handle.rs b/vortex-file/src/read/handle.rs new file mode 100644 index 000000000..cfec92067 --- /dev/null +++ b/vortex-file/src/read/handle.rs @@ -0,0 +1,125 @@ +use std::collections::BTreeSet; +use std::sync::{Arc, RwLock}; + +use futures::stream; +use itertools::Itertools; +use vortex_dtype::DType; +use vortex_error::{VortexResult, VortexUnwrap as _}; +use vortex_io::{IoDispatcher, VortexReadAt}; + +use super::splits::SplitsAccumulator; +use super::{LayoutMessageCache, LayoutReader, LazyDType, RowMask, VortexReadArrayStream}; +use crate::read::buffered::{BufferedLayoutReader, ReadArray}; +use crate::read::splits::ReadRowMask; + +#[derive(Clone)] +pub struct VortexReadHandle { + input: R, + dtype: Arc, + row_count: u64, + splits: Vec<(usize, usize)>, + layout_reader: Arc, + filter_reader: Option>, + messages_cache: Arc>, + row_mask: Option, + io_dispatcher: Arc, +} + +impl VortexReadHandle { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + input: R, + layout_reader: Arc, + filter_reader: Option>, + messages_cache: Arc>, + dtype: Arc, + row_count: u64, + row_mask: Option, + io_dispatcher: Arc, + ) -> VortexResult { + let mut reader_splits = BTreeSet::new(); + layout_reader.add_splits(0, &mut reader_splits)?; + if let Some(ref fr) = filter_reader { + fr.add_splits(0, &mut reader_splits)?; + } + + reader_splits.insert(row_count as usize); + + let splits = reader_splits.into_iter().tuple_windows().collect(); + + Ok(Self { + input, + dtype, + row_count, + splits, + layout_reader, + filter_reader, + messages_cache, + row_mask, + io_dispatcher, + }) + } + + /// Returns the type of the file's top-level array. + pub fn dtype(&self) -> &DType { + // FIXME(ngates): why is this allowed to unwrap? + self.dtype.value().vortex_unwrap() + } + + /// Returns the total row count of the Vortex file, before any filtering. + pub fn row_count(&self) -> u64 { + self.row_count + } + + /// Returns a set of row splits in the file, that can be used to inform on how to split it horizontally. + pub fn splits(&self) -> &[(usize, usize)] { + &self.splits + } + + /// Create a stream over all data from the handle + pub fn into_stream(self) -> VortexReadArrayStream { + let splits_vec = Vec::from_iter(self.splits().iter().copied()); + let split_accumulator = SplitsAccumulator::new(splits_vec.into_iter(), self.row_mask); + + let splits_stream = stream::iter(split_accumulator); + + // Set up a stream of RowMask that result from applying a filter expression over the file. + let mask_iterator = if let Some(fr) = &self.filter_reader { + Box::new(BufferedLayoutReader::new( + self.input.clone(), + self.io_dispatcher.clone(), + splits_stream, + ReadRowMask::new(fr.clone()), + self.messages_cache.clone(), + )) as _ + } else { + Box::new(splits_stream) as _ + }; + + // Set up a stream of result ArrayData that result from applying the filter and projection + // expressions over the file. + let array_reader = BufferedLayoutReader::new( + self.input, + self.io_dispatcher, + mask_iterator, + ReadArray::new(self.layout_reader), + self.messages_cache, + ); + + VortexReadArrayStream::new(self.dtype, self.row_count, array_reader) + } + + /// Create a stream over a specific row range from the handle + pub fn stream_range( + mut self, + begin: usize, + end: usize, + ) -> VortexResult> { + self.row_mask = match self.row_mask { + Some(mask) => Some(mask.and_rowmask(RowMask::new_valid_between(begin, end))?), + None => Some(RowMask::new_valid_between(begin, end)), + }; + + Ok(self.into_stream()) + } +} diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index 18dfcd9bf..0f3a7fed7 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -42,8 +42,8 @@ impl Layout for ChunkedLayout { scan: Scan, layout_builder: LayoutDeserializer, message_cache: RelativeLayoutCache, - ) -> VortexResult> { - Ok(Box::new( + ) -> VortexResult> { + Ok(Arc::new( ChunkedLayoutBuilder { layout, scan, @@ -171,7 +171,7 @@ type InProgressLayoutRanges = RwLock, Vec, - metadata_layout: Option>, + metadata_layout: Option>, scan: Scan, in_progress_ranges: InProgressLayoutRanges, cached_metadata: OnceLock, @@ -181,7 +181,7 @@ pub struct ChunkedLayoutReader { impl ChunkedLayoutReader { pub fn new( layouts: Vec, - metadata_layout: Option>, + metadata_layout: Option>, scan: Scan, ) -> Self { Self { @@ -246,8 +246,8 @@ impl ChunkedLayoutReader { self.layouts .iter() .enumerate() - .filter_map(|(i, RangedLayoutReader((child_begin, child_end), _))| { - (end > *child_begin && begin < *child_end).then_some(i) + .filter_map(|(i, &RangedLayoutReader((child_begin, child_end), _))| { + (end > child_begin && begin < child_end).then_some(i) }) .collect::>() } @@ -288,8 +288,8 @@ impl ChunkedLayoutReader { impl LayoutReader for ChunkedLayoutReader { fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - for RangedLayoutReader((begin, _), child) in &self.layouts { - child.add_splits(row_offset + begin, splits)?; + for RangedLayoutReader((begin, _), child) in self.layouts.iter() { + child.add_splits(row_offset + *begin, splits)?; } Ok(()) } @@ -493,7 +493,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( + let (filter_layout, projection_layout, buf, length) = layout_and_bytes( cache.clone(), Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), @@ -509,14 +509,8 @@ mod tests { assert!(filter_layout.metadata_layout().is_none()); assert!(projection_layout.metadata_layout().is_none()); - let arr = filter_read_layout( - &mut filter_layout, - &mut projection_layout, - cache, - &buf, - length, - ) - .pop_front(); + let arr = + filter_read_layout(&filter_layout, &projection_layout, cache, &buf, length).pop_front(); assert!(arr.is_some()); let arr = arr.unwrap(); @@ -530,9 +524,9 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range_no_filter() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (_, mut projection_layout, buf, length) = + let (_, projection_layout, buf, length) = layout_and_bytes(cache.clone(), Scan::empty()).await; - let arr = read_layout(&mut projection_layout, cache, &buf, length).pop_front(); + let arr = read_layout(&projection_layout, cache, &buf, length).pop_front(); assert!(arr.is_some()); let arr = arr.unwrap(); @@ -546,10 +540,9 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_no_range() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (_, mut projection_layout, buf, _) = - layout_and_bytes(cache.clone(), Scan::empty()).await; + let (_, projection_layout, buf, _) = layout_and_bytes(cache.clone(), Scan::empty()).await; let arr = read_layout_data( - &mut projection_layout, + &projection_layout, cache, &buf, &RowMask::new_valid_between(0, 500), @@ -567,8 +560,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_multiple_selectors() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (_, mut projection_layout, buf, _) = - layout_and_bytes(cache.clone(), Scan::empty()).await; + let (_, projection_layout, buf, _) = layout_and_bytes(cache.clone(), Scan::empty()).await; let mut first_range = BooleanBufferBuilder::new(200); first_range.append_n(150, true); @@ -584,7 +576,7 @@ mod tests { RowMask::new_valid_between(400, 500), ] .into_iter() - .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, &s)) + .flat_map(|s| read_layout_data(&projection_layout, cache.clone(), &buf, &s)) .collect::>(); assert_eq!(arr.len(), 3); diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index 44f0d7a08..eaed15857 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ b/vortex-file/src/read/layouts/columnar.rs @@ -35,8 +35,8 @@ impl Layout for ColumnarLayout { scan: Scan, layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, - ) -> VortexResult> { - Ok(Box::new( + ) -> VortexResult> { + Ok(Arc::new( ColumnarLayoutBuilder { layout, scan, @@ -116,7 +116,7 @@ impl ColumnarLayoutBuilder<'_> { ) })?; - handled_children.push(Box::new(ColumnarLayoutReader::new( + handled_children.push(Arc::new(ColumnarLayoutReader::new( unhandled_names.into(), unhandled_children, Some(prf), @@ -187,7 +187,7 @@ type InProgressPrunes = RwLock>>>; #[derive(Debug)] pub struct ColumnarLayoutReader { names: FieldNames, - children: Vec>, + children: Vec>, expr: Option>, // TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not shortcircuit_siblings: bool, @@ -199,7 +199,7 @@ pub struct ColumnarLayoutReader { impl ColumnarLayoutReader { pub fn new( names: FieldNames, - children: Vec>, + children: Vec>, expr: Option>, shortcircuit_siblings: bool, ) -> Self { @@ -418,7 +418,7 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (Box, Box, Buffer, usize) { + ) -> (Arc, Arc, Buffer, usize) { let int_array = PrimitiveArray::from((0..100).collect::>()).into_array(); let int2_array = PrimitiveArray::from((100..200).collect::>()).into_array(); let int_dtype = int_array.dtype().clone(); @@ -483,7 +483,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes( + let (filter_layout, project_layout, buf, length) = layout_and_bytes( cache.clone(), Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Column::new_expr(Field::from("ints")), @@ -493,8 +493,8 @@ mod tests { ) .await; let arr = filter_read_layout( - filter_layout.as_mut(), - project_layout.as_mut(), + filter_layout.as_ref(), + project_layout.as_ref(), cache, &buf, length, @@ -539,9 +539,8 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range_no_filter() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (_, mut project_layout, buf, length) = - layout_and_bytes(cache.clone(), Scan::empty()).await; - let arr = read_layout(project_layout.as_mut(), cache, &buf, length).pop_front(); + let (_, project_layout, buf, length) = layout_and_bytes(cache.clone(), Scan::empty()).await; + let arr = read_layout(project_layout.as_ref(), cache, &buf, length).pop_front(); assert!(arr.is_some()); let prim_arr = arr @@ -581,7 +580,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn short_circuit() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes( + let (filter_layout, project_layout, buf, length) = layout_and_bytes( cache.clone(), Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( BinaryExpr::new_expr( @@ -599,8 +598,8 @@ mod tests { ) .await; let arr = filter_read_layout( - filter_layout.as_mut(), - project_layout.as_mut(), + filter_layout.as_ref(), + project_layout.as_ref(), cache, &buf, length, diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index d01838efb..0c36059a5 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -30,14 +30,14 @@ impl Layout for FlatLayout { scan: Scan, layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, - ) -> VortexResult> { + ) -> VortexResult> { let buffers = layout.buffers().unwrap_or_default(); if buffers.len() != 1 { vortex_bail!("Flat layout can have exactly 1 buffer") } let buf = buffers.get(0); - Ok(Box::new(FlatLayoutReader::new( + Ok(Arc::new(FlatLayoutReader::new( ByteRange::new(buf.begin(), buf.end()), scan, layout_serde.ctx(), @@ -180,7 +180,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( + let (filter_layout, projection_layout, buf, length) = layout_and_bytes( cache.clone(), Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), @@ -189,14 +189,8 @@ mod tests { ))), ) .await; - let arr = filter_read_layout( - &mut filter_layout, - &mut projection_layout, - cache, - &buf, - length, - ) - .pop_front(); + let arr = + filter_read_layout(&filter_layout, &projection_layout, cache, &buf, length).pop_front(); assert!(arr.is_some()); let arr = arr.unwrap(); @@ -210,8 +204,8 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_range_no_filter() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut data_layout, buf, length, ..) = read_only_layout(cache.clone()).await; - let arr = read_layout(&mut data_layout, cache, &buf, length).pop_front(); + let (data_layout, buf, length, ..) = read_only_layout(cache.clone()).await; + let arr = read_layout(&data_layout, cache, &buf, length).pop_front(); assert!(arr.is_some()); let arr = arr.unwrap(); @@ -225,7 +219,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn read_empty() { let cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes( + let (filter_layout, projection_layout, buf, length) = layout_and_bytes( cache.clone(), Scan::new(RowFilter::new_expr(BinaryExpr::new_expr( Arc::new(Identity), @@ -234,14 +228,8 @@ mod tests { ))), ) .await; - let arr = filter_read_layout( - &mut filter_layout, - &mut projection_layout, - cache, - &buf, - length, - ) - .pop_front(); + let arr = + filter_read_layout(&filter_layout, &projection_layout, cache, &buf, length).pop_front(); assert!(arr.is_none()); } diff --git a/vortex-file/src/read/layouts/mod.rs b/vortex-file/src/read/layouts/mod.rs index 50d78a7a5..bab0b68a0 100644 --- a/vortex-file/src/read/layouts/mod.rs +++ b/vortex-file/src/read/layouts/mod.rs @@ -4,6 +4,8 @@ mod flat; #[cfg(test)] mod test_read; +use std::sync::Arc; + pub use chunked::ChunkedLayout; pub use columnar::ColumnarLayout; pub use flat::FlatLayout; @@ -12,4 +14,4 @@ use crate::LayoutReader; // TODO(aduffy): make this container more useful #[derive(Debug)] -pub struct RangedLayoutReader((usize, usize), Box); +pub struct RangedLayoutReader((usize, usize), Arc); diff --git a/vortex-file/src/read/layouts/test_read.rs b/vortex-file/src/read/layouts/test_read.rs index 1e28833d8..d2ae6136c 100644 --- a/vortex-file/src/read/layouts/test_read.rs +++ b/vortex-file/src/read/layouts/test_read.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeSet, VecDeque}; use std::sync::{Arc, RwLock}; +use itertools::Itertools; use vortex_array::ArrayData; use vortex_buffer::Buffer; use vortex_error::VortexUnwrap; @@ -9,21 +10,20 @@ use crate::read::mask::RowMask; use crate::read::splits::SplitsAccumulator; use crate::{LayoutMessageCache, LayoutReader, MessageLocator, PollRead}; -fn layout_splits( - layouts: &[&mut dyn LayoutReader], - length: usize, -) -> impl Iterator { - let mut iter = SplitsAccumulator::new(length as u64, None); +fn layout_splits(layouts: &[&dyn LayoutReader], length: usize) -> impl Iterator { let mut splits = BTreeSet::new(); for layout in layouts { layout.add_splits(0, &mut splits).vortex_unwrap(); } - iter.append_splits(&mut splits); + splits.insert(length); + + let iter = SplitsAccumulator::new(splits.into_iter().tuple_windows::<(usize, usize)>(), None); + iter.into_iter().map(|m| m.unwrap()) } pub fn read_layout_data( - layout: &mut dyn LayoutReader, + layout: &dyn LayoutReader, cache: Arc>, buf: &Buffer, selector: &RowMask, @@ -43,7 +43,7 @@ pub fn read_layout_data( } pub fn read_filters( - layout: &mut dyn LayoutReader, + layout: &dyn LayoutReader, cache: Arc>, buf: &Buffer, selector: &RowMask, @@ -57,9 +57,7 @@ pub fn read_filters( } } PollRead::Value(a) => { - return Some( - RowMask::from_mask_array(&a, selector.begin(), selector.end()).unwrap(), - ); + return Some(RowMask::from_array(&a, selector.begin(), selector.end()).unwrap()); } } } @@ -68,8 +66,8 @@ pub fn read_filters( } pub fn filter_read_layout( - filter_layout: &mut dyn LayoutReader, - layout: &mut dyn LayoutReader, + filter_layout: &dyn LayoutReader, + layout: &dyn LayoutReader, cache: Arc>, buf: &Buffer, length: usize, @@ -81,7 +79,7 @@ pub fn filter_read_layout( } pub fn read_layout( - layout: &mut dyn LayoutReader, + layout: &dyn LayoutReader, cache: Arc>, buf: &Buffer, length: usize, diff --git a/vortex-file/src/read/mask.rs b/vortex-file/src/read/mask.rs index 411a8fc1b..3ab7bcfbe 100644 --- a/vortex-file/src/read/mask.rs +++ b/vortex-file/src/read/mask.rs @@ -3,10 +3,11 @@ use std::fmt::{Display, Formatter}; use arrow_buffer::BooleanBuffer; use itertools::Itertools; -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::aliases::hash_set::HashSet; +use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray}; use vortex_array::compute::{and, filter, slice, try_cast, FilterMask}; use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity}; -use vortex_array::{ArrayData, IntoArrayData, IntoArrayVariant}; +use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; use vortex_dtype::Nullability::NonNullable; use vortex_dtype::{DType, PType}; use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; @@ -69,10 +70,24 @@ impl RowMask { .vortex_unwrap() } + /// Creates a RowMask from an array, only supported boolean and integer types. + pub fn from_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { + if array.dtype().is_int() { + Self::from_index_array(array, begin, end) + } else if array.dtype().is_boolean() { + Self::from_mask_array(array, begin, end) + } else { + vortex_bail!( + "RowMask can only be created from integer or boolean arrays, got {} instead.", + array.dtype() + ); + } + } + /// Construct a RowMask from a Boolean typed array. /// /// True-valued positions are kept by the returned mask. - pub fn from_mask_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { + fn from_mask_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { match array.logical_validity() { LogicalValidity::AllValid(_) => { Self::try_new(FilterMask::try_from(array.clone())?, begin, end) @@ -88,7 +103,7 @@ impl RowMask { /// Construct a RowMask from an integral array. /// /// The array values are interpreted as indices and those indices are kept by the returned mask. - pub fn from_index_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { + fn from_index_array(array: &ArrayData, begin: usize, end: usize) -> VortexResult { let indices = try_cast(array, &DType::Primitive(PType::U64, NonNullable))?.into_primitive()?; @@ -126,6 +141,53 @@ impl RowMask { } } + pub fn and_rowmask(&self, other: RowMask) -> VortexResult { + if other.true_count() == other.len() { + return Ok(self.clone()); + } + + // If both masks align perfectly + if self.begin == other.begin && self.end == other.end { + let this_buffer = self.mask.to_boolean_buffer()?; + let other_buffer = other.mask.to_boolean_buffer()?; + + let unified = &this_buffer & (&other_buffer); + return RowMask::from_mask_array( + BoolArray::from(unified).as_ref(), + self.begin, + self.end, + ); + } + + // Disjoint row ranges + if self.end <= other.begin || self.begin >= other.end { + return Ok(RowMask::new_invalid_between( + min(self.begin, other.begin), + max(self.end, other.end), + )); + } + + let output_end = max(self.end, other.end); + + let this_buffer = self.mask.to_boolean_buffer()?; + let other_buffer = other.mask.to_boolean_buffer()?; + let self_indices = this_buffer + .set_indices() + .map(|v| v + self.begin) + .collect::>(); + let other_indices = other_buffer + .set_indices() + .map(|v| v + other.begin) + .collect::>(); + + let output_mask = FilterMask::from_indices( + output_end, + self_indices.intersection(&other_indices).copied(), + ); + + Self::try_new(output_mask, 0, output_end) + } + pub fn is_all_false(&self) -> bool { self.mask.true_count() == 0 } @@ -222,6 +284,7 @@ mod tests { use rstest::rstest; use vortex_array::array::PrimitiveArray; use vortex_array::compute::FilterMask; + use vortex_array::validity::Validity; use vortex_array::{IntoArrayData, IntoArrayVariant}; use vortex_error::VortexUnwrap; @@ -300,4 +363,77 @@ mod tests { (5..10).collect::>() ); } + + #[test] + #[should_panic] + fn test_row_mask_type_validation() { + let array = PrimitiveArray::from_vec(vec![1.0, 2.0], Validity::AllInvalid).into_array(); + RowMask::from_array(&array, 0, 2).unwrap(); + } + + #[test] + fn test_and_rowmap_disjoint() { + let a = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 3], Validity::AllValid).as_ref(), + 0, + 10, + ) + .unwrap(); + let b = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 3], Validity::AllValid).as_ref(), + 15, + 20, + ) + .unwrap(); + + let output = a.and_rowmask(b).unwrap(); + + assert_eq!(output.begin, 0); + assert_eq!(output.end, 20); + assert!(output.is_all_false()); + } + + #[test] + fn test_and_rowmap_aligned() { + let a = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 3], Validity::AllValid).as_ref(), + 0, + 10, + ) + .unwrap(); + let b = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 7], Validity::AllValid).as_ref(), + 0, + 10, + ) + .unwrap(); + + let output = a.and_rowmask(b).unwrap(); + + assert_eq!(output.begin, 0); + assert_eq!(output.end, 10); + assert_eq!(output.true_count(), 2); + } + + #[test] + fn test_and_rowmap_intersect() { + let a = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 3], Validity::AllValid).as_ref(), + 0, + 10, + ) + .unwrap(); + let b = RowMask::from_array( + PrimitiveArray::from_vec(vec![1, 2, 7], Validity::AllValid).as_ref(), + 5, + 15, + ) + .unwrap(); + + let output = a.and_rowmask(b).unwrap(); + + assert_eq!(output.begin, 0); + assert_eq!(output.end, 15); + assert_eq!(output.true_count(), 0); + } } diff --git a/vortex-file/src/read/metadata.rs b/vortex-file/src/read/metadata.rs index c18dddb48..9db0815ea 100644 --- a/vortex-file/src/read/metadata.rs +++ b/vortex-file/src/read/metadata.rs @@ -11,11 +11,11 @@ use crate::read::buffered::{BufferedLayoutReader, ReadMasked}; use crate::{PollRead, RowMask}; struct MetadataMaskReader { - layout: Box, + layout: Arc, } impl MetadataMaskReader { - pub fn new(layout: Box) -> Self { + pub fn new(layout: Arc) -> Self { Self { layout } } } @@ -34,7 +34,7 @@ impl ReadMasked for MetadataMaskReader { pub async fn fetch_metadata( input: R, dispatcher: Arc, - root_layout: Box, + root_layout: Arc, layout_cache: Arc>, ) -> VortexResult>>> { let mut metadata_reader = BufferedLayoutReader::new( diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index f03519e6b..fa0ec1ceb 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -10,6 +10,7 @@ mod cache; mod context; mod expr_project; mod filtering; +pub mod handle; pub mod layouts; mod mask; pub mod metadata; @@ -26,7 +27,7 @@ pub use context::*; pub use filtering::RowFilter; pub use projection::Projection; pub use recordbatchreader::{AsyncRuntime, VortexRecordBatchReader}; -pub use stream::VortexFileArrayStream; +pub use stream::VortexReadArrayStream; use vortex_buffer::Buffer; use vortex_expr::ExprRef; @@ -105,7 +106,7 @@ pub enum Prune { /// Layout readers are **synchronous** and **stateful**. A request to read a given row range may /// trigger a request for more messages, which will be handled by the caller, placing the messages /// back into the message cache for this layout as a result. -pub trait LayoutReader: Debug + Send { +pub trait LayoutReader: Debug + Send + Sync { /// Register all horizontal row boundaries of this layout. /// /// Layout should register all indivisible absolute row boundaries of the data stored in itself and its children. diff --git a/vortex-file/src/read/reader.rs b/vortex-file/src/read/reader.rs index a2a9aa658..3d4a90e4b 100644 --- a/vortex-file/src/read/reader.rs +++ b/vortex-file/src/read/reader.rs @@ -7,7 +7,7 @@ use vortex_array::ArrayData; use vortex_error::VortexResult; use vortex_io::VortexReadAt; -use crate::{InitialRead, LayoutMessageCache, VortexFileArrayStream}; +use crate::{InitialRead, LayoutMessageCache, VortexReadArrayStream}; pub struct VortexFileArrayReader { read: R, @@ -27,7 +27,7 @@ impl VortexFileArrayReader { } /// Stream the chunks of the Vortex file. - pub fn into_stream(self) -> VortexFileArrayStream { + pub fn into_stream(self) -> VortexReadArrayStream { todo!() } } diff --git a/vortex-file/src/read/recordbatchreader.rs b/vortex-file/src/read/recordbatchreader.rs index 760c54def..620f82859 100644 --- a/vortex-file/src/read/recordbatchreader.rs +++ b/vortex-file/src/read/recordbatchreader.rs @@ -9,7 +9,7 @@ use vortex_array::ArrayData; use vortex_error::{VortexError, VortexResult}; use vortex_io::VortexReadAt; -use super::VortexFileArrayStream; +use super::VortexReadArrayStream; fn vortex_to_arrow_error(error: VortexError) -> ArrowError { ArrowError::ExternalError(Box::new(error)) @@ -32,7 +32,7 @@ impl AsyncRuntime for tokio::runtime::Runtime { } pub struct VortexRecordBatchReader<'a, R, AR> { - stream: VortexFileArrayStream, + stream: VortexReadArrayStream, arrow_schema: SchemaRef, runtime: &'a AR, } @@ -43,7 +43,7 @@ where AR: AsyncRuntime, { pub fn try_new( - stream: VortexFileArrayStream, + stream: VortexReadArrayStream, runtime: &'a AR, ) -> VortexResult> { let arrow_schema = Arc::new(infer_schema(stream.dtype())?); diff --git a/vortex-file/src/read/splits.rs b/vortex-file/src/read/splits.rs index f0ab18fbb..a050472dd 100644 --- a/vortex-file/src/read/splits.rs +++ b/vortex-file/src/read/splits.rs @@ -1,8 +1,7 @@ -use std::collections::BTreeSet; +use std::sync::Arc; -use itertools::Itertools; use vortex_array::stats::ArrayStatistics; -use vortex_error::{VortexResult, VortexUnwrap}; +use vortex_error::VortexResult; use crate::read::buffered::ReadMasked; use crate::{LayoutReader, PollRead, Prune, RowMask}; @@ -12,11 +11,11 @@ use crate::{LayoutReader, PollRead, Prune, RowMask}; /// Similar to `ReadArray`, this wraps a layout to read an array, but `ReadRowMask` will interpret /// that array as a `RowMask`, and performs some optimizations to apply pruning first. pub(crate) struct ReadRowMask { - layout: Box, + layout: Arc, } impl ReadRowMask { - pub(crate) fn new(layout: Box) -> Self { + pub(crate) fn new(layout: Arc) -> Self { Self { layout } } } @@ -58,8 +57,9 @@ impl ReadMasked for ReadRowMask { } } +/// Takes all row sub-ranges, and produces a set of row ranges that aren't filtered out by the provided mask. pub struct SplitsAccumulator { - splits: BTreeSet, + ranges: Box + Send>, row_mask: Option, } @@ -69,14 +69,12 @@ pub struct SplitsIntoIter { } impl SplitsAccumulator { - pub fn new(row_count: u64, row_mask: Option) -> Self { - let mut splits = BTreeSet::new(); - splits.insert(row_count.try_into().vortex_unwrap()); - Self { splits, row_mask } - } - - pub fn append_splits(&mut self, other: &mut BTreeSet) { - self.splits.append(other); + pub fn new( + ranges: impl Iterator + Send + 'static, + row_mask: Option, + ) -> Self { + let ranges = Box::new(ranges); + Self { ranges, row_mask } } } @@ -85,10 +83,10 @@ impl IntoIterator for SplitsAccumulator { type IntoIter = SplitsIntoIter; + /// Creates an iterator of row masks to be read from the underlying file. fn into_iter(self) -> Self::IntoIter { - let ranges = Box::new(self.splits.into_iter().tuple_windows::<(usize, usize)>()); SplitsIntoIter { - ranges, + ranges: self.ranges, row_mask: self.row_mask, } } @@ -106,6 +104,7 @@ impl Iterator for SplitsIntoIter { Err(e) => return Some(Err(e)), }; + // If the range is all false, we take the next one if sliced.is_all_false() { continue; } @@ -121,8 +120,6 @@ impl Iterator for SplitsIntoIter { #[cfg(test)] mod tests { - use std::collections::BTreeSet; - use vortex_array::compute::FilterMask; use vortex_error::VortexResult; @@ -132,8 +129,8 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] fn filters_empty() { - let mut mask_iter = SplitsAccumulator::new( - 10, + let mask_iter = SplitsAccumulator::new( + [(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)].into_iter(), Some( RowMask::try_new( FilterMask::from_iter([ @@ -145,7 +142,6 @@ mod tests { .unwrap(), ), ); - mask_iter.append_splits(&mut BTreeSet::from([0, 2, 4, 6, 8, 10])); let actual = mask_iter .into_iter() diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index c1ac843b8..e6378d028 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -1,21 +1,17 @@ -use std::collections::BTreeSet; use std::pin::Pin; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::task::{Context, Poll}; -use futures::{stream, Stream}; +use futures::Stream; use futures_util::{StreamExt, TryStreamExt}; use vortex_array::array::ChunkedArray; use vortex_array::{ArrayData, IntoArrayData}; use vortex_dtype::DType; use vortex_error::{vortex_panic, VortexResult, VortexUnwrap}; -use vortex_io::{IoDispatcher, VortexReadAt}; +use vortex_io::VortexReadAt; use crate::read::buffered::{BufferedLayoutReader, ReadArray}; -use crate::read::cache::LayoutMessageCache; use crate::read::mask::RowMask; -use crate::read::splits::{ReadRowMask, SplitsAccumulator}; -use crate::read::LayoutReader; use crate::LazyDType; /// An asynchronous Vortex file that returns a [`Stream`] of [`ArrayData`]s. @@ -25,7 +21,7 @@ use crate::LazyDType; /// /// Use [VortexReadBuilder][crate::read::builder::VortexReadBuilder] to build one /// from a reader. -pub struct VortexFileArrayStream { +pub struct VortexReadArrayStream { dtype: Arc, row_count: u64, array_reader: BufferedLayoutReader< @@ -36,77 +32,36 @@ pub struct VortexFileArrayStream { >, } -impl VortexFileArrayStream { - #[allow(clippy::too_many_arguments)] - pub(crate) fn try_new( - input: R, - layout_reader: Box, - filter_reader: Option>, - messages_cache: Arc>, +impl VortexReadArrayStream { + pub(crate) fn new( dtype: Arc, row_count: u64, - row_mask: Option, - dispatcher: Arc, - ) -> VortexResult { - let mut reader_splits = BTreeSet::new(); - layout_reader.add_splits(0, &mut reader_splits)?; - if let Some(ref fr) = filter_reader { - fr.add_splits(0, &mut reader_splits)?; - } - - let mut split_accumulator = SplitsAccumulator::new(row_count, row_mask); - split_accumulator.append_splits(&mut reader_splits); - let splits_stream = stream::iter(split_accumulator); - - // Set up a stream of RowMask that result from applying a filter expression over the file. - let mask_iterator = if let Some(fr) = filter_reader { - Box::new(BufferedLayoutReader::new( - input.clone(), - dispatcher.clone(), - splits_stream, - ReadRowMask::new(fr), - messages_cache.clone(), - )) as _ - } else { - Box::new(splits_stream) as _ - }; - - // Set up a stream of result ArrayData that result from applying the filter and projection - // expressions over the file. - let array_reader = BufferedLayoutReader::new( - input, - dispatcher, - mask_iterator, - ReadArray::new(layout_reader), - messages_cache, - ); - - Ok(Self { + array_reader: BufferedLayoutReader< + R, + Box> + Send + Unpin>, + ArrayData, + ReadArray, + >, + ) -> Self { + Self { dtype, row_count, array_reader, - }) + } } + /// Returns the type of the file's top-level array. pub fn dtype(&self) -> &DType { // FIXME(ngates): why is this allowed to unwrap? self.dtype.value().vortex_unwrap() } + /// Returns the total row count of the Vortex file, before any filtering. pub fn row_count(&self) -> u64 { self.row_count } -} - -impl Stream for VortexFileArrayStream { - type Item = VortexResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.array_reader.poll_next_unpin(cx) - } -} -impl VortexFileArrayStream { + /// Read the whole stream into a single [`ArrayData`]. pub async fn read_all(self) -> VortexResult { let dtype = self.dtype().clone(); let arrays = self.try_collect::>().await?; @@ -121,3 +76,11 @@ impl VortexFileArrayStream { } } } + +impl Stream for VortexReadArrayStream { + type Item = VortexResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.array_reader.poll_next_unpin(cx) + } +} diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index f70284f6d..d59e69c06 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -58,7 +58,8 @@ async fn test_read_simple() { let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) .build() .await - .unwrap(); + .unwrap() + .into_stream(); let mut batch_count = 0; let mut row_count = 0; @@ -177,6 +178,7 @@ async fn test_read_projection() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -208,6 +210,7 @@ async fn test_read_projection() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -239,6 +242,7 @@ async fn test_read_projection() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -266,6 +270,7 @@ async fn test_read_projection() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -313,7 +318,8 @@ async fn unequal_batches() { let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) .build() .await - .unwrap(); + .unwrap() + .into_stream(); let mut batch_count = 0; let mut item_count = 0; @@ -371,7 +377,8 @@ async fn write_chunked() { let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default()) .build() .await - .unwrap(); + .unwrap() + .into_stream(); let mut array_len: usize = 0; while let Some(array) = reader.next().await { array_len += array.unwrap().len(); @@ -402,7 +409,7 @@ async fn filter_string() { writer = writer.write_array_columns(st).await.unwrap(); let written = Buffer::from(writer.finalize().await.unwrap()); - let reader = VortexReadBuilder::new(written, LayoutDeserializer::default()) + let stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) .with_row_filter(RowFilter::new(BinaryExpr::new_expr( Column::new_expr(Field::from("name")), Operator::Eq, @@ -410,9 +417,10 @@ async fn filter_string() { ))) .build() .await - .unwrap(); + .unwrap() + .into_stream(); - let result = reader.try_collect::>().await.unwrap(); + let result = stream.try_collect::>().await.unwrap(); assert_eq!(result.len(), 1); let names = result[0].as_struct_array().unwrap().field(0).unwrap(); assert_eq!( @@ -476,7 +484,8 @@ async fn filter_or() { ))) .build() .await - .unwrap(); + .unwrap() + .into_stream(); let mut result = Vec::new(); while let Some(array) = reader.next().await { @@ -540,7 +549,8 @@ async fn filter_and() { ))) .build() .await - .unwrap(); + .unwrap() + .into_stream(); let mut result = Vec::new(); while let Some(array) = reader.next().await { @@ -590,6 +600,7 @@ async fn test_with_indices_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -607,6 +618,7 @@ async fn test_with_indices_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -630,6 +642,7 @@ async fn test_with_indices_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -672,6 +685,7 @@ async fn test_with_indices_on_two_columns() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -737,6 +751,7 @@ async fn test_with_indices_and_with_row_filter_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -759,6 +774,7 @@ async fn test_with_indices_and_with_row_filter_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -790,6 +806,7 @@ async fn test_with_indices_and_with_row_filter_simple() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap() @@ -855,6 +872,7 @@ async fn filter_string_chunked() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -953,6 +971,7 @@ async fn test_pruning_with_or() { .build() .await .unwrap() + .into_stream() .read_all() .await .unwrap(); @@ -1037,6 +1056,7 @@ async fn test_repeated_projection() { .with_projection(projection) .build() .await? + .into_stream() .read_all() .await } @@ -1075,3 +1095,96 @@ async fn test_repeated_projection() { .collect_vec() ); } + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_simple_ranged_read() { + let strings = ChunkedArray::from_iter([ + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + ]) + .into_array(); + + let numbers = ChunkedArray::from_iter([ + PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(), + PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(), + ]) + .into_array(); + + let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); + let buf = Vec::new(); + let mut writer = VortexFileWriter::new(buf); + writer = writer.write_array_columns(st.into_array()).await.unwrap(); + let written = Buffer::from(writer.finalize().await.unwrap()); + + let handle = VortexReadBuilder::new(written, LayoutDeserializer::default()) + .build() + .await + .unwrap(); + + dbg!(handle.splits()); + for v in [(0, 4), (4, 8)] { + assert!(handle.splits().contains(&v)); + } + + let mut stream = handle.stream_range(0, 4).unwrap(); + + let mut batch_count = 0; + let mut row_count = 0; + + while let Some(array) = stream.next().await { + let array = array.unwrap(); + batch_count += 1; + row_count += array.len(); + } + + assert_eq!(batch_count, 1); + assert_eq!(row_count, 4); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_simple_range_twice() { + let strings = ChunkedArray::from_iter([ + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + ]) + .into_array(); + + let numbers = ChunkedArray::from_iter([ + PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(), + PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(), + ]) + .into_array(); + + let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); + let buf = Vec::new(); + let mut writer = VortexFileWriter::new(buf); + writer = writer.write_array_columns(st.into_array()).await.unwrap(); + let written = Buffer::from(writer.finalize().await.unwrap()); + + let handle = VortexReadBuilder::new(written, LayoutDeserializer::default()) + .build() + .await + .unwrap(); + + for v in [(0, 4), (4, 8)] { + assert!(handle.splits().contains(&v)); + } + + for _ in 0..2 { + let mut stream = handle.clone().stream_range(0, 7).unwrap(); + + let mut batch_count = 0; + let mut row_count = 0; + + while let Some(array) = stream.next().await { + let array = array.unwrap(); + batch_count += 1; + row_count += array.len(); + } + + assert_eq!(batch_count, 2); + assert_eq!(row_count, 7); + } +}