Skip to content

Commit

Permalink
Queued read
Browse files Browse the repository at this point in the history
 Please enter the commit message for your changes. Lines starting
  • Loading branch information
robert3005 committed Nov 27, 2024
1 parent 630835b commit 30a795e
Show file tree
Hide file tree
Showing 17 changed files with 559 additions and 475 deletions.
6 changes: 3 additions & 3 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct VortexReadBuilder<R> {
io_dispatcher: Option<Arc<IoDispatcher>>,
}

impl<R: VortexReadAt> VortexReadBuilder<R> {
impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
pub fn new(read_at: R, layout_serde: LayoutDeserializer) -> Self {
Self {
read_at,
Expand Down Expand Up @@ -167,7 +167,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
// Default: fallback to single-threaded tokio dispatcher.
let io_dispatcher = self.io_dispatcher.unwrap_or_default();

Ok(VortexFileArrayStream::new(
VortexFileArrayStream::try_new(
self.read_at,
layout_reader,
filter_reader,
Expand All @@ -176,7 +176,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
row_count,
row_mask,
io_dispatcher,
))
)
}

async fn size(&self) -> VortexResult<u64> {
Expand Down
232 changes: 232 additions & 0 deletions vortex-file/src/read/coalescer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
use std::collections::VecDeque;
use std::io;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Waker};

use futures::Stream;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, StreamExt};
use vortex_array::ArrayData;
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_io::{Dispatch, IoDispatcher, VortexReadAt, VortexReadRanges};

use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, MessageRead, RowMask};

/// Once this many row masks are waiting on additional messages, `gather_read_messages` stops
/// pulling new masks from the input stream. Also used as the initial capacity of the list of
/// messages collected for a read.
const NUM_TO_COALESCE: usize = 8;

/// Turns a [`RowMask`] into a value of type `V`, or reports which messages must be read first.
pub trait RowMaskReader<V> {
/// Attempts to produce the value selected by `mask`.
///
/// Returns `MessageRead::Value` when the value is available and `MessageRead::ReadMore` with the
/// messages that have to be fetched before trying again. The [`Coalescer`] treats `Ok(None)` as
/// "nothing to emit for this mask".
fn read_mask(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<V>>>;
}

/// A [`RowMaskReader`] backed by a boxed [`LayoutReader`].
pub struct MaskLayoutReader {
// Layout reader every `read_mask` call is forwarded to.
layout: Box<dyn LayoutReader>,
}

impl MaskLayoutReader {
pub fn new(layout: Box<dyn LayoutReader>) -> Self {
Self { layout }
}
}

impl RowMaskReader<ArrayData> for MaskLayoutReader {
    /// Forwards the mask to the wrapped layout's `read_selection` and returns its result as is.
    fn read_mask(&self, mask: &RowMask) -> VortexResult<Option<MessageRead<ArrayData>>> {
        let Self { layout } = self;
        layout.read_selection(mask)
    }
}

/// State of a single row mask sitting in the coalescer's queue.
enum RowMaskState<V> {
// The reader asked for more messages; the mask is retried on the next gather pass.
Pending(RowMask),
// The value has been produced and is waiting to be emitted from the stream.
Ready(V),
// The reader returned no value for this mask; the entry is skipped when the queue is drained.
Empty,
}

/// Stream adapter that pulls row masks from `S`, asks the row mask reader `RM` for their values,
/// and batches the messages the reader still needs into reads issued through the dispatcher.
///
/// Values of type `V` are emitted in the order their masks were queued.
pub struct Coalescer<R, S, V, RM> {
// Input stream of row masks.
values_iter: S,
// The read currently running on the dispatcher, if any.
in_flight: Option<BoxFuture<'static, io::Result<Vec<Message>>>>,
// Masks pulled from `values_iter`, in arrival order, together with their progress.
queued: VecDeque<RowMaskState<V>>,
// Range reader used to fetch the bytes of the requested messages.
io_read: VortexReadRanges<R>,
// Dispatcher the read tasks are submitted to.
dispatcher: Arc<IoDispatcher>,
// Resolves a row mask into a value or a list of messages to fetch.
row_mask_reader: RM,
// Message cache that fetched messages are written into.
cache: Arc<RwLock<LayoutMessageCache>>,
}

impl<R, S, V, RM> Coalescer<R, S, V, RM>
where
    R: VortexReadAt,
    S: Stream<Item = VortexResult<RowMask>> + Unpin,
    RM: RowMaskReader<V>,
{
    /// Builds a coalescer that pulls masks from `values_iter`, resolves them with
    /// `row_mask_reader`, fetches missing messages from `read` via `dispatcher`, and stores the
    /// fetched messages in `cache`.
    pub fn new(
        read: R,
        dispatcher: Arc<IoDispatcher>,
        values_iter: S,
        row_mask_reader: RM,
        cache: Arc<RwLock<LayoutMessageCache>>,
    ) -> Self {
        let io_read = VortexReadRanges::new(read, dispatcher.clone(), 1 << 20);
        Self {
            values_iter,
            in_flight: None,
            queued: VecDeque::new(),
            io_read,
            dispatcher,
            row_mask_reader,
            cache,
        }
    }

    /// Writes every fetched message into the shared message cache.
    ///
    /// Panics if the cache lock is poisoned.
    fn store_messages(&self, messages: Vec<Message>) {
        let mut cache = match self.cache.write() {
            Ok(guard) => guard,
            Err(poison) => vortex_panic!("Failed to write to message cache: {poison}"),
        };
        messages.into_iter().for_each(|Message(id, bytes)| {
            cache.set(id, bytes);
        });
    }

    /// Collects the messages that need to be fetched next.
    ///
    /// First retries every queued `Pending` mask, then pulls new masks from the input stream until
    /// `NUM_TO_COALESCE` masks are waiting on data, the stream is pending, or the stream ends.
    ///
    /// Returns the messages to fetch and whether the input stream reported its end during this
    /// call. Errors from the reader or the input stream are returned immediately.
    fn gather_read_messages(
        &mut self,
        cx: &mut Context<'_>,
    ) -> VortexResult<(Vec<MessageLocator>, bool)> {
        let mut locators = Vec::with_capacity(NUM_TO_COALESCE);
        let mut waiting_on_data = 0;

        // Retry masks that were waiting on messages; entries already resolved are left untouched.
        for slot in self.queued.iter_mut() {
            if let RowMaskState::Pending(mask) = slot {
                match self.row_mask_reader.read_mask(mask)? {
                    Some(MessageRead::ReadMore(needed)) => {
                        locators.extend(needed);
                        waiting_on_data += 1;
                    }
                    Some(MessageRead::Value(value)) => *slot = RowMaskState::Ready(value),
                    None => *slot = RowMaskState::Empty,
                }
            }
        }

        // Top up with fresh masks from the input stream.
        let mut exhausted = false;
        while waiting_on_data < NUM_TO_COALESCE {
            let next_mask = match self.values_iter.poll_next_unpin(cx) {
                Poll::Ready(Some(polled)) => polled?,
                Poll::Ready(None) => {
                    exhausted = true;
                    break;
                }
                Poll::Pending => break,
            };

            match self.row_mask_reader.read_mask(&next_mask)? {
                Some(MessageRead::ReadMore(needed)) => {
                    self.queued.push_back(RowMaskState::Pending(next_mask));
                    locators.extend(needed);
                    waiting_on_data += 1;
                }
                Some(MessageRead::Value(value)) => {
                    self.queued.push_back(RowMaskState::Ready(value));
                }
                // No value for this mask: nothing is queued.
                None => {}
            }
        }

        Ok((locators, exhausted))
    }

    /// Submits one read covering the byte ranges of `messages` to the dispatcher.
    ///
    /// The returned future resolves to the fetched buffers paired with their message ids.
    /// `waker` is woken from inside the dispatched task once the read has finished, whether it
    /// succeeded or failed. An error while retrieving the task's result is mapped to an
    /// `io::Error` of kind `Other`.
    fn dispatch_messages(
        &self,
        messages: Vec<MessageLocator>,
        waker: Waker,
    ) -> BoxFuture<'static, io::Result<Vec<Message>>> {
        let reader = self.io_read.clone();
        let read_task = self
            .dispatcher
            .dispatch(move || async move {
                let byte_ranges = messages
                    .iter()
                    .map(|locator| locator.1.to_range())
                    .collect();
                let fetched = reader
                    .read_byte_ranges(byte_ranges)
                    .map(move |read_res| {
                        let buffers = read_res?;
                        Ok(messages
                            .into_iter()
                            .map(|locator| locator.0)
                            .zip(buffers)
                            .map(|(id, bytes)| Message(id, bytes))
                            .collect())
                    })
                    .await;
                waker.wake();
                fetched
            })
            .vortex_expect("Async task dispatch");

        read_task
            .map(|joined| joined.unwrap_or_else(|e| Err(io::Error::new(ErrorKind::Other, e))))
            .boxed()
    }
}

impl<R, S, V, RM> Stream for Coalescer<R, S, V, RM>
where
    R: VortexReadAt + Unpin,
    S: Stream<Item = VortexResult<RowMask>> + Unpin,
    RM: RowMaskReader<V> + Unpin,
    V: Unpin,
{
    type Item = VortexResult<V>;

    /// Drives the outstanding read (if any), schedules the next one, and emits the value at the
    /// front of the queue when it is ready.
    ///
    /// * If a read is in flight and has finished, its messages are stored in the cache, the
    ///   queue is re-evaluated and a follow-up read is dispatched when more messages are needed.
    /// * If no read is in flight, messages are gathered and a read is dispatched when needed.
    /// * Values are emitted strictly in queue order: a `Pending` entry at the front blocks the
    ///   entries behind it.
    ///
    /// Returns `Poll::Ready(None)` only when the queue is empty and the input stream reported
    /// its end during this call.
    // NOTE(review): `gather_read_messages` polls the mask stream again on later calls even after
    // it has returned `None`; confirm the input stream tolerates that (e.g. is fused).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let exhausted = if let Some(in_flight) = &mut self.in_flight {
            match in_flight.poll_unpin(cx) {
                Poll::Ready(msgs) => {
                    // The future has completed and must never be polled again. Drop it before
                    // any fallible step so that an early return through `?` cannot leave a
                    // finished future behind for the next `poll_next` call.
                    self.in_flight = None;

                    self.store_messages(
                        msgs.map_err(|e| vortex_err!("Cancelled in flight read {e}"))?,
                    );
                    let (messages, exhausted) = self.gather_read_messages(cx)?;
                    if !messages.is_empty() {
                        let next_read = self.dispatch_messages(messages, cx.waker().clone());
                        self.in_flight = Some(next_read);
                    }
                    exhausted
                }
                // If read is pending see if we have any available results
                Poll::Pending => false,
            }
        } else {
            let (messages, exhausted) = self.gather_read_messages(cx)?;
            if !messages.is_empty() {
                let next_read = self.dispatch_messages(messages, cx.waker().clone());
                self.in_flight = Some(next_read);
            }
            exhausted
        };

        // Emit in queue order: skip entries that produced nothing, stop at the first entry that
        // is still waiting on data.
        while let Some(next_mask) = self.queued.pop_front() {
            match next_mask {
                RowMaskState::Pending(m) => {
                    // Not resolved yet: put it back so ordering is preserved.
                    self.queued.push_front(RowMaskState::Pending(m));
                    return Poll::Pending;
                }
                RowMaskState::Ready(next_ready) => return Poll::Ready(Some(Ok(next_ready))),
                RowMaskState::Empty => continue,
            }
        }

        if exhausted {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
22 changes: 12 additions & 10 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl ChunkedLayoutReader {
BatchRead::ReadMore(m) => {
messages_to_fetch.extend(m);
}
BatchRead::Batch(a) => {
BatchRead::Value(a) => {
*array_slot = ChildRead::Finished(Some(a));
}
}
Expand Down Expand Up @@ -266,10 +266,10 @@ impl LayoutReader for ChunkedLayoutReader {
.filter_map(ChildRead::into_value)
.collect::<Vec<_>>();
match child_arrays.len() {
0 | 1 => Ok(child_arrays.pop().map(BatchRead::Batch)),
0 | 1 => Ok(child_arrays.pop().map(BatchRead::Value)),
_ => {
let dtype = child_arrays[0].dtype().clone();
Ok(Some(BatchRead::Batch(
Ok(Some(BatchRead::Value(
ChunkedArray::try_new(child_arrays, dtype)?.into_array(),
)))
}
Expand All @@ -279,24 +279,26 @@ impl LayoutReader for ChunkedLayoutReader {
}
}

fn read_metadata(&self) -> VortexResult<MetadataRead> {
fn read_metadata(&self) -> VortexResult<Option<MetadataRead>> {
match self.metadata_layout() {
None => Ok(MetadataRead::None),
None => Ok(None),
Some(metadata_layout) => {
if let Some(md) = self.cached_metadata.get() {
return Ok(MetadataRead::Batches(vec![Some(md.clone())]));
return Ok(Some(MetadataRead::Value(vec![Some(md.clone())])));
}

match metadata_layout
.read_selection(&RowMask::new_valid_between(0, self.n_chunks()))?
{
Some(BatchRead::Batch(array)) => {
Some(BatchRead::Value(array)) => {
// We don't care if the write failed
_ = self.cached_metadata.set(array.clone());
Ok(MetadataRead::Batches(vec![Some(array)]))
Ok(Some(MetadataRead::Value(vec![Some(array)])))
}
Some(BatchRead::ReadMore(messages)) => Ok(MetadataRead::ReadMore(messages)),
None => Ok(MetadataRead::None),
Some(BatchRead::ReadMore(messages)) => {
Ok(Some(MetadataRead::ReadMore(messages)))
}
None => Ok(None),
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions vortex-file/src/read/layouts/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl LayoutReader for ColumnarLayoutReader {
BatchRead::ReadMore(message) => {
messages.extend(message);
}
BatchRead::Batch(arr) => {
BatchRead::Value(arr) => {
if self.shortcircuit_siblings
&& arr.statistics().compute_true_count().vortex_expect(
"must be a bool array if shortcircuit_siblings is set to true",
Expand Down Expand Up @@ -299,34 +299,35 @@ impl LayoutReader for ColumnarLayoutReader {
.as_ref()
.map(|e| e.evaluate(&array))
.unwrap_or_else(|| Ok(array))
.map(BatchRead::Batch)
.map(BatchRead::Value)
.map(Some)
} else {
Ok(Some(BatchRead::ReadMore(messages)))
}
}

fn read_metadata(&self) -> VortexResult<MetadataRead> {
fn read_metadata(&self) -> VortexResult<Option<MetadataRead>> {
let mut in_progress_metadata = self
.in_progress_metadata
.write()
.unwrap_or_else(|e| vortex_panic!("lock is poisoned: {e}"));
let mut messages = Vec::default();

for (name, child_reader) in self.names.iter().zip(self.children.iter()) {
match child_reader.read_metadata()? {
MetadataRead::Batches(data) => {
if data.len() != 1 {
vortex_bail!("expected exactly one metadata array per-child");
if let Some(child_metadata) = child_reader.read_metadata()? {
match child_metadata {
MetadataRead::Value(data) => {
if data.len() != 1 {
vortex_bail!("expected exactly one metadata array per-child");
}
in_progress_metadata.insert(name.clone(), data[0].clone());
}
MetadataRead::ReadMore(rm) => {
messages.extend(rm);
}
in_progress_metadata.insert(name.clone(), data[0].clone());
}
MetadataRead::ReadMore(rm) => {
messages.extend(rm);
}
MetadataRead::None => {
in_progress_metadata.insert(name.clone(), None);
}
} else {
in_progress_metadata.insert(name.clone(), None);
}
}

Expand All @@ -338,9 +339,9 @@ impl LayoutReader for ColumnarLayoutReader {
.map(|name| in_progress_metadata[name].clone()) // TODO(Adam): Some columns might not have statistics
.collect::<Vec<_>>();

Ok(MetadataRead::Batches(child_arrays))
Ok(Some(MetadataRead::Value(child_arrays)))
} else {
Ok(MetadataRead::ReadMore(messages))
Ok(Some(MetadataRead::ReadMore(messages)))
}
}
}
Expand Down
Loading

0 comments on commit 30a795e

Please sign in to comment.