Skip to content

Commit

Permalink
refactor: resolve weirdness with using buf_start to track end
Browse files Browse the repository at this point in the history
we don't have to do this anymore now that we properly track state
  • Loading branch information
rklaehn committed Jan 28, 2023
1 parent 09ecead commit d44f5c7
Showing 1 changed file with 18 additions and 26 deletions.
44 changes: 18 additions & 26 deletions src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,17 @@ mod tokio_io {
NextRead::Header => {
// ensure reading state, reading 8 bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = 8;
// header comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(cx))?;
ready!(self.poll_fill_buffer_from_input_or_outboard(8, cx))?;
self.state.feed_header(self.buf[0..8].try_into().unwrap());
// we don't want to write the header, so we are done with the buffer contents
self.clear_buf();
}
NextRead::Parent => {
// ensure reading state, reading 64 bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = 64;
// parent comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(cx))?;
ready!(self.poll_fill_buffer_from_input_or_outboard(64, cx))?;
self.state
.feed_parent(&self.buf[0..64].try_into().unwrap())?;
// we don't want to write the parent, so we are done with the buffer contents
Expand All @@ -524,9 +520,8 @@ mod tokio_io {
// ensure reading state, reading size bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = size;
// chunk never comes from outboard
ready!(self.poll_fill_buffer_from_input(cx))?;
ready!(self.poll_fill_buffer_from_input(size, cx))?;

// Hash it and push its hash into the VerifyState. This
// returns an error if the hash is bad. Otherwise, the
Expand Down Expand Up @@ -564,10 +559,8 @@ mod tokio_io {
NextRead::Header => {
// ensure reading state, reading 8 bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = 8;
// header comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(cx))?;
ready!(self.poll_fill_buffer_from_input_or_outboard(8, cx))?;
self.state.feed_header(self.buf[0..8].try_into().unwrap());
// we don't want to write the header, so we are done with the buffer contents
self.clear_buf();
Expand All @@ -577,10 +570,8 @@ mod tokio_io {
NextRead::Parent => {
// ensure reading state, reading 64 bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = 64;
// parent comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(cx))?;
ready!(self.poll_fill_buffer_from_input_or_outboard(64, cx))?;
self.state
.feed_parent(&self.buf[0..64].try_into().unwrap())?;
// we don't want to write the parent, so we are done with the buffer contents
Expand All @@ -596,10 +587,8 @@ mod tokio_io {
} => {
// ensure reading state, reading size bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
self.buf_end = size;
// chunk never comes from outboard
ready!(self.poll_fill_buffer_from_input(cx))?;
ready!(self.poll_fill_buffer_from_input(size, cx))?;

// Hash it and push its hash into the VerifyState. This
// returns an error if the hash is bad. Otherwise, the
Expand All @@ -619,40 +608,43 @@ mod tokio_io {

fn poll_fill_buffer_from_input(
&mut self,
size: usize,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
let mut buf = ReadBuf::new(&mut self.buf[..self.buf_end]);
buf.advance(self.buf_start);
let mut buf = ReadBuf::new(&mut self.buf[..size]);
buf.advance(self.buf_end);
let src = &mut self.input;
while buf.remaining() > 0 {
ready!(AsyncRead::poll_read(Pin::new(src), cx, &mut buf))?;
self.buf_start = buf.filled().len();
self.buf_end = buf.filled().len();
}
Poll::Ready(Ok(()))
}

fn poll_fill_buffer_from_outboard(
&mut self,
size: usize,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
let mut buf = ReadBuf::new(&mut self.buf[..self.buf_end]);
buf.advance(self.buf_start);
let mut buf = ReadBuf::new(&mut self.buf[..size]);
buf.advance(self.buf_end);
let src = self.outboard.as_mut().unwrap();
while buf.remaining() > 0 {
ready!(AsyncRead::poll_read(Pin::new(src), cx, &mut buf))?;
self.buf_start = buf.filled().len();
self.buf_end = buf.filled().len();
}
Poll::Ready(Ok(()))
}

fn poll_fill_buffer_from_input_or_outboard(
&mut self,
size: usize,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
if self.outboard.is_some() {
self.poll_fill_buffer_from_outboard(cx)
self.poll_fill_buffer_from_outboard(size, cx)
} else {
self.poll_fill_buffer_from_input(cx)
self.poll_fill_buffer_from_input(size, cx)
}
}
}
Expand Down Expand Up @@ -1031,7 +1023,7 @@ mod tokio_io {
}

#[cfg(feature = "tokio_io")]
pub use tokio_io::AsyncDecoder;
pub use tokio_io::{AsyncDecoder, AsyncSliceDecoder};

/// An incremental decoder, which reads and verifies the output of
/// [`Encoder`](../encode/struct.Encoder.html).
Expand Down

0 comments on commit d44f5c7

Please sign in to comment.