Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Pin<Box<S>> in BodyStream and SizedStream #1328

Merged
merged 2 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions actix-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,8 @@ impl MessageBody for String {

/// Type represent streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
#[pin_project]
pub struct BodyStream<S, E> {
#[pin]
stream: S,
stream: Pin<Box<S>>,
_t: PhantomData<E>,
}

Expand All @@ -375,7 +373,7 @@ where
{
pub fn new(stream: S) -> Self {
BodyStream {
stream,
stream: Box::pin(stream),
_t: PhantomData,
}
}
Expand All @@ -396,7 +394,7 @@ where
/// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
let mut stream = self.stream.as_mut();
loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
Expand All @@ -408,19 +406,17 @@ where

/// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding.
#[pin_project]
pub struct SizedStream<S> {
size: u64,
#[pin]
stream: S,
stream: Pin<Box<S>>,
}

impl<S> SizedStream<S>
where
S: Stream<Item = Result<Bytes, Error>>,
{
pub fn new(size: u64, stream: S) -> Self {
SizedStream { size, stream }
SizedStream { size, stream: Box::pin(stream) }
}
}

Expand All @@ -438,7 +434,7 @@ where
/// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
let mut stream = self.stream.as_mut();
loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
Expand Down
26 changes: 26 additions & 0 deletions tests/test_weird_poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Regression test for #/1321

use futures::task::{noop_waker, Context};
use futures::stream::once;
use actix_http::body::{MessageBody, BodyStream};
use bytes::Bytes;

#[test]
fn weird_poll() {
let (sender, receiver) = futures::channel::oneshot::channel();
let mut body_stream = Ok(BodyStream::new(once(async {
let x = Box::new(0);
let y = &x;
receiver.await.unwrap();
let _z = **y;
Ok::<_, ()>(Bytes::new())
})));

let waker = noop_waker();
let mut context = Context::from_waker(&waker);

let _ = body_stream.as_mut().unwrap().poll_next(&mut context);
sender.send(()).unwrap();
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
}