Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: bounded channels and backpressure #962

Merged
merged 68 commits into from
Feb 8, 2023

Conversation

niklasad1
Copy link
Member

@niklasad1 niklasad1 commented Jan 4, 2023

This PR adds a bounded channel for each connection along with backpressure i.e if there is no spot in the bounded channel buffer the server will wait until there is and then poll the underlying socket. TCP protocol will take of the slowing down the connection if the client can't keep up to speed by decreasing the TCP window size...

Notable changes:

  • Split SubscriptionSink to PendingSubscriptionSink and SubscriptionSink again. Mainly because accept and reject needs to be async methods now because we can't ensure that these messages will go through (when the channel capacity is exceeded) when doing so non_blocking
  • Move to tokio::sync::mpsc because it has this nice API called reserve which wait until a spot in the channel is reserved.
  • Add tokio wrappers called send - async, try_send - non_blocking
  • Remove pipe_from_stream
  • Remove resource limiting that's somewhat redundant and tower supports some simple rate limiting (applies only to the HTTP part)
  • Make subscription callback async which in turn is spawned by jsonrpsee under the hood.

TODO:

  • Update examples especially how to deal subscriptions when the buffer is full
  • Add tests for this PR, I give up on this I haven't found a reliable way of testing that. I think we could run some benches to assert that :P

Exampe subscription API:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
	tracing_subscriber::FmtSubscriber::builder()
		.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
		.try_init()
		.expect("setting default subscriber failed");

	let h = run_server().await?;

	h.stopped().await;

	Ok(())
}

async fn run_server() -> anyhow::Result<ServerHandle> {
	const LETTERS: &str = "abcdefghijklmnopqrstuvxyz";
	let server = ServerBuilder::default().set_message_buffer_capacity(10).build("127.0.0.1:9944").await?;
	let mut module = RpcModule::new(());
	module
		.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |_, pending, _| async move {
			let mut sink = pending.accept().await?;
			let interval = interval(Duration::from_millis(200));
			let stream = IntervalStream::new(interval).map(move |_| "A".repeat(1024 * 1024));

			loop {
				tokio::select! {
					biased;
					_ = sink.closed() => {
						// Client closed connection.
						break;
					},
					maybe_item = stream.next() => {
						let item = match maybe_item {
							Some(item) => item,
							None => break,
						};

						let msg = SubscriptionMessage::from_json(&item)?;
						sink.send_timeout(msg, std::time::Duration::from_millis(100)).await;
					},
				}
			}
			Ok(())
		})
		.unwrap();

	let handle = server.start(module)?;

	Ok(handle)
}

@niklasad1
Copy link
Member Author

I switched to tokio::sync::mpsc because futures::channel::mpsc capacity is quite hard to reason about:

The channel’s capacity is equal to buffer + num-senders

Thus, if one configures the buffer to have a buffer of 10 messages in practice it becomes "the number of pending calls" + 10.

@niklasad1 niklasad1 changed the title WIP: server bounded channels WIP: server bounded channels and backpressure Jan 14, 2023
benches/helpers.rs Outdated Show resolved Hide resolved
server/src/server.rs Outdated Show resolved Hide resolved
tests/tests/rpc_module.rs Outdated Show resolved Hide resolved

#[derive(Debug, Clone)]
/// Represents whether a subscription was answered or not.
pub enum SubscriptionAnswered {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need MethodResponse for the subscriptions but I kept it for now to all calls to return MethodResponse.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, it's required by the Logger so no way around it right now.

Comment on lines +859 to +861
if let Err(SubscriptionCallbackError::Some(msg)) = sub_fut.await {
tracing::warn!("Subscribe call `{subscribe_method_name}` failed: {msg}");
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this makes sense to me; I kept mulling over what should happen if a user returns Err(e) in a subscription thing now, and the options are to either return it to the user as a subscription error, or not.

The nice thing about not returning the error to the user is that if you want to return some error to the user you have to do it super explicitly, otherwise no error is returned. (The downside is that if they return an error from the callback, they might expect it to be sent to the user but it won't be?).

On the other hand, if you did send any error to the user here, you could remove APIs like PendingSubscriptionSink::reject() and SubscriptionSink::close(error), and instead, returning from the subscription async callback with Ok would mean it closed successfully and no need to send an error, and returning with Err(e) means it closes and to send an error back in whatever form makes sense.

Just thinking out loud though; I guess I don't like that the error is silently disappeared from the client perspective at present, but I do like that the current approach is very explicit about things, so I'm happy either way and this doesn't block merging for me!

Copy link
Member Author

@niklasad1 niklasad1 Feb 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do send an error to the "client" if subscription call fails i.e, the user wouldn't call accept/reject.

when we get back an error here it's just some special logic from the user code that failed such as SubscriptionMessage or Sink::send_timeout or whatever.

what I am trying to say if a subscription just executes "correctly" or is "dropped" after the Subscription::accept/reject because of some error in the async closure then the client wouldn't be notified unless one calls SubscriptionSink::close

For a long time ago I called SubscriptionSink::close under the hood here but it may break other libraries such as polkadot-js IIRC.

@@ -1110,29 +1111,29 @@ impl SubscriptionSink {
/// }
/// ```
///
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> impl Future<Output = ()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the subscription will gracefully close if you don't call this message too, and this is really just a way to close the subscription with some error, I wonder whether it should be called eg close_with_error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's a way to indicate to the connected client that subscription is closed with a special notification.

if one doesn't call this at least our client will wait until the connection is closed unless the subscription itself has customized logic when a subscription is regarded as "closed"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thus, this may be an error or success.

but maybe we should have close_with_success and close_with_error instead as you suggested.

Copy link
Collaborator

@jsdw jsdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome job! A couple of nits and pondering the returning result from subscrioption API but looks great!

core/src/error.rs Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants