Skip to content

Commit

Permalink
feat: add StreamExt::map_while
Browse files Browse the repository at this point in the history
  • Loading branch information
nanoqsh committed Nov 9, 2024
1 parent 83bcf1e commit 902805c
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,33 @@ pub trait StreamExt: Stream {
}
}

/// Maps items while `predicate` returns [`Some`].
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 0, 3]);
/// let mut s = s.map_while(|x: u32| x.checked_sub(1));
///
/// assert_eq!(s.next().await, Some(0));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
where
Self: Sized,
P: FnMut(Self::Item) -> Option<B>,
{
MapWhile {
stream: self,
predicate,
}
}

/// Skips the first `n` items of the stream.
///
/// # Examples
Expand Down Expand Up @@ -2790,6 +2817,34 @@ where
}
}

pin_project! {
/// Stream for the [`StreamExt::map_while()`] method.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct MapWhile<S, P> {
#[pin]
stream: S,
predicate: P,
}
}

impl<B, S, P> Stream for MapWhile<S, P>
where
S: Stream,
P: FnMut(S::Item) -> Option<B>,
{
type Item = B;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

match ready!(this.stream.poll_next(cx)) {
Some(v) => Poll::Ready((this.predicate)(v)),
None => Poll::Ready(None),
}
}
}

pin_project! {
/// Stream for the [`StreamExt::skip()`] method.
#[derive(Clone, Debug)]
Expand Down

0 comments on commit 902805c

Please sign in to comment.