diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index e5d9bf1f76..63e79aa81f 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -57,7 +57,7 @@ vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } workspace = true [features] -default = [] +default = ["tokio"] futures = ["futures-util/io", "vortex-io/futures"] compio = ["dep:compio", "vortex-io/compio"] tokio = ["dep:tokio", "vortex-io/tokio"] diff --git a/vortex-file/src/chunked_reader/take_rows.rs b/vortex-file/src/chunked_reader/take_rows.rs index 74543680fc..9ddc513c80 100644 --- a/vortex-file/src/chunked_reader/take_rows.rs +++ b/vortex-file/src/chunked_reader/take_rows.rs @@ -1,6 +1,6 @@ +use std::io::Cursor; use std::ops::Range; -use bytes::BytesMut; use futures_util::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; use vortex_array::aliases::hash_map::HashMap; @@ -122,7 +122,7 @@ impl ChunkedArrayReader { byte_range: Range, row_range: Range, ) -> VortexResult { - let range_byte_len = (byte_range.end - byte_range.start) as usize; + let range_byte_len = byte_range.end - byte_range.start; // Relativize the indices to these chunks let indices_start = @@ -133,15 +133,15 @@ impl ChunkedArrayReader { let row_start_scalar = Scalar::from(row_range.start).cast(relative_indices.dtype())?; let relative_indices = subtract_scalar(&relative_indices, &row_start_scalar)?; - // Set up an array reader to read this range of chunks. - let mut buffer = BytesMut::with_capacity(range_byte_len); - unsafe { buffer.set_len(range_byte_len) } // TODO(ngates): instead of reading the whole range into a buffer, we should stream // the byte range (e.g. if its coming from an HTTP endpoint) and wrap that with an // MesssageReader. - let buffer = self.read.read_at_into(byte_range.start, buffer).await?; + let buffer = self + .read + .read_byte_range(byte_range.start, range_byte_len) + .await?; - let reader = StreamArrayReader::try_new(buffer, self.context.clone()) + let reader = StreamArrayReader::try_new(Cursor::new(buffer), self.context.clone()) .await? .with_dtype(self.dtype.clone()); diff --git a/vortex-file/src/dispatcher/tokio.rs b/vortex-file/src/dispatcher/tokio.rs index aa33fad088..37cd293e1b 100644 --- a/vortex-file/src/dispatcher/tokio.rs +++ b/vortex-file/src/dispatcher/tokio.rs @@ -117,7 +117,6 @@ impl Dispatch for TokioDispatcher { mod tests { use std::io::Write; - use bytes::BytesMut; use tempfile::NamedTempFile; use vortex_io::{TokioFile, VortexReadAt}; @@ -133,8 +132,8 @@ mod tests { let rx = dispatcher .dispatch(|| async move { let file = TokioFile::open(tmpfile.path()).unwrap(); - let bytes = BytesMut::zeroed(4); - file.read_at_into(0, bytes).await.unwrap() + + file.read_byte_range(0, 4).await.unwrap() }) .unwrap(); diff --git a/vortex-file/src/read/builder/initial_read.rs b/vortex-file/src/read/builder/initial_read.rs index 722e99dd84..38898d6bcf 100644 --- a/vortex-file/src/read/builder/initial_read.rs +++ b/vortex-file/src/read/builder/initial_read.rs @@ -1,6 +1,6 @@ use core::ops::Range; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use flatbuffers::{root, root_unchecked}; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_flatbuffers::{footer, message}; @@ -91,11 +91,11 @@ pub async fn read_initial_bytes( } let read_size = INITIAL_READ_SIZE.min(file_size as usize); - let mut buf = BytesMut::with_capacity(read_size); - unsafe { buf.set_len(read_size) } let initial_read_offset = file_size - read_size as u64; - buf = read.read_at_into(initial_read_offset, buf).await?; + let buf = read + .read_byte_range(initial_read_offset, read_size as u64) + .await?; let eof_loc = read_size - EOF_SIZE; let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len()); @@ -168,7 +168,7 @@ pub async fn read_initial_bytes( root::(&buf[layout_loc..ps_loc])?; Ok(InitialRead { - buf: buf.freeze(), + buf, initial_read_offset, fb_postscript_byte_range, }) diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index 1683a9798f..4b48f80787 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::future::BoxFuture; use futures::Stream; use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; @@ -305,14 +305,11 @@ async fn read_ranges( ) -> VortexResult> { stream::iter(ranges.into_iter()) .map(|MessageLocator(id, range)| { - let mut buf = BytesMut::with_capacity(range.len()); - unsafe { buf.set_len(range.len()) } - - let read_ft = reader.read_at_into(range.begin, buf); + let read_ft = reader.read_byte_range(range.begin, range.len()); read_ft.map(|result| { result - .map(|res| Message(id, res.freeze())) + .map(|res| Message(id, res)) .map_err(VortexError::from) }) }) diff --git a/vortex-io/src/compio.rs b/vortex-io/src/compio.rs index f4c97b8bf2..222771254b 100644 --- a/vortex-io/src/compio.rs +++ b/vortex-io/src/compio.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::io; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use compio::fs::File; use compio::io::AsyncReadAtExt; use compio::BufResult; @@ -10,16 +10,20 @@ use vortex_error::vortex_panic; use super::VortexReadAt; impl VortexReadAt for File { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - buffer: BytesMut, - ) -> impl Future> + 'static { + len: u64, + ) -> impl Future> + 'static { let this = self.clone(); + let mut buffer = BytesMut::with_capacity(len as usize); + unsafe { + buffer.set_len(len as usize); + } async move { // Turn the buffer into a static slice. let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await; - res.map(|_| buffer) + res.map(|_| buffer.freeze()) } } @@ -38,7 +42,6 @@ impl VortexReadAt for File { mod tests { use std::io::Write; - use bytes::BytesMut; use compio::fs::File; use tempfile::NamedTempFile; @@ -54,8 +57,7 @@ mod tests { let file = File::open(tmpfile.path()).await.unwrap(); // Use the file as a VortexReadAt instance. - let four_bytes = BytesMut::zeroed(4); - let read = file.read_at_into(2, four_bytes).await.unwrap(); + let read = file.read_byte_range(2, 4).await.unwrap(); assert_eq!(&read, "2345".as_bytes()); } } diff --git a/vortex-io/src/futures.rs b/vortex-io/src/futures.rs index cb7739135d..b4d902b9e4 100644 --- a/vortex-io/src/futures.rs +++ b/vortex-io/src/futures.rs @@ -1,6 +1,6 @@ use std::io; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures_util::{AsyncRead, AsyncReadExt}; use crate::VortexRead; @@ -8,8 +8,12 @@ use crate::VortexRead; pub struct FuturesAdapter(pub IO); impl VortexRead for FuturesAdapter { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { + async fn read_bytes(&mut self, len: u64) -> io::Result { + let mut buffer = BytesMut::with_capacity(len as usize); + unsafe { + buffer.set_len(len as usize); + } self.0.read_exact(buffer.as_mut()).await?; - Ok(buffer) + Ok(buffer.freeze()) } } diff --git a/vortex-io/src/object_store.rs b/vortex-io/src/object_store.rs index 8b6bb6c65b..407f9d9cc0 100644 --- a/vortex-io/src/object_store.rs +++ b/vortex-io/src/object_store.rs @@ -4,7 +4,7 @@ use std::ops::Range; use std::sync::Arc; use std::{io, mem}; -use bytes::BytesMut; +use bytes::Bytes; use object_store::path::Path; use object_store::{ObjectStore, WriteMultipart}; use vortex_buffer::io_buf::IoBuf; @@ -66,21 +66,20 @@ impl ObjectStoreReadAt { } impl VortexReadAt for ObjectStoreReadAt { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - mut buffer: BytesMut, - ) -> impl Future> + 'static { + len: u64, + ) -> impl Future> + 'static { let object_store = self.object_store.clone(); let location = self.location.clone(); Box::pin(async move { let start_range = pos as usize; let bytes = object_store - .get_range(&location, start_range..(start_range + buffer.len())) + .get_range(&location, start_range..(start_range + len as usize)) .await?; - buffer.as_mut().copy_from_slice(bytes.as_ref()); - Ok(buffer) + Ok(bytes) }) } diff --git a/vortex-io/src/offset.rs b/vortex-io/src/offset.rs index 12f3f47557..76fc27f773 100644 --- a/vortex-io/src/offset.rs +++ b/vortex-io/src/offset.rs @@ -1,6 +1,6 @@ use std::future::Future; -use bytes::BytesMut; +use bytes::Bytes; use futures::FutureExt; use crate::VortexReadAt; @@ -30,12 +30,12 @@ impl OffsetReadAt { } impl VortexReadAt for OffsetReadAt { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - buffer: BytesMut, - ) -> impl Future> + 'static { - self.read.read_at_into(pos + self.offset, buffer) + len: u64, + ) -> impl Future> + 'static { + self.read.read_byte_range(pos + self.offset, len) } fn performance_hint(&self) -> usize { diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index c76cb4d6a5..873112e7ad 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -3,19 +3,19 @@ use std::io; use std::io::Cursor; use std::sync::Arc; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use vortex_buffer::Buffer; use vortex_error::vortex_err; /// An asynchronous streaming reader. /// -/// Implementations expose data via the asynchronous [`read_into`][VortexRead::read_into], which +/// Implementations expose data via the asynchronous [`read_bytes`][VortexRead::read_bytes], which /// will fill the exact number of bytes and advance the stream. /// /// If the exact number of bytes is not available from the stream, an /// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned. pub trait VortexRead { - fn read_into(&mut self, buffer: BytesMut) -> impl Future>; + fn read_bytes(&mut self, len: u64) -> impl Future>; } /// A trait for types that support asynchronous reads. @@ -25,23 +25,20 @@ pub trait VortexRead { /// /// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads. pub trait VortexReadAt: Send + Sync + Clone + 'static { - /// Request an asynchronous positional read to be done, with results written into the provided `buffer`. + /// Request an asynchronous positional read. Results will be returned as an owned [`Bytes`]. /// - /// This method will take ownership of the provided `buffer`, and upon successful completion will return - /// the buffer completely full with data. - /// - /// If the reader does not have enough data available to fill the buffer, the returned Future will complete + /// If the reader does not have the requested number of bytes, the returned Future will complete /// with an [`io::Error`]. /// /// ## Thread Safety /// /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core /// executors. - fn read_at_into( + fn read_byte_range( &self, pos: u64, - buffer: BytesMut, - ) -> impl Future> + 'static; + len: u64, + ) -> impl Future> + 'static; // TODO(ngates): the read implementation should be able to hint at its latency/throughput // allowing the caller to make better decisions about how to coalesce reads. @@ -57,12 +54,12 @@ pub trait VortexReadAt: Send + Sync + Clone + 'static { } impl VortexReadAt for Arc { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - buffer: BytesMut, - ) -> impl Future> + 'static { - T::read_at_into(self, pos, buffer) + len: u64, + ) -> impl Future> + 'static { + T::read_byte_range(self, pos, len) } fn performance_hint(&self) -> usize { @@ -75,45 +72,45 @@ impl VortexReadAt for Arc { } impl VortexRead for BytesMut { - async fn read_into(&mut self, buffer: BytesMut) -> io::Result { - if buffer.len() > self.len() { + async fn read_bytes(&mut self, len: u64) -> io::Result { + if (len as usize) > self.len() { Err(io::Error::new( io::ErrorKind::UnexpectedEof, vortex_err!("unexpected eof"), )) } else { - Ok(self.split_to(buffer.len())) + Ok(self.split_to(len as usize).freeze()) } } } // Implement reading for a cursor operation. impl VortexRead for Cursor { - async fn read_into(&mut self, buffer: BytesMut) -> io::Result { - let res = R::read_at_into(self.get_ref(), self.position(), buffer).await?; - self.set_position(self.position() + res.len() as u64); + async fn read_bytes(&mut self, len: u64) -> io::Result { + let res = R::read_byte_range(self.get_ref(), self.position(), len).await?; + self.set_position(self.position() + len); Ok(res) } } impl VortexReadAt for Buffer { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - mut buffer: BytesMut, - ) -> impl Future> + 'static { - if buffer.len() + pos as usize > self.len() { + len: u64, + ) -> impl Future> + 'static { + if (len + pos) as usize > self.len() { future::ready(Err(io::Error::new( io::ErrorKind::UnexpectedEof, vortex_err!("unexpected eof"), ))) } else { - let buffer_len = buffer.len(); - buffer.copy_from_slice( - self.slice(pos as usize..pos as usize + buffer_len) - .as_slice(), - ); - future::ready(Ok(buffer)) + let mut buffer = BytesMut::with_capacity(len as usize); + unsafe { + buffer.set_len(len as usize); + } + buffer.copy_from_slice(self.slice(pos as usize..(pos + len) as usize).as_slice()); + future::ready(Ok(buffer.freeze())) } } @@ -121,3 +118,26 @@ impl VortexReadAt for Buffer { future::ready(self.len() as u64) } } + +impl VortexReadAt for Bytes { + fn read_byte_range( + &self, + pos: u64, + len: u64, + ) -> impl Future> + 'static { + if (pos + len) as usize > self.len() { + future::ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + vortex_err!("unexpected eof"), + ))) + } else { + let sliced = self.slice(pos as usize..(pos + len) as usize); + future::ready(Ok(sliced)) + } + } + + fn size(&self) -> impl Future + 'static { + let len = self.len() as u64; + future::ready(len) + } +} diff --git a/vortex-io/src/tokio.rs b/vortex-io/src/tokio.rs index 1898893ace..f7650c953c 100644 --- a/vortex-io/src/tokio.rs +++ b/vortex-io/src/tokio.rs @@ -6,7 +6,7 @@ use std::os::unix::fs::FileExt; use std::path::Path; use std::sync::Arc; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use vortex_buffer::io_buf::IoBuf; use vortex_error::vortex_panic; @@ -17,9 +17,13 @@ use crate::{VortexRead, VortexWrite}; pub struct TokioAdapter(pub IO); impl VortexRead for TokioAdapter { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { + async fn read_bytes(&mut self, len: u64) -> io::Result { + let mut buffer = BytesMut::with_capacity(len as usize); + unsafe { + buffer.set_len(len as usize); + } self.0.read_exact(buffer.as_mut()).await?; - Ok(buffer) + Ok(buffer.freeze()) } } @@ -69,16 +73,19 @@ impl Deref for TokioFile { } impl VortexReadAt for TokioFile { - fn read_at_into( + fn read_byte_range( &self, pos: u64, - buffer: BytesMut, - ) -> impl Future> + 'static { + len: u64, + ) -> impl Future> + 'static { let this = self.clone(); - let mut buffer = buffer; + let mut buffer = BytesMut::with_capacity(len as usize); + unsafe { + buffer.set_len(len as usize); + } match this.read_exact_at(&mut buffer, pos) { - Ok(()) => future::ready(Ok(buffer)), + Ok(()) => future::ready(Ok(buffer.freeze())), Err(e) => future::ready(Err(e)), } } @@ -121,7 +128,6 @@ mod tests { use std::ops::Deref; use std::os::unix::fs::FileExt; - use bytes::BytesMut; use tempfile::NamedTempFile; use crate::{TokioFile, VortexReadAt}; @@ -133,14 +139,12 @@ mod tests { let shared_file = TokioFile::open(tmpfile.path()).unwrap(); - let first_half = BytesMut::zeroed(5); - let first_half = shared_file.read_at_into(0, first_half).await.unwrap(); + let first_half = shared_file.read_byte_range(0, 5).await.unwrap(); - let second_half = BytesMut::zeroed(5); - let second_half = shared_file.read_at_into(5, second_half).await.unwrap(); + let second_half = shared_file.read_byte_range(5, 5).await.unwrap(); - assert_eq!(first_half.freeze(), "01234".as_bytes()); - assert_eq!(second_half.freeze(), "56789".as_bytes()); + assert_eq!(&first_half, "01234".as_bytes()); + assert_eq!(&second_half, "56789".as_bytes()); } #[test] diff --git a/vortex-ipc/src/messages/reader.rs b/vortex-ipc/src/messages/reader.rs index 06e5d8da65..6f7b2fff35 100644 --- a/vortex-ipc/src/messages/reader.rs +++ b/vortex-ipc/src/messages/reader.rs @@ -1,14 +1,14 @@ use std::io; use std::sync::Arc; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use flatbuffers::{root, root_unchecked}; use futures_util::stream::try_unfold; use vortex_array::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_array::{ArrayData, Context, IntoArrayData, ViewedArrayData}; use vortex_buffer::Buffer; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; use vortex_flatbuffers::message as fb; use vortex_io::VortexRead; @@ -17,8 +17,8 @@ pub const MESSAGE_PREFIX_LENGTH: usize = 4; /// A stateful reader of [`Message`s][fb::Message] from a stream. pub struct MessageReader { read: R, - message: BytesMut, - prev_message: BytesMut, + message: Option, + prev_message: Option, finished: bool, } @@ -26,8 +26,8 @@ impl MessageReader { pub async fn try_new(read: R) -> VortexResult { let mut reader = Self { read, - message: BytesMut::new(), - prev_message: BytesMut::new(), + message: None, + prev_message: None, finished: false, }; reader.load_next_message().await?; @@ -35,9 +35,7 @@ impl MessageReader { } async fn load_next_message(&mut self) -> VortexResult { - let mut buffer = std::mem::take(&mut self.message); - buffer.resize(MESSAGE_PREFIX_LENGTH, 0); - let mut buffer = match self.read.read_into(buffer).await { + let mut buffer = match self.read.read_bytes(MESSAGE_PREFIX_LENGTH as u64).await { Ok(b) => b, Err(e) => { return match e.kind() { @@ -55,15 +53,15 @@ impl MessageReader { vortex_bail!(InvalidSerde: "Invalid IPC stream") } - buffer.reserve(len as usize); - unsafe { buffer.set_len(len as usize) }; - self.message = self.read.read_into(buffer).await?; + let next_msg = self.read.read_bytes(len as u64).await?; // Validate that the message is valid a flatbuffer. - root::(&self.message).map_err( + root::(&next_msg).map_err( |e| vortex_err!(InvalidSerde: "Failed to parse flatbuffer message: {:?}", e), )?; + self.message = Some(next_msg); + Ok(true) } @@ -72,18 +70,28 @@ impl MessageReader { return None; } // The message has been validated by the next() call. - Some(unsafe { root_unchecked::(&self.message) }) + Some(unsafe { + root_unchecked::( + self.message + .as_ref() + .vortex_expect("MessageReader: message"), + ) + }) } async fn next(&mut self) -> VortexResult { if self.finished { vortex_bail!("Reader is finished, should've peeked!") } - self.prev_message = self.message.split(); + self.prev_message = self.message.take(); if !self.load_next_message().await? { self.finished = true; } - Ok(Buffer::from(self.prev_message.clone().freeze())) + Ok(Buffer::from( + self.prev_message + .clone() + .vortex_expect("MessageReader prev_message"), + )) } pub async fn read_dtype(&mut self) -> VortexResult { @@ -114,15 +122,14 @@ impl MessageReader { Some(chunk) => chunk.buffer_size() as usize, }; - let mut array_reader = - ArrayMessageReader::from_fb_bytes(Buffer::from(self.message.clone().freeze())); + let mut array_reader = ArrayMessageReader::from_fb_bytes(Buffer::from( + self.message.clone().vortex_expect("MessageReader: message"), + )); // Issue a single read to grab all buffers - let mut all_buffers = BytesMut::with_capacity(all_buffers_size); - unsafe { all_buffers.set_len(all_buffers_size) }; - let all_buffers = self.read.read_into(all_buffers).await?; + let all_buffers = self.read.read_bytes(all_buffers_size as u64).await?; - if array_reader.read(all_buffers.freeze())?.is_some() { + if array_reader.read(all_buffers)?.is_some() { unreachable!("This is an implementation bug") }; @@ -191,14 +198,11 @@ impl MessageReader { return Ok(None); }; - let buffer_len = page_msg.buffer_size() as usize; - let total_len = buffer_len + (page_msg.padding() as usize); + let buffer_len = page_msg.buffer_size() as u64; + let total_len = buffer_len + (page_msg.padding() as u64); - let mut buffer = BytesMut::with_capacity(total_len); - unsafe { buffer.set_len(total_len) } - buffer = self.read.read_into(buffer).await?; - buffer.truncate(buffer_len); - let page_buffer = Ok(Some(Buffer::from(buffer.freeze()))); + let buffer = self.read.read_bytes(total_len).await?; + let page_buffer = Ok(Some(Buffer::from(buffer.slice(..buffer_len as usize)))); let _ = self.next().await?; page_buffer } diff --git a/vortex-ipc/src/stream_writer/mod.rs b/vortex-ipc/src/stream_writer/mod.rs index 0b821b9ed2..9ba15db589 100644 --- a/vortex-ipc/src/stream_writer/mod.rs +++ b/vortex-ipc/src/stream_writer/mod.rs @@ -116,8 +116,8 @@ impl ByteRange { Self { begin, end } } - pub fn len(&self) -> usize { - (self.end - self.begin) as usize + pub fn len(&self) -> u64 { + self.end - self.begin } pub fn is_empty(&self) -> bool {