Skip to content

Commit

Permalink
WIP seek support for decode
Browse files Browse the repository at this point in the history
not tested at all
  • Loading branch information
rklaehn committed Jan 28, 2023
1 parent d44f5c7 commit 1639453
Showing 1 changed file with 150 additions and 5 deletions.
155 changes: 150 additions & 5 deletions src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,15 +459,19 @@ impl<T, O> fmt::Debug for DecoderShared<T, O> {
#[cfg(feature = "tokio_io")]
mod tokio_io {

use super::{DecoderShared, Hash, NextRead};
use crate::encode;

use super::{add_offset, DecoderShared, Hash, NextRead};
use std::{
cmp,
convert::TryInto,
io,
future::{poll_fn, Future},
io::{self, SeekFrom},
pin::Pin,
process::Output,
task::{ready, Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, ReadBuf};

// tokio flavour async io utilities, requiing AsyncRead
impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> DecoderShared<T, O> {
Expand Down Expand Up @@ -649,12 +653,95 @@ mod tokio_io {
}
}

#[derive(Debug)]
impl<T: AsyncRead + AsyncSeek + Unpin, O: AsyncRead + AsyncSeek + Unpin> DecoderShared<T, O> {
// The Decoder will call this as part of seeking, but note that the
// SliceDecoder won't, because all the seek bookkeeping has already been
// taken care of during slice extraction.
async fn handle_seek_bookkeeping_fut(
&mut self,
bookkeeping: encode::SeekBookkeeping,
) -> io::Result<NextRead> {
// The VerifyState handles all the subtree stack management. We just
// need to handle the underlying seek. This is done differently
// depending on whether the encoding is combined or outboard.
if let Some(outboard) = &mut self.outboard {
if let Some((content_pos, outboard_pos)) = bookkeeping.underlying_seek_outboard() {
// As with Decoder in the outboard case, the outboard extractor has to seek both of
// its inner readers. The content position of the state goes into the content
// reader, and the rest of the reported seek offset goes into the outboard reader.
self.input.seek(SeekFrom::Start(content_pos)).await?;
outboard.seek(SeekFrom::Start(outboard_pos)).await?;
}
} else if let Some(encoding_position) = bookkeeping.underlying_seek() {
let position_u64: u64 = encode::cast_offset(encoding_position)?;
self.input.seek(SeekFrom::Start(position_u64)).await?;
}
let next = self.state.seek_bookkeeping_done(bookkeeping);
Ok(next)
}

fn handle_seek_read_fut(
&mut self,
next: NextRead,
) -> impl Future<Output = io::Result<bool>> + '_ {
poll_fn(move |cx| self.poll_handle_seek_read(next, cx))
}

async fn seek_fut(self: Box<Self>, pos: SeekFrom) -> (Box<Self>, io::Result<u64>) {
let mut this = self;
let result = this.seek_fut_inner(pos).await;
(this, result)
}

async fn seek_fut_inner(&mut self, pos: SeekFrom) -> io::Result<u64> {
// Clear the internal buffer when seeking. The buffered bytes won't be
// valid reads at the new offset.
self.clear_buf();

// Get the absolute seek offset. If the caller passed in
// SeekFrom::Start, that's what we've got. If not, we need to compute
// it.
let seek_to = match pos {
SeekFrom::Start(offset) => offset,
SeekFrom::End(offset) => {
// To seek from the end we have to get the length, and that may
// require as a seek loop of its own to verify the length.
let content_len = loop {
match self.state.len_next() {
encode::LenNext::Seek(bookkeeping) => {
let next_read =
self.handle_seek_bookkeeping_fut(bookkeeping).await?;
let done = self.handle_seek_read_fut(next_read).await?;
debug_assert!(!done);
}
encode::LenNext::Len(len) => break len,
}
};
add_offset(content_len, offset)?
}
SeekFrom::Current(offset) => add_offset(self.adjusted_content_position(), offset)?,
};

// Now with the absolute seek offset, we perform the real (possibly
// second) seek loop.
loop {
let bookkeeping = self.state.seek_next(seek_to);
let next_read = self.handle_seek_bookkeeping_fut(bookkeeping).await?;
let done = self.handle_seek_read_fut(next_read).await?;
if done {
return Ok(seek_to);
}
}
}
}

enum DecoderState<T: AsyncRead + Unpin, O: AsyncRead + Unpin> {
/// we are reading from the underlying reader
Reading(Box<DecoderShared<T, O>>),
/// we are being polled for output
Output(Box<DecoderShared<T, O>>),
/// we are currently seeking
Seeking(Pin<Box<dyn Future<Output = (Box<DecoderShared<T, O>>, io::Result<u64>)>>>),
/// we are done
Done,
}
Expand All @@ -665,7 +752,6 @@ mod tokio_io {
}
}

#[derive(Debug)]
pub struct AsyncDecoder<T: AsyncRead + Unpin, O: AsyncRead + Unpin>(DecoderState<T, O>);

impl<T: AsyncRead + Unpin> AsyncDecoder<T, T> {
Expand Down Expand Up @@ -731,6 +817,9 @@ mod tokio_io {
};
break Poll::Ready(Ok(()));
}
DecoderState::Seeking(_) => {
break Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "seeking")))
}
DecoderState::Done => {
break Poll::Ready(Ok(()));
}
Expand All @@ -739,6 +828,62 @@ mod tokio_io {
}
}

fn io_err(text: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, text)
}

impl<
T: AsyncRead + AsyncSeek + Unpin + 'static,
O: AsyncRead + AsyncSeek + Unpin + 'static,
> AsyncSeek for AsyncDecoder<T, O>
{
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
let state = self.0.take();
match state {
DecoderState::Reading(mut shared) => {
let fut = Box::pin(shared.seek_fut(position));
self.0 = DecoderState::Seeking(fut);
Ok(())
}
DecoderState::Output(shared) => Err(io_err("can't start seeking")),
DecoderState::Seeking(_) => Err(io_err("already seeking")),
DecoderState::Done => {
unreachable!()
}
}
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
let state = self.0.take();
match state {
DecoderState::Reading(mut shared) => Poll::Ready(Err(io_err("not seeking"))),
DecoderState::Output(mut shared) => Poll::Ready(Err(io_err("not seeking"))),
DecoderState::Seeking(mut fut) => {
let res = Pin::new(&mut fut).poll(cx);
match res {
Poll::Ready((shared, Ok(offset))) => {
// successfully completed seeking. restore the state and return the offset
self.0 = DecoderState::Reading(shared);
Poll::Ready(Ok(offset))
}
Poll::Ready((shared, Err(e))) => {
// got an error from the seek. restore the state and return the error
//
// todo: make sure that the state will be good after this
self.0 = DecoderState::Reading(shared);
Poll::Ready(Err(e))
}
Poll::Pending => {
// we are still seeking
self.0 = DecoderState::Seeking(fut);
Poll::Pending
}
}
}
DecoderState::Done => unreachable!(),
}
}
}
pub struct SliceDecoderInner<T: AsyncRead + Unpin> {
shared: DecoderShared<T, T>,
slice_start: u64,
Expand Down

0 comments on commit 1639453

Please sign in to comment.