Skip to content

Commit

Permalink
Add a combinator for temporarily using an AsyncRead/AsyncWrite as Rea…
Browse files Browse the repository at this point in the history
…d/Write (#62)

Add AsyncAsSync wrapper

Co-authored-by: Alain Zscheile <[email protected]>
  • Loading branch information
notgull and fogti authored Nov 7, 2022
1 parent 4c03e0d commit a0871eb
Showing 1 changed file with 217 additions and 0 deletions.
217 changes: 217 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};

use std::borrow::{Borrow, BorrowMut};
use std::cmp;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -275,6 +276,222 @@ impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
}
}

/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
/// polls to `WouldBlock` errors.
///
/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
/// that take `Read` as a parameter.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::task::{Poll, Context};
///
/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
/// // Assume we have a library that's built around `Read` and `Write` traits.
/// use cooltls::Session;
///
/// // We want to use it with our writer that implements `AsyncWrite`.
/// let writer = Stream::new();
///
/// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
/// use futures_lite::io::AsyncAsSync;
/// let writer = AsyncAsSync::new(cx, writer);
///
/// // Now, we can use it with `cooltls`.
/// let mut session = Session::new(writer);
///
/// // Match on the result of `read()` and translate it to poll.
/// match session.read(&mut [0; 1024]) {
/// Ok(n) => Poll::Ready(n),
/// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
/// Err(err) => panic!("unexpected error: {}", err),
/// }
/// }
///
/// // Usually, poll-based functions are best wrapped using `poll_fn`.
/// use futures_lite::future::poll_fn;
/// # futures_lite::future::block_on(async {
/// poll_fn(|cx| poll_for_io(cx)).await;
/// # });
/// # struct Stream;
/// # impl Stream {
/// # fn new() -> Stream {
/// # Stream
/// # }
/// # }
/// # impl futures_lite::io::AsyncRead for Stream {
/// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
/// # Poll::Ready(Ok(0))
/// # }
/// # }
/// # mod cooltls {
/// # pub struct Session<W> {
/// # reader: W,
/// # }
/// # impl<W> Session<W> {
/// # pub fn new(reader: W) -> Session<W> {
/// # Session { reader }
/// # }
/// # }
/// # impl<W: std::io::Read> std::io::Read for Session<W> {
/// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
/// # self.reader.read(buf)
/// # }
/// # }
/// # }
/// ```
#[derive(Debug)]
pub struct AsyncAsSync<'r, 'ctx, T> {
/// The context we are using to poll the future.
pub context: &'r mut Context<'ctx>,

/// The actual reader/writer we are wrapping.
pub inner: T,
}

impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
/// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let async_reader = AsyncAsSync::new(&mut context, reader);
/// ```
#[inline]
pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
AsyncAsSync { context, inner }
}

/// Attempt to shutdown the I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: Vec<u8> = b"hello".to_vec();
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// async_reader.close().unwrap();
/// ```
#[inline]
pub fn close(&mut self) -> Result<()>
where
T: AsyncWrite + Unpin,
{
self.poll_with(|io, cx| io.poll_close(cx))
}

/// Poll this `AsyncAsSync` for some function.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncAsSync, AsyncRead};
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
/// assert_eq!(r.unwrap(), 5);
/// ```
#[inline]
pub fn poll_with<R>(
&mut self,
f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
) -> Result<R>
where
T: Unpin,
{
match f(Pin::new(&mut self.inner), self.context) {
Poll::Ready(res) => res,
Poll::Pending => Err(ErrorKind::WouldBlock.into()),
}
}
}

impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read(cx, buf))
}

#[inline]
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
}
}

impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write(cx, buf))
}

#[inline]
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
}

#[inline]
fn flush(&mut self) -> Result<()> {
self.poll_with(|io, cx| io.poll_flush(cx))
}
}

impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
#[inline]
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.poll_with(|io, cx| io.poll_seek(cx, pos))
}
}

impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_ref(&self) -> &T {
&self.inner
}
}

impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_mut(&mut self) -> &mut T {
&mut self.inner
}
}

impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow(&self) -> &T {
&self.inner
}
}

impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow_mut(&mut self) -> &mut T {
&mut self.inner
}
}

/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
Expand Down

0 comments on commit a0871eb

Please sign in to comment.