Skip to content

Commit

Permalink
feat: return Bytes from readers (#1330)
Browse files Browse the repository at this point in the history
No pooling right now. I had implemented a naive buffer pool on a
separate branch and it drastically hurt performance, so I'm just getting
this up as a first step.
  • Loading branch information
a10y authored Nov 15, 2024
1 parent 19e52fd commit b09a2dd
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 122 deletions.
2 changes: 1 addition & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
workspace = true

[features]
default = []
default = ["tokio"]
futures = ["futures-util/io", "vortex-io/futures"]
compio = ["dep:compio", "vortex-io/compio"]
tokio = ["dep:tokio", "vortex-io/tokio"]
Expand Down
14 changes: 7 additions & 7 deletions vortex-file/src/chunked_reader/take_rows.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::Cursor;
use std::ops::Range;

use bytes::BytesMut;
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
byte_range: Range<u64>,
row_range: Range<u64>,
) -> VortexResult<impl ArrayStream> {
let range_byte_len = (byte_range.end - byte_range.start) as usize;
let range_byte_len = byte_range.end - byte_range.start;

// Relativize the indices to these chunks
let indices_start =
Expand All @@ -133,15 +133,15 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
let row_start_scalar = Scalar::from(row_range.start).cast(relative_indices.dtype())?;
let relative_indices = subtract_scalar(&relative_indices, &row_start_scalar)?;

// Set up an array reader to read this range of chunks.
let mut buffer = BytesMut::with_capacity(range_byte_len);
unsafe { buffer.set_len(range_byte_len) }
// TODO(ngates): instead of reading the whole range into a buffer, we should stream
// the byte range (e.g. if it's coming from an HTTP endpoint) and wrap that with a
// MessageReader.
let buffer = self.read.read_at_into(byte_range.start, buffer).await?;
let buffer = self
.read
.read_byte_range(byte_range.start, range_byte_len)
.await?;

let reader = StreamArrayReader::try_new(buffer, self.context.clone())
let reader = StreamArrayReader::try_new(Cursor::new(buffer), self.context.clone())
.await?
.with_dtype(self.dtype.clone());

Expand Down
5 changes: 2 additions & 3 deletions vortex-file/src/dispatcher/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl Dispatch for TokioDispatcher {
mod tests {
use std::io::Write;

use bytes::BytesMut;
use tempfile::NamedTempFile;
use vortex_io::{TokioFile, VortexReadAt};

Expand All @@ -133,8 +132,8 @@ mod tests {
let rx = dispatcher
.dispatch(|| async move {
let file = TokioFile::open(tmpfile.path()).unwrap();
let bytes = BytesMut::zeroed(4);
file.read_at_into(0, bytes).await.unwrap()

file.read_byte_range(0, 4).await.unwrap()
})
.unwrap();

Expand Down
10 changes: 5 additions & 5 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::ops::Range;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use flatbuffers::{root, root_unchecked};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_flatbuffers::{footer, message};
Expand Down Expand Up @@ -91,11 +91,11 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
}

let read_size = INITIAL_READ_SIZE.min(file_size as usize);
let mut buf = BytesMut::with_capacity(read_size);
unsafe { buf.set_len(read_size) }

let initial_read_offset = file_size - read_size as u64;
buf = read.read_at_into(initial_read_offset, buf).await?;
let buf = read
.read_byte_range(initial_read_offset, read_size as u64)
.await?;

let eof_loc = read_size - EOF_SIZE;
let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
Expand Down Expand Up @@ -168,7 +168,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
root::<footer::Layout>(&buf[layout_loc..ps_loc])?;

Ok(InitialRead {
buf: buf.freeze(),
buf,
initial_read_offset,
fb_postscript_byte_range,
})
Expand Down
9 changes: 3 additions & 6 deletions vortex-file/src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::Stream;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -305,14 +305,11 @@ async fn read_ranges<R: VortexReadAt>(
) -> VortexResult<Vec<Message>> {
stream::iter(ranges.into_iter())
.map(|MessageLocator(id, range)| {
let mut buf = BytesMut::with_capacity(range.len());
unsafe { buf.set_len(range.len()) }

let read_ft = reader.read_at_into(range.begin, buf);
let read_ft = reader.read_byte_range(range.begin, range.len());

read_ft.map(|result| {
result
.map(|res| Message(id, res.freeze()))
.map(|res| Message(id, res))
.map_err(VortexError::from)
})
})
Expand Down
18 changes: 10 additions & 8 deletions vortex-io/src/compio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::future::Future;
use std::io;

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use compio::fs::File;
use compio::io::AsyncReadAtExt;
use compio::BufResult;
Expand All @@ -10,16 +10,20 @@ use vortex_error::vortex_panic;
use super::VortexReadAt;

impl VortexReadAt for File {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = io::Result<BytesMut>> + 'static {
len: u64,
) -> impl Future<Output = io::Result<Bytes>> + 'static {
let this = self.clone();
let mut buffer = BytesMut::with_capacity(len as usize);
unsafe {
buffer.set_len(len as usize);
}
async move {
// Turn the buffer into a static slice.
let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await;
res.map(|_| buffer)
res.map(|_| buffer.freeze())
}
}

Expand All @@ -38,7 +42,6 @@ impl VortexReadAt for File {
mod tests {
use std::io::Write;

use bytes::BytesMut;
use compio::fs::File;
use tempfile::NamedTempFile;

Expand All @@ -54,8 +57,7 @@ mod tests {
let file = File::open(tmpfile.path()).await.unwrap();

// Use the file as a VortexReadAt instance.
let four_bytes = BytesMut::zeroed(4);
let read = file.read_at_into(2, four_bytes).await.unwrap();
let read = file.read_byte_range(2, 4).await.unwrap();
assert_eq!(&read, "2345".as_bytes());
}
}
10 changes: 7 additions & 3 deletions vortex-io/src/futures.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::io;

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures_util::{AsyncRead, AsyncReadExt};

use crate::VortexRead;

pub struct FuturesAdapter<IO>(pub IO);

impl<R: AsyncRead + Unpin> VortexRead for FuturesAdapter<R> {
async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
let mut buffer = BytesMut::with_capacity(len as usize);
unsafe {
buffer.set_len(len as usize);
}
self.0.read_exact(buffer.as_mut()).await?;
Ok(buffer)
Ok(buffer.freeze())
}
}
13 changes: 6 additions & 7 deletions vortex-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::{io, mem};

use bytes::BytesMut;
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};
use vortex_buffer::io_buf::IoBuf;
Expand Down Expand Up @@ -66,21 +66,20 @@ impl ObjectStoreReadAt {
}

impl VortexReadAt for ObjectStoreReadAt {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
mut buffer: BytesMut,
) -> impl Future<Output = io::Result<BytesMut>> + 'static {
len: u64,
) -> impl Future<Output = io::Result<Bytes>> + 'static {
let object_store = self.object_store.clone();
let location = self.location.clone();

Box::pin(async move {
let start_range = pos as usize;
let bytes = object_store
.get_range(&location, start_range..(start_range + buffer.len()))
.get_range(&location, start_range..(start_range + len as usize))
.await?;
buffer.as_mut().copy_from_slice(bytes.as_ref());
Ok(buffer)
Ok(bytes)
})
}

Expand Down
10 changes: 5 additions & 5 deletions vortex-io/src/offset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;

use bytes::BytesMut;
use bytes::Bytes;
use futures::FutureExt;

use crate::VortexReadAt;
Expand Down Expand Up @@ -30,12 +30,12 @@ impl<R: VortexReadAt> OffsetReadAt<R> {
}

impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = std::io::Result<BytesMut>> + 'static {
self.read.read_at_into(pos + self.offset, buffer)
len: u64,
) -> impl Future<Output = std::io::Result<Bytes>> + 'static {
self.read.read_byte_range(pos + self.offset, len)
}

fn performance_hint(&self) -> usize {
Expand Down
Loading

0 comments on commit b09a2dd

Please sign in to comment.