Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return Bytes from readers #1330

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
workspace = true

[features]
default = []
default = ["tokio"]
futures = ["futures-util/io", "vortex-io/futures"]
compio = ["dep:compio", "vortex-io/compio"]
tokio = ["dep:tokio", "vortex-io/tokio"]
Expand Down
14 changes: 7 additions & 7 deletions vortex-file/src/chunked_reader/take_rows.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::Cursor;
use std::ops::Range;

use bytes::BytesMut;
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
byte_range: Range<u64>,
row_range: Range<u64>,
) -> VortexResult<impl ArrayStream> {
let range_byte_len = (byte_range.end - byte_range.start) as usize;
let range_byte_len = byte_range.end - byte_range.start;

// Relativize the indices to these chunks
let indices_start =
Expand All @@ -133,15 +133,15 @@ impl<R: VortexReadAt> ChunkedArrayReader<R> {
let row_start_scalar = Scalar::from(row_range.start).cast(relative_indices.dtype())?;
let relative_indices = subtract_scalar(&relative_indices, &row_start_scalar)?;

// Set up an array reader to read this range of chunks.
let mut buffer = BytesMut::with_capacity(range_byte_len);
unsafe { buffer.set_len(range_byte_len) }
// TODO(ngates): instead of reading the whole range into a buffer, we should stream
// the byte range (e.g. if it's coming from an HTTP endpoint) and wrap that with a
// MessageReader.
let buffer = self.read.read_at_into(byte_range.start, buffer).await?;
let buffer = self
.read
.read_byte_range(byte_range.start, range_byte_len)
.await?;

let reader = StreamArrayReader::try_new(buffer, self.context.clone())
let reader = StreamArrayReader::try_new(Cursor::new(buffer), self.context.clone())
.await?
.with_dtype(self.dtype.clone());

Expand Down
5 changes: 2 additions & 3 deletions vortex-file/src/dispatcher/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl Dispatch for TokioDispatcher {
mod tests {
use std::io::Write;

use bytes::BytesMut;
use tempfile::NamedTempFile;
use vortex_io::{TokioFile, VortexReadAt};

Expand All @@ -133,8 +132,8 @@ mod tests {
let rx = dispatcher
.dispatch(|| async move {
let file = TokioFile::open(tmpfile.path()).unwrap();
let bytes = BytesMut::zeroed(4);
file.read_at_into(0, bytes).await.unwrap()

file.read_byte_range(0, 4).await.unwrap()
})
.unwrap();

Expand Down
10 changes: 5 additions & 5 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::ops::Range;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use flatbuffers::{root, root_unchecked};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_flatbuffers::{footer, message};
Expand Down Expand Up @@ -91,11 +91,11 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
}

let read_size = INITIAL_READ_SIZE.min(file_size as usize);
let mut buf = BytesMut::with_capacity(read_size);
unsafe { buf.set_len(read_size) }

let initial_read_offset = file_size - read_size as u64;
buf = read.read_at_into(initial_read_offset, buf).await?;
let buf = read
.read_byte_range(initial_read_offset, read_size as u64)
.await?;

let eof_loc = read_size - EOF_SIZE;
let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
Expand Down Expand Up @@ -168,7 +168,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
root::<footer::Layout>(&buf[layout_loc..ps_loc])?;

Ok(InitialRead {
buf: buf.freeze(),
buf,
initial_read_offset,
fb_postscript_byte_range,
})
Expand Down
9 changes: 3 additions & 6 deletions vortex-file/src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::Stream;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -305,14 +305,11 @@ async fn read_ranges<R: VortexReadAt>(
) -> VortexResult<Vec<Message>> {
stream::iter(ranges.into_iter())
.map(|MessageLocator(id, range)| {
let mut buf = BytesMut::with_capacity(range.len());
unsafe { buf.set_len(range.len()) }

let read_ft = reader.read_at_into(range.begin, buf);
let read_ft = reader.read_byte_range(range.begin, range.len());

read_ft.map(|result| {
result
.map(|res| Message(id, res.freeze()))
.map(|res| Message(id, res))
.map_err(VortexError::from)
})
})
Expand Down
18 changes: 10 additions & 8 deletions vortex-io/src/compio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::future::Future;
use std::io;

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use compio::fs::File;
use compio::io::AsyncReadAtExt;
use compio::BufResult;
Expand All @@ -10,16 +10,20 @@ use vortex_error::vortex_panic;
use super::VortexReadAt;

impl VortexReadAt for File {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = io::Result<BytesMut>> + 'static {
len: u64,
) -> impl Future<Output = io::Result<Bytes>> + 'static {
let this = self.clone();
let mut buffer = BytesMut::with_capacity(len as usize);
unsafe {
buffer.set_len(len as usize);
}
async move {
// Turn the buffer into a static slice.
let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await;
res.map(|_| buffer)
res.map(|_| buffer.freeze())
}
}

Expand All @@ -38,7 +42,6 @@ impl VortexReadAt for File {
mod tests {
use std::io::Write;

use bytes::BytesMut;
use compio::fs::File;
use tempfile::NamedTempFile;

Expand All @@ -54,8 +57,7 @@ mod tests {
let file = File::open(tmpfile.path()).await.unwrap();

// Use the file as a VortexReadAt instance.
let four_bytes = BytesMut::zeroed(4);
let read = file.read_at_into(2, four_bytes).await.unwrap();
let read = file.read_byte_range(2, 4).await.unwrap();
assert_eq!(&read, "2345".as_bytes());
}
}
10 changes: 7 additions & 3 deletions vortex-io/src/futures.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::io;

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures_util::{AsyncRead, AsyncReadExt};

use crate::VortexRead;

pub struct FuturesAdapter<IO>(pub IO);

impl<R: AsyncRead + Unpin> VortexRead for FuturesAdapter<R> {
async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
let mut buffer = BytesMut::with_capacity(len as usize);
unsafe {
buffer.set_len(len as usize);
}
self.0.read_exact(buffer.as_mut()).await?;
Ok(buffer)
Ok(buffer.freeze())
}
}
13 changes: 6 additions & 7 deletions vortex-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::{io, mem};

use bytes::BytesMut;
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};
use vortex_buffer::io_buf::IoBuf;
Expand Down Expand Up @@ -66,21 +66,20 @@ impl ObjectStoreReadAt {
}

impl VortexReadAt for ObjectStoreReadAt {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
mut buffer: BytesMut,
) -> impl Future<Output = io::Result<BytesMut>> + 'static {
len: u64,
) -> impl Future<Output = io::Result<Bytes>> + 'static {
let object_store = self.object_store.clone();
let location = self.location.clone();

Box::pin(async move {
let start_range = pos as usize;
let bytes = object_store
.get_range(&location, start_range..(start_range + buffer.len()))
.get_range(&location, start_range..(start_range + len as usize))
.await?;
buffer.as_mut().copy_from_slice(bytes.as_ref());
Ok(buffer)
Ok(bytes)
})
}

Expand Down
10 changes: 5 additions & 5 deletions vortex-io/src/offset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;

use bytes::BytesMut;
use bytes::Bytes;
use futures::FutureExt;

use crate::VortexReadAt;
Expand Down Expand Up @@ -30,12 +30,12 @@ impl<R: VortexReadAt> OffsetReadAt<R> {
}

impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
fn read_at_into(
fn read_byte_range(
&self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = std::io::Result<BytesMut>> + 'static {
self.read.read_at_into(pos + self.offset, buffer)
len: u64,
) -> impl Future<Output = std::io::Result<Bytes>> + 'static {
self.read.read_byte_range(pos + self.offset, len)
}

fn performance_hint(&self) -> usize {
Expand Down
Loading
Loading