Skip to content

Commit

Permalink
Return Close messages to the user
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-abramov committed Feb 7, 2019
1 parent 1dcf833 commit 7d2f0c9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ futures = "0.1.23"
tokio-io = "0.1.7"

[dependencies.tungstenite]
version = "0.6.0"
# Uncomment when `tungstenite-rs` is released.
# version = "0.7.0"
git = "https://github.com/snapview/tungstenite-rs"
branch = "close-as-message"
default-features = false

[dependencies.bytes]
Expand Down
30 changes: 23 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ where
/// and unit tests for this crate.
pub struct WebSocketStream<S> {
inner: WebSocket<S>,
stream_ended: bool,
}

impl<S> WebSocketStream<S> {
Expand All @@ -171,8 +172,7 @@ impl<S> WebSocketStream<S> {
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_raw_socket(stream, role, config);
WebSocketStream { inner: ws }
Self::new(WebSocket::from_raw_socket(stream, role, config))
}

/// Convert a raw socket into a WebSocketStream without performing a
Expand All @@ -183,8 +183,14 @@ impl<S> WebSocketStream<S> {
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role, config);
WebSocketStream { inner: ws }
Self::new(WebSocket::from_partially_read(stream, part, role, config))
}

fn new(ws: WebSocket<S>) -> Self {
WebSocketStream {
inner: ws,
stream_ended: false,
}
}
}

Expand All @@ -193,7 +199,17 @@ impl<T> Stream for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type Error = WsError;

fn poll(&mut self) -> Poll<Option<Message>, WsError> {
self.inner.read_message().map(|m| Some(m)).to_async()
if self.stream_ended {
self.stream_ended = false;
return Ok(Async::Ready(None))
}

self.inner.read_message().map(|m| {
if m.is_close() {
self.stream_ended = true;
}
Some(m)
}).to_async()
}
}

Expand Down Expand Up @@ -227,7 +243,7 @@ impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
fn poll(&mut self) -> Poll<Self::Item, WsError> {
match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream { inner: ws }, resp))),
Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream::new(ws), resp))),
}
}
}
Expand All @@ -245,7 +261,7 @@ impl<S: AsyncRead + AsyncWrite, C: Callback> Future for AcceptAsync<S, C> {
fn poll(&mut self) -> Poll<Self::Item, WsError> {
match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(ws) => Ok(Async::Ready(WebSocketStream { inner: ws })),
Async::Ready(ws) => Ok(Async::Ready(WebSocketStream::new(ws))),
}
}
}
Expand Down

0 comments on commit 7d2f0c9

Please sign in to comment.