Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Stream::try_iter() method #70

Merged
merged 5 commits into from
Feb 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,57 @@ pub trait StreamExt: Stream {
}
}

/// Yields all immediately available values from a stream.
///
/// This is intended to be used as a way of polling a stream without waiting, similar to the
/// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
/// on an [`async_channel::Receiver`] will return all messages that are currently in the
/// channel, but will not wait for new messages.
///
/// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
/// polling context in order to poll the underlying stream. Since this stream will never return
/// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
/// [`Iterator`].
///
/// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
/// the future if it is polled again.
///
/// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
/// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
/// [`Stream`]: crate::stream::Stream
/// [`Iterator`]: std::iter::Iterator
///
/// # Examples
///
/// ```
/// use futures_lite::{future, pin};
/// use futures_lite::stream::{self, StreamExt};
///
/// # #[cfg(feature = "std")] {
/// // A stream that yields two values, returns `Pending`, and then yields one more value.
/// let pend_once = stream::once_future(async {
/// future::yield_now().await;
/// 3
/// });
/// let s = stream::iter(vec![1, 2]).chain(pend_once);
/// pin!(s);
///
/// // This will return the first two values, and then `None` because the stream returns
/// // `Pending` after that.
/// let mut iter = stream::block_on(s.try_stream());
/// assert_eq!(iter.next(), Some(1));
/// assert_eq!(iter.next(), Some(2));
/// assert_eq!(iter.next(), None);
///
/// // This will return the last value, because the stream returns `Ready` when polled.
/// assert_eq!(iter.next(), Some(3));
/// assert_eq!(iter.next(), None);
/// # }
/// ```
fn try_stream(&mut self) -> TryStream<'_, Self> {
TryStream { stream: self }
}

/// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
///
/// # Examples
Expand Down Expand Up @@ -3175,3 +3226,87 @@ where
}
}
}

/// Stream for the [`StreamExt::try_stream()`] method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryStream<'a, S: ?Sized> {
stream: &'a mut S,
}

impl<'a, S: Unpin + ?Sized> Unpin for TryStream<'a, S> {}

impl<'a, S: Unpin + ?Sized> TryStream<'a, S> {
/// Get a reference to the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let s2 = s.try_stream();
///
/// let inner = s2.get_ref();
/// // s and inner are the same.
/// # });
/// ```
pub fn get_ref(&self) -> &S {
&self.stream
}

/// Get a mutable reference to the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let mut s2 = s.try_stream();
///
/// let inner = s2.get_mut();
/// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
/// # });
/// ```
pub fn get_mut(&mut self) -> &mut S {
&mut self.stream
}

/// Consume this stream and get the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let mut s2 = s.try_stream();
///
/// let inner = s2.into_inner();
/// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
/// # });
/// ```
pub fn into_inner(self) -> &'a mut S {
self.stream
}
}

impl<'a, S: Stream + Unpin + ?Sized> Stream for TryStream<'a, S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.stream.poll_next(cx) {
Poll::Ready(x) => Poll::Ready(x),
Poll::Pending => Poll::Ready(None),
}
Comment on lines +3302 to +3305
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
match self.stream.poll_next(cx) {
Poll::Ready(x) => Poll::Ready(x),
Poll::Pending => Poll::Ready(None),
}
Poll::Ready(match self.stream.poll_next(cx) {
Poll::Ready(x) => x,
Poll::Pending => None,
})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this looks nice

}

fn size_hint(&self) -> (usize, Option<usize>) {
let (_, hi) = self.stream.size_hint();
(0, hi)
}
}
Loading