Skip to content

Commit

Permalink
preserveorder
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Nov 24, 2024
1 parent 4c2cd6f commit 4ffd42b
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions vortex-file/src/read/coalescer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#![allow(dead_code)]

use std::collections::VecDeque;
use std::io;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{ready, Context, Poll, Waker};
use std::{io, mem};

use futures::Stream;
use futures_util::future::BoxFuture;
Expand Down Expand Up @@ -39,11 +39,15 @@ impl RowMaskReader<ArrayData> for LayoutMaskReader {
}
}

enum RowMaskState<V> {
Pending(RowMask),
Ready(V),
}

pub struct Coalescer<R, S, V, RM> {
values_iter: S,
in_flight: Option<BoxFuture<'static, io::Result<Vec<Message>>>>,
ready: VecDeque<V>,
pending: VecDeque<RowMask>,
queued: VecDeque<RowMaskState<V>>,
io_read: VortexReadRanges<R>,
dispatcher: Arc<IoDispatcher>,
row_mask_reader: RM,
Expand All @@ -66,8 +70,7 @@ where
Self {
values_iter,
in_flight: None,
ready: VecDeque::new(),
pending: VecDeque::new(),
queued: VecDeque::new(),
io_read: VortexReadRanges::new(read, dispatcher.clone(), 1 << 20),
dispatcher,
row_mask_reader,
Expand All @@ -92,19 +95,23 @@ where
let mut to_read = Vec::with_capacity(NUM_TO_COALESCE);
let mut read_more_count = 0;

// TODO(robert): Preserve relative order of masks, if a mask requires reading X messages it shouldn't be returned after later mask that requires only X-1 messages
for pending_mask in mem::take(&mut self.pending) {
if let Some(pending_read) = self.row_mask_reader.read_mask(&pending_mask)? {
match pending_read {
MessageRead::ReadMore(m) => {
self.pending.push_front(pending_mask);
to_read.extend(m);
read_more_count += 1;
}
MessageRead::Value(v) => {
self.ready.push_back(v);
// Poll all queued pending masks to see if we can make progress
for queued_res in self.queued.iter_mut() {
match queued_res {
RowMaskState::Pending(pending_mask) => {
if let Some(pending_read) = self.row_mask_reader.read_mask(pending_mask)? {
match pending_read {
MessageRead::ReadMore(m) => {
to_read.extend(m);
read_more_count += 1;
}
MessageRead::Value(v) => {
*queued_res = RowMaskState::Ready(v);
}
}
}
}
RowMaskState::Ready(_) => {}
}
}

Expand All @@ -114,12 +121,12 @@ where
if let Some(read_result) = self.row_mask_reader.read_mask(&next_mask)? {
match read_result {
MessageRead::ReadMore(m) => {
self.pending.push_back(next_mask);
self.queued.push_back(RowMaskState::Pending(next_mask));
to_read.extend(m);
read_more_count += 1;
}
MessageRead::Value(v) => {
self.ready.push_back(v);
self.queued.push_back(RowMaskState::Ready(v));
}
}
}
Expand Down Expand Up @@ -191,8 +198,14 @@ where
}
}

if let Some(next_ready) = self.ready.pop_front() {
Poll::Ready(Some(Ok(next_ready)))
if let Some(next_ready) = self.queued.pop_front() {
match next_ready {
RowMaskState::Pending(m) => {
self.queued.push_front(RowMaskState::Pending(m));
Poll::Pending
}
RowMaskState::Ready(next_ready) => Poll::Ready(Some(Ok(next_ready))),
}
} else if self.in_flight.is_some() {
Poll::Pending
} else {
Expand Down

0 comments on commit 4ffd42b

Please sign in to comment.