Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix http graceful shutdown #1090

Merged
merged 11 commits into from
Apr 21, 2023
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
error: use of deprecated associated function `DeprecatedClient::async_method`: please use `new_method` instead
error: use of deprecated method `DeprecatedClient::async_method`: please use `new_method` instead
--> $DIR/rpc_deprecated_method.rs:63:20
|
63 | assert_eq!(client.async_method().await.unwrap(), 16);
Expand Down
24 changes: 14 additions & 10 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::future::{ConnectionGuard, ServerHandle, StopHandle};
use crate::logger::{Logger, TransportProtocol};
use crate::transport::{http, ws};

use futures_util::future::{Either, FutureExt};
use futures_util::future::{self, Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};

use futures_util::stream::{FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -805,18 +805,22 @@ where
<B as HttpBody>::Data: Send,
{
let conn = hyper::server::conn::Http::new().serve_connection(socket, service).with_upgrades();
let stopped = stop_handle.shutdown();

tokio::pin!(conn);
tokio::pin!(stopped);

tokio::select! {
res = &mut conn => {
if let Err(e) = res {
tracing::warn!("HTTP serve connection failed {:?}", e);
}
}
_ = stop_handle.shutdown() => {
conn.graceful_shutdown();
let res = match future::select(conn, stopped).await {
Either::Left((conn, _)) => conn,
Either::Right((_, mut conn)) => {
// NOTE: the connection should continue to be polled until shutdown can finish.
// Thus, both lines below are needed and not a nit.
Pin::new(&mut conn).graceful_shutdown();
conn.await
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the fix, we need to await for the conn future anyway not just call graceful_shutdown my bad

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah interesting! (I found https://docs.rs/hyper/latest/hyper/server/conn/struct.Connection.html#method.graceful_shutdown more useful)

Might be worth a comment next to these lines to note that since it's a slightly odd API?

}
};

if let Err(e) = res {
tracing::warn!("HTTP serve connection failed {:?}", e);
}
}

Expand Down
99 changes: 70 additions & 29 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder};
use jsonrpsee::core::server::SubscriptionMessage;
use jsonrpsee::core::{Error, JsonValue};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::server::ServerBuilder;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::types::error::{ErrorObject, UNKNOWN_ERROR_CODE};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, RpcModule};
Expand Down Expand Up @@ -1148,36 +1148,20 @@ async fn subscription_ok_unit_not_sent() {
async fn graceful_shutdown_works() {
init_logger();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let call_answered = Arc::new(AtomicBool::new(false));

let (handle, addr) = {
let server = ServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap();

let mut module = RpcModule::new((tx, call_answered.clone()));

module
.register_async_method("sleep_10s", |_, mut ctx| async move {
let ctx = Arc::make_mut(&mut ctx);
let _ = ctx.0.send(());
tokio::time::sleep(Duration::from_secs(10)).await;
ctx.1.store(true, std::sync::atomic::Ordering::SeqCst);
"ok"
})
.unwrap();
let addr = server.local_addr().unwrap();

(server.start(module).unwrap(), addr)
};

let client = Arc::new(
WsClientBuilder::default().build(format!("ws://{addr}")).with_default_timeout().await.unwrap().unwrap(),
);
run_shutdown_test("http").await;
run_shutdown_test("ws").await;
}

async fn run_shutdown_test_inner<C: ClientT + Send + Sync + 'static>(
client: Arc<C>,
handle: ServerHandle,
call_answered: Arc<AtomicBool>,
mut call_ack: tokio::sync::mpsc::UnboundedReceiver<()>,
) {
let mut calls: FuturesUnordered<_> = (0..10)
.map(|_| {
let c = client.clone();
async move { c.request::<String, _>("sleep_10s", rpc_params!()).await }
async move { c.request::<String, _>("sleep_20s", rpc_params!()).await }
})
.collect();

Expand All @@ -1193,7 +1177,7 @@ async fn graceful_shutdown_works() {

// All calls has been received by server => then stop.
for _ in 0..calls_len {
rx.recv().await.unwrap();
call_ack.recv().await.unwrap();
}

// Assert that no calls have been answered yet
Expand All @@ -1202,8 +1186,65 @@ async fn graceful_shutdown_works() {

// Stop the server.
handle.stop().unwrap();

// This is to ensure that server main has some time to receive the stop signal.
tokio::time::sleep(Duration::from_millis(100)).await;

// Make a call between the server stop has been requested and until all pending calls have been resolved.
let c = client.clone();
let call_after_stop = tokio::spawn(async move { c.request::<String, _>("sleep_20s", rpc_params!()).await });

handle.stopped().await;

assert!(call_after_stop.await.unwrap().is_err());

// The pending calls should be answered before shutdown.
assert_eq!(res.await.unwrap(), calls_len);
assert_eq!(res.await.unwrap(), 10);

// The server should be closed now.
assert!(client.request::<String, _>("sleep_20s", rpc_params!()).await.is_err());
}

/// Run shutdown test and it does:
///
/// - Make 10 calls that sleeps for 20 seconds
/// - Once they have all reached the server but before they have responded, stop the server.
/// - Ensure that no calls handled between the server has been requested to stop and until it has been stopped.
/// - All calls should be responded to before the server shuts down.
/// - No calls should be handled after the server has been stopped.
///
async fn run_shutdown_test(transport: &str) {
let (tx, call_ack) = tokio::sync::mpsc::unbounded_channel();
let call_answered = Arc::new(AtomicBool::new(false));

let (handle, addr) = {
let server = ServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap();

let mut module = RpcModule::new((tx, call_answered.clone()));

module
.register_async_method("sleep_20s", |_, mut ctx| async move {
let ctx = Arc::make_mut(&mut ctx);
let _ = ctx.0.send(());
tokio::time::sleep(Duration::from_secs(20)).await;
ctx.1.store(true, std::sync::atomic::Ordering::SeqCst);
"ok"
})
.unwrap();
let addr = server.local_addr().unwrap();

(server.start(module).unwrap(), addr)
};

match transport {
"ws" => {
let ws = Arc::new(WsClientBuilder::default().build(&format!("ws://{addr}")).await.unwrap());
run_shutdown_test_inner(ws, handle, call_answered, call_ack).await
}
"http" => {
let http = Arc::new(HttpClientBuilder::default().build(&format!("http://{addr}")).unwrap());
run_shutdown_test_inner(http, handle, call_answered, call_ack).await
}
_ => todo!(),
}
}