Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ws client redirections #397

Merged
merged 43 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
31ae533
feat(ws client): support redirections
niklasad1 Jun 30, 2021
753699c
reuse socket
niklasad1 Jun 30, 2021
b6f09da
reuse socket
niklasad1 Jun 30, 2021
ce77bf5
add hacks
niklasad1 Jun 30, 2021
67b550e
fix build
niklasad1 Jun 30, 2021
e9d863f
remove hacks
niklasad1 Jun 30, 2021
4c1dc3c
Merge branch 'master' into na-ws-client-redirections
dvdplm Jul 12, 2021
8a31148
Merge remote-tracking branch 'origin/master' into na-ws-client-redire…
niklasad1 Aug 30, 2021
4685447
Merge branch 'na-ws-client-redirections' of github.com:paritytech/jso…
niklasad1 Aug 30, 2021
5a1e118
fix bad merge
niklasad1 Aug 30, 2021
9e506c1
address grumbles
niklasad1 Aug 30, 2021
ffa8150
Merge remote-tracking branch 'origin/master' into na-ws-client-redire…
niklasad1 Sep 29, 2021
d27a708
fix grumbles
niklasad1 Sep 29, 2021
1fce702
fix grumbles
niklasad1 Sep 29, 2021
250c896
fix nit
niklasad1 Sep 29, 2021
d200744
add redirection test
niklasad1 Sep 30, 2021
1f7cfe4
Merge branch 'master' into na-ws-client-redirections
dvdplm Oct 1, 2021
33f364c
Update test-utils/src/types.rs
dvdplm Oct 1, 2021
9d9aaba
Resolved todo
dvdplm Oct 1, 2021
720f709
Check that redirected client actually works
dvdplm Oct 1, 2021
ce87119
Merge branch 'master' into na-ws-client-redirections
dvdplm Oct 1, 2021
21583d1
Rename test-utils "types" to "mocks"
dvdplm Oct 1, 2021
f25a22d
Fix windows test (?)
dvdplm Oct 1, 2021
0d54d2e
fmt
dvdplm Oct 1, 2021
b93287c
What is wrong with you windows?
dvdplm Oct 1, 2021
4df383f
Ignore redirect test on windows
dvdplm Oct 1, 2021
ae2791c
fix bad transport errors
niklasad1 Oct 2, 2021
0d4b18b
debug windows tests
niklasad1 Oct 2, 2021
adbd2c6
update soketto
niklasad1 Oct 2, 2021
bf0d71d
maybe fix windows test
niklasad1 Oct 2, 2021
186bf84
add config flag for max redirections
niklasad1 Oct 2, 2021
efe1c28
revert faulty change.
niklasad1 Oct 2, 2021
e70842b
revert windows path
niklasad1 Oct 2, 2021
549e563
use manual join paths
niklasad1 Oct 3, 2021
df28dbd
remove url dep
niklasad1 Oct 3, 2021
4b5065c
Update ws-client/src/tests.rs
niklasad1 Oct 3, 2021
93d0c3b
default max redirects 5
niklasad1 Oct 4, 2021
c257fea
Merge branch 'na-ws-client-redirections' of github.com:paritytech/jso…
niklasad1 Oct 4, 2021
0488a2a
remove needless clone vec
niklasad1 Oct 5, 2021
56db40a
Merge remote-tracking branch 'origin/master' into na-ws-client-redire…
niklasad1 Oct 5, 2021
3aede64
Merge remote-tracking branch 'origin/master' into na-ws-client-redire…
niklasad1 Oct 5, 2021
223c37c
fix bad merge
niklasad1 Oct 5, 2021
ae1e405
cmon CI run
niklasad1 Oct 5, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::types::{
};
use crate::HttpClientBuilder;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::error::{CallError, Error};
use crate::{server::StopHandle, HttpServerBuilder, RpcModule};

use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, StatusCode, TestContext};
use jsonrpsee_test_utils::mocks::{Id, StatusCode, TestContext};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;
use tokio::task::JoinHandle;
Expand Down
2 changes: 1 addition & 1 deletion test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ hyper = { version = "0.14.10", features = ["full"] }
log = "0.4"
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = "1"
soketto = "0.6"
soketto = { version = "0.7", features = ["http"] }
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.6", features = ["compat"] }
2 changes: 1 addition & 1 deletion test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::types::{Body, HttpResponse, Id, Uri};
use crate::mocks::{Body, HttpResponse, Id, Uri};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Request, Response, Server};
use serde_json::Value;
Expand Down
2 changes: 1 addition & 1 deletion test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{future::Future, time::Duration};
use tokio::time::{timeout, Timeout};

pub mod helpers;
pub mod types;
pub mod mocks;

/// Helper extension trait which allows to limit execution time for the futures.
/// It is helpful in tests to ensure that no future will ever get stuck forever.
Expand Down
57 changes: 56 additions & 1 deletion test-utils/src/types.rs → test-utils/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use futures_util::{
stream::{self, StreamExt},
};
use serde::{Deserialize, Serialize};
use soketto::handshake::{self, server::Response, Error as SokettoError, Server};
use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server};
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
Expand Down Expand Up @@ -314,3 +314,58 @@ async fn connection_task(socket: tokio::net::TcpStream, mode: ServerMode, mut ex
}
}
}

// Run a WebSocket server running on localhost that redirects requests for testing.
// Requests to any url except for `/myblock/two` will redirect one or two times (HTTP 301) and eventually end up in `/myblock/two`.
pub fn ws_server_with_redirect(other_server: String) -> String {
let addr = ([127, 0, 0, 1], 0).into();

let service = hyper::service::make_service_fn(move |_| {
let other_server = other_server.clone();
async move {
Ok::<_, hyper::Error>(hyper::service::service_fn(move |req| {
let other_server = other_server.clone();
async move { handler(req, other_server).await }
}))
}
});
let server = hyper::Server::bind(&addr).serve(service);
let addr = server.local_addr();

tokio::spawn(async move { server.await });
format!("ws://{}", addr)
}

/// Handle incoming HTTP Requests.
async fn handler(
req: hyper::Request<Body>,
other_server: String,
) -> Result<hyper::Response<Body>, soketto::BoxedError> {
if is_upgrade_request(&req) {
match req.uri().to_string().as_str() {
"/myblock/two" => {
let response = hyper::Response::builder()
.status(301)
.header("Location", other_server)
.body(Body::empty())
.unwrap();
Ok(response)
}
"/myblock/one" => {
let response =
hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap();
Ok(response)
}
_ => {
let response = hyper::Response::builder()
.status(301)
.header("Location", "/myblock/one")
.body(Body::empty())
.unwrap();
Ok(response)
}
}
} else {
panic!("expect upgrade to WS");
}
}
1 change: 1 addition & 0 deletions ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ rustls-native-certs = "0.5.0"

[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
env_logger = "0.9"
33 changes: 32 additions & 1 deletion ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::{
};
use crate::WsClientBuilder;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -263,3 +263,34 @@ fn assert_error_response(err: Error, exp: ErrorObject) {
e => panic!("Expected error: \"{}\", got: {:?}", err, e),
};
}

#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test]
async fn redirections() {
let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(expected.into(), Id::Num(0)),
)
.with_default_timeout()
.await
.unwrap();

let server_url = format!("ws://{}", server.local_addr());
let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url);

// The client will first connect to a server that only performs re-directions and finally
// redirect to another server to complete the handshake.
let client = WsClientBuilder::default().build(&redirect_url).with_default_timeout().await;
// It's an ok client
let client = match client {
Ok(Ok(client)) => client,
Ok(Err(e)) => panic!("WsClient builder failed with: {:?}", e),
Err(e) => panic!("WsClient builder timed out with: {:?}", e),
};
// It's connected
assert!(client.is_connected());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// It works
let response = client.request::<String>("anything", ParamsSer::NoParams).with_default_timeout().await.unwrap();
assert_eq!(response.unwrap(), String::from(expected));
}
186 changes: 126 additions & 60 deletions ws-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::stream::EitherStream;
use futures::io::{BufReader, BufWriter};
use futures::prelude::*;
use soketto::connection;
use soketto::handshake::client::{Client as WsRawClient, ServerResponse};
use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse};
use std::path::Path;
use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
Expand All @@ -37,7 +38,9 @@ use tokio_rustls::{
TlsConnector,
};

type TlsOrPlain = crate::stream::EitherStream<TcpStream, TlsStream<TcpStream>>;
type TlsOrPlain = EitherStream<TcpStream, TlsStream<TcpStream>>;

const MAX_REDIRECTIONS_ALLOWED: usize = 5;

/// Sending end of WebSocket transport.
#[derive(Debug)]
Expand Down Expand Up @@ -112,7 +115,7 @@ pub enum WsHandshakeError {
#[error("Invalid DNS name: {}", 0)]
InvalidDnsName(#[source] InvalidDNSNameError),

/// RawServer rejected our handshake.
/// Server rejected the handshake.
#[error("Connection rejected with status code: {}", status_code)]
Rejected {
/// HTTP status code that the server returned.
Expand Down Expand Up @@ -184,74 +187,137 @@ impl<'a> WsTransportClientBuilder<'a> {
Mode::Plain => None,
};

self.try_connect(connector).await
}

async fn try_connect(
self,
mut tls_connector: Option<TlsConnector>,
) -> Result<(Sender, Receiver), WsHandshakeError> {
let mut target = self.target;
let mut used_sockaddrs = Vec::new();

let mut err = None;
for sockaddr in &self.target.sockaddrs {
match self.try_connect(*sockaddr, &connector).await {
Ok(res) => return Ok(res),

for _ in 0..MAX_REDIRECTIONS_ALLOWED {
log::debug!("Connecting to target: {:?}", target);

let sockaddr = match target.sockaddrs.pop() {
Some(addr) => {
used_sockaddrs.push(addr);
addr
}
None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))),
};

let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await {
Ok(stream) => stream,
Err(e) => {
log::debug!("Failed to connect to sockaddr: {:?} with err: {:?}", sockaddr, e);
log::error!("Failed to connect to sockaddr: {:?}", sockaddr);
err = Some(Err(e));
continue;
}
};
let mut client = WsHandshakeClient::new(
BufReader::new(BufWriter::new(tcp_stream)),
&target.host_header,
&target.path_and_query,
);
if let Some(origin) = self.origin_header.as_ref() {
client.set_origin(origin);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}
// NOTE(niklasad1): this is most likely unreachable because [`Url::socket_addrs`] doesn't
// return an empty `Vec` if no socket address was found for the host name.
err.unwrap_or(Err(WsHandshakeError::NoAddressFound(self.target.host)))
}
// Perform the initial handshake.
match client.handshake().await {
Ok(ServerResponse::Accepted { .. }) => {
log::info!("Connection established to target: {:?}", target);
let mut builder = client.into_builder();
builder.set_max_message_size(self.max_request_body_size as usize);
let (sender, receiver) = builder.finish();
return Ok((Sender { inner: sender }, Receiver { inner: receiver }));
}

async fn try_connect(
&self,
sockaddr: SocketAddr,
tls_connector: &Option<TlsConnector>,
) -> Result<(Sender, Receiver), WsHandshakeError> {
// Try establish the TCP connection.
let tcp_stream = {
let socket = TcpStream::connect(sockaddr);
let timeout = tokio::time::sleep(self.timeout);
futures::pin_mut!(socket, timeout);
match future::select(socket, timeout).await {
future::Either::Left((socket, _)) => {
let socket = socket?;
if let Err(err) = socket.set_nodelay(true) {
log::warn!("set nodelay failed: {:?}", err);
}
match tls_connector {
None => TlsOrPlain::Plain(socket),
Some(connector) => {
let dns_name = DNSNameRef::try_from_ascii_str(&self.target.host)?;
let tls_stream = connector.connect(dns_name, socket).await?;
TlsOrPlain::Tls(tls_stream)
Ok(ServerResponse::Rejected { status_code }) => {
log::debug!("Connection rejected: {:?}", status_code);
err = Some(Err(WsHandshakeError::Rejected { status_code }));
}
Ok(ServerResponse::Redirect { status_code, location }) => {
log::trace!("redirection: status_code: {}, location: {}", status_code, location);
match url::Url::parse(&location) {
// redirection with absolute path => need to lookup.
Ok(url) => {
target = Target::parse(url)?;
used_sockaddrs.clear();
tls_connector = match target.mode {
Mode::Tls => {
let mut client_config = rustls::ClientConfig::default();
if let CertificateStore::Native = self.certificate_store {
client_config.root_store = rustls_native_certs::load_native_certs()
.map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?;
}
Some(Arc::new(client_config).into())
}
Mode::Plain => None,
};
}
// redirection is relative, either `/baz` or `bar`.
Err(
url::ParseError::RelativeUrlWithoutBase | url::ParseError::RelativeUrlWithCannotBeABaseBase,
) => {
// replace the entire path if `location` is `/`.
if location.starts_with('/') {
target.path_and_query = location;
} else {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// join paths such that the leaf is replaced with `location`.
let strip_last_child = Path::new(&target.path_and_query)
.ancestors()
.nth(1)
.unwrap_or_else(|| Path::new("/"));
target.path_and_query = strip_last_child
.join(location)
.to_str()
.expect("valid UTF-8 checked by Url::parse; qed")
.to_string();
}
std::mem::swap(&mut target.sockaddrs, &mut used_sockaddrs);
}
Err(e) => {
err = Some(Err(WsHandshakeError::Url(e.to_string().into())));
}
}
};
}
future::Either::Right((_, _)) => return Err(WsHandshakeError::Timeout(self.timeout)),
}
};

log::debug!("Connecting to target: {:?}", self.target);
let mut client = WsRawClient::new(
BufReader::new(BufWriter::new(tcp_stream)),
&self.target.host_header,
&self.target.path_and_query,
);
if let Some(origin) = self.origin_header.as_ref() {
client.set_origin(origin);
Err(e) => {
err = Some(Err(e.into()));
}
};
}
err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host)))
}
}

// Perform the initial handshake.
match client.handshake().await? {
ServerResponse::Accepted { .. } => {}
ServerResponse::Rejected { status_code } | ServerResponse::Redirect { status_code, .. } => {
// TODO: HTTP redirects also lead here #339.
return Err(WsHandshakeError::Rejected { status_code });
async fn connect(
sockaddr: SocketAddr,
timeout_dur: Duration,
host: &str,
tls_connector: &Option<TlsConnector>,
) -> Result<EitherStream<TcpStream, TlsStream<TcpStream>>, WsHandshakeError> {
let socket = TcpStream::connect(sockaddr);
let timeout = tokio::time::sleep(timeout_dur);
tokio::select! {
socket = socket => {
let socket = socket?;
if let Err(err) = socket.set_nodelay(true) {
log::warn!("set nodelay failed: {:?}", err);
}
match tls_connector {
None => Ok(TlsOrPlain::Plain(socket)),
Some(connector) => {
let dns_name = DNSNameRef::try_from_ascii_str(host)?;
let tls_stream = connector.connect(dns_name, socket).await?;
Ok(TlsOrPlain::Tls(tls_stream))
}
}
}

// If the handshake succeeded, return.
let mut builder = client.into_builder();
builder.set_max_message_size(self.max_request_body_size as usize);
let (sender, receiver) = builder.finish();
Ok((Sender { inner: sender }, Receiver { inner: receiver }))
_ = timeout => Err(WsHandshakeError::Timeout(timeout_dur))
}
}

Expand Down
2 changes: 1 addition & 1 deletion ws-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::types::error::{CallError, Error};
use crate::{future::StopHandle, RpcModule, WsServerBuilder};
use anyhow::anyhow;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient, WebSocketTestError};
use jsonrpsee_test_utils::mocks::{Id, TestContext, WebSocketTestClient, WebSocketTestError};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;
use std::fmt;
Expand Down