Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a combinator for temporarily using an AsyncRead/AsyncWrite as Read/Write #62

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};

use std::borrow::{Borrow, BorrowMut};
use std::cmp;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -282,6 +283,222 @@ impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
}
}
notgull marked this conversation as resolved.
Show resolved Hide resolved

/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
/// polls to `WouldBlock` errors.
///
/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
/// that take `Read` as a parameter.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::task::{Poll, Context};
///
/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
/// // Assume we have a library that's built around `Read` and `Write` traits.
/// use cooltls::Session;
///
/// // We want to use it with our writer that implements `AsyncWrite`.
/// let writer = Stream::new();
///
/// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
/// use futures_lite::io::AsyncAsSync;
/// let writer = AsyncAsSync::new(cx, writer);
///
/// // Now, we can use it with `cooltls`.
/// let mut session = Session::new(writer);
///
/// // Match on the result of `read()` and translate it to poll.
/// match session.read(&mut [0; 1024]) {
/// Ok(n) => Poll::Ready(n),
/// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
/// Err(err) => panic!("unexpected error: {}", err),
/// }
/// }
///
/// // Usually, poll-based functions are best wrapped using `poll_fn`.
/// use futures_lite::future::poll_fn;
/// # futures_lite::future::block_on(async {
/// poll_fn(|cx| poll_for_io(cx)).await;
/// # });
/// # struct Stream;
/// # impl Stream {
/// # fn new() -> Stream {
/// # Stream
/// # }
/// # }
/// # impl futures_lite::io::AsyncRead for Stream {
/// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
/// # Poll::Ready(Ok(0))
/// # }
/// # }
/// # mod cooltls {
/// # pub struct Session<W> {
/// # reader: W,
/// # }
/// # impl<W> Session<W> {
/// # pub fn new(reader: W) -> Session<W> {
/// # Session { reader }
/// # }
/// # }
/// # impl<W: std::io::Read> std::io::Read for Session<W> {
/// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
/// # self.reader.read(buf)
/// # }
/// # }
/// # }
/// ```
#[derive(Debug)]
pub struct AsyncAsSync<'r, 'ctx, T> {
/// The context we are using to poll the future.
pub context: &'r mut Context<'ctx>,

/// The actual reader/writer we are wrapping.
pub inner: T,
}

impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
/// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let async_reader = AsyncAsSync::new(&mut context, reader);
/// ```
#[inline]
pub fn new(context: &'r mut Context<'ctx>, io: T) -> Self {
AsyncAsSync { context, inner: io }
notgull marked this conversation as resolved.
Show resolved Hide resolved
}

/// Attempt to shutdown the I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: Vec<u8> = b"hello".to_vec();
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// async_reader.close().unwrap();
/// ```
#[inline]
pub fn close(&mut self) -> Result<()>
where
T: AsyncWrite + Unpin,
{
self.poll_with(|io, cx| io.poll_close(cx))
}

/// Poll this `AsyncAsSync` for some function.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncAsSync, AsyncRead};
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
/// assert_eq!(r.unwrap(), 5);
/// ```
#[inline]
pub fn poll_with<R>(
&mut self,
f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
) -> Result<R>
where
T: Unpin,
{
match f(Pin::new(&mut self.inner), self.context) {
Poll::Ready(res) => res,
Poll::Pending => Err(ErrorKind::WouldBlock.into()),
}
}
}

impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read(cx, buf))
}

#[inline]
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
}
}

impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write(cx, buf))
}

#[inline]
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
}

#[inline]
fn flush(&mut self) -> Result<()> {
self.poll_with(|io, cx| io.poll_flush(cx))
}
}

impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
#[inline]
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.poll_with(|io, cx| io.poll_seek(cx, pos))
}
}

impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_ref(&self) -> &T {
&self.inner
}
}

impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_mut(&mut self) -> &mut T {
&mut self.inner
}
}

impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow(&self) -> &T {
&self.inner
}
}

impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow_mut(&mut self) -> &mut T {
&mut self.inner
}
}

/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
Expand Down