Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Nov 15, 2024
1 parent 480f22f commit 6f015a2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
3 changes: 2 additions & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ arrow-select = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-io = { workspace = true, features = ["tokio"] }
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }

[lints]
workspace = true

[features]
default = []
default = ["tokio"]
futures = ["futures-util/io", "vortex-io/futures"]
compio = ["dep:compio", "vortex-io/compio"]
tokio = ["dep:tokio", "vortex-io/tokio"]
Expand Down
18 changes: 12 additions & 6 deletions vortex-io/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex_error::vortex_err;

/// An asynchronous streaming reader.
///
/// Implementations expose data via the asynchronous [`read_into`][VortexRead::read_into], which
/// Implementations expose data via the asynchronous [`read_bytes`][VortexRead::read_bytes], which
/// will fill the exact number of bytes and advance the stream.
///
/// If the exact number of bytes is not available from the stream, an
Expand Down Expand Up @@ -87,9 +87,8 @@ impl VortexRead for BytesMut {
// Implement reading for a cursor operation.
impl<R: VortexReadAt> VortexRead for Cursor<R> {
async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
let res =
R::read_byte_range(self.get_ref(), self.position(), self.position() + len).await?;
self.set_position(self.position() + res.len() as u64);
let res = R::read_byte_range(self.get_ref(), self.position(), len).await?;
self.set_position(self.position() + len);
Ok(res)
}
}
Expand Down Expand Up @@ -126,8 +125,15 @@ impl VortexReadAt for Bytes {
pos: u64,
len: u64,
) -> impl Future<Output = io::Result<Bytes>> + 'static {
let sliced = self.slice(pos as usize..(pos + len) as usize);
future::ready(Ok(sliced))
if (pos + len) as usize > self.len() {
future::ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
vortex_err!("unexpected eof"),
)))
} else {
let sliced = self.slice(pos as usize..(pos + len) as usize);
future::ready(Ok(sliced))
}
}

fn size(&self) -> impl Future<Output = u64> + 'static {
Expand Down

0 comments on commit 6f015a2

Please sign in to comment.