Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async API when Response has been processed. #1281

Merged
merged 29 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4773227
feat: add `RpcModule::register_raw_method`
niklasad1 Jan 30, 2024
ee9b8ea
add proc macro support
niklasad1 Jan 30, 2024
b595e5f
rename API
niklasad1 Jan 30, 2024
304c938
simplify API with MethodResponse::notify_when_sent
niklasad1 Jan 31, 2024
973e62d
improve notify API
niklasad1 Jan 31, 2024
90da71b
fix nits
niklasad1 Jan 31, 2024
53520d8
introduce ResponsePayloadV2
niklasad1 Jan 31, 2024
f71d046
impl ResponsePayloadV2 for T
niklasad1 Feb 1, 2024
dc88a6d
cleanup
niklasad1 Feb 1, 2024
51ce17e
client: proc macro support for custom ret_ty
niklasad1 Feb 1, 2024
ffea2ca
add tests
niklasad1 Feb 2, 2024
f549bf4
address grumbles
niklasad1 Feb 2, 2024
2f1fdd8
remove unused code
niklasad1 Feb 2, 2024
0c98f5c
fix tests
niklasad1 Feb 3, 2024
db7e08c
proc: revert unrelated changes
niklasad1 Feb 5, 2024
925108a
remove panics; move should be enough
niklasad1 Feb 5, 2024
d256661
bring back UI tests
niklasad1 Feb 5, 2024
0eba672
grumbles: remove NotifiedError
niklasad1 Feb 5, 2024
8391579
break stuff for uniform API
niklasad1 Feb 5, 2024
f4b5a92
make more stuff private
niklasad1 Feb 5, 2024
e835a72
remove ResponseErrorUnit type alias
niklasad1 Feb 5, 2024
7f93b20
fix ui tests
niklasad1 Feb 5, 2024
7006a00
Update proc-macros/src/render_server.rs
niklasad1 Feb 5, 2024
487df31
Rename ws_notify_on_method_answered.rs to response_payload_notify_on_…
niklasad1 Feb 5, 2024
f9d7fc3
Merge remote-tracking branch 'origin/master' into low-level-api-v1
niklasad1 Feb 6, 2024
da41c62
remove unit_error APIs
niklasad1 Feb 6, 2024
a83c766
Merge remote-tracking branch 'origin/low-level-api-v1' into low-level…
niklasad1 Feb 6, 2024
802f77f
replace notify_on_x with notify_on_completion
niklasad1 Feb 6, 2024
2836906
Update server/src/transport/ws.rs
niklasad1 Feb 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 57 additions & 50 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ mod utils;
use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
use crate::client::async_client::utils::MaybePendingFutures;
use crate::client::{
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, Error
BatchMessage, BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use crate::error::RegisterMethodError;
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
Expand Down Expand Up @@ -64,7 +64,7 @@ use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{IntervalStream, InactivityCheck};
use self::utils::{InactivityCheck, IntervalStream};

use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager};

Expand Down Expand Up @@ -94,11 +94,7 @@ pub struct PingConfig {

impl Default for PingConfig {
fn default() -> Self {
Self {
ping_interval: Duration::from_secs(30),
max_failures: 1,
inactive_limit: Duration::from_secs(40),
}
Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
}
}

Expand Down Expand Up @@ -126,9 +122,9 @@ impl PingConfig {

/// Configure how many times the connection is allowed be
/// inactive until the connection is closed.
///
///
/// # Panics
///
///
/// This method panics if `max` == 0.
pub fn max_failures(mut self, max: usize) -> Self {
assert!(max > 0);
Expand All @@ -137,7 +133,6 @@ impl PingConfig {
}
}


#[derive(Debug, Default, Clone)]
pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);

Expand Down Expand Up @@ -179,7 +174,9 @@ impl ErrorFromBack {
// This should never happen because the receiving end is still alive.
// Before shutting down the background task a error message should
// be emitted.
Err(_) => Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string()),
Err(_) => Error::Custom(
"Error reason could not be found. This is a bug. Please open an issue.".to_string(),
),
});
*write = Some(ReadErrorOnce::Read(arc_err.clone()));
arc_err
Expand Down Expand Up @@ -281,7 +278,7 @@ impl ClientBuilder {
}

/// Enable WebSocket ping/pong on the client.
///
///
/// This only works if the transport supports WebSocket pings.
///
/// Default: pings are disabled.
Expand Down Expand Up @@ -332,11 +329,16 @@ impl ClientBuilder {
Some(p) => {
// NOTE: This emits a tick immediately to sync how the `inactive_interval` works
// because it starts measuring when the client start-ups.
let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(p.ping_interval)));
let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(p.ping_interval),
));

let inactive_interval = {
let inactive_interval = {
let start = tokio::time::Instant::now() + p.inactive_limit;
IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(start, p.inactive_limit)))
IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
start,
p.inactive_limit,
)))
};

let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);
Expand Down Expand Up @@ -386,8 +388,8 @@ impl ClientBuilder {
{
use futures_util::stream::Pending;

type PendingIntervalStream = IntervalStream<Pending<()>>;
type PendingIntervalStream = IntervalStream<Pending<()>>;

let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_to_front, err_from_back) = oneshot::channel::<Error>();
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
Expand Down Expand Up @@ -466,12 +468,12 @@ impl Client {

/// This is similar to [`Client::on_disconnect`] but it can be used to get
/// the reason why the client was disconnected but it's not cancel-safe.
///
///
/// The typical use-case is that this method will be called after
/// [`Client::on_disconnect`] has returned in a "select loop".
///
///
/// # Cancel-safety
///
///
/// This method is not cancel-safe
pub async fn disconnect_reason(&self) -> Error {
self.error.read_error().await
Expand Down Expand Up @@ -554,7 +556,7 @@ impl ClientT for Client {
Err(_) => return Err(self.disconnect_reason().await),
};

rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&json_value), id), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&json_value), id), self.max_log_length);

serde_json::from_value(json_value).map_err(Error::ParseError)
}
Expand Down Expand Up @@ -643,9 +645,7 @@ impl SubscriptionClientT for Client {
Notif: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(RegisterMethodError::SubscriptionNameConflict(
unsubscribe_method.to_owned(),
).into());
return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
}

let guard = self.id_manager.next_request_two_ids()?;
Expand Down Expand Up @@ -680,7 +680,7 @@ impl SubscriptionClientT for Client {
Err(_) => return Err(self.disconnect_reason().await),
};

rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&sub_id), id_unsub), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&sub_id), id_unsub), self.max_log_length);

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
Expand Down Expand Up @@ -901,8 +901,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ =
reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
}
}
// User dropped the NotificationHandler for this method
Expand Down Expand Up @@ -950,30 +949,30 @@ where

// This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver`
// are cancel-safe.
let res = loop {
tokio::select! {
biased;
_ = close_tx.closed() => break Ok(()),
maybe_msg = from_frontend.recv() => {
let Some(msg) = maybe_msg else {
break Ok(());
};

if let Err(e) =
handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
{
tracing::error!(target: LOG_TARGET, "ws send failed: {e}");
break Err(Error::Transport(e.into()));
}
let res = loop {
tokio::select! {
biased;
_ = close_tx.closed() => break Ok(()),
maybe_msg = from_frontend.recv() => {
let Some(msg) = maybe_msg else {
break Ok(());
};

if let Err(e) =
handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
{
tracing::error!(target: LOG_TARGET, "ws send failed: {e}");
break Err(Error::Transport(e.into()));
}
_ = ping_interval.next() => {
if let Err(err) = sender.send_ping().await {
tracing::error!(target: LOG_TARGET, "Send ws ping failed: {err}");
break Err(Error::Transport(err.into()));
}
}
_ = ping_interval.next() => {
if let Err(err) = sender.send_ping().await {
tracing::error!(target: LOG_TARGET, "Send ws ping failed: {err}");
break Err(Error::Transport(err.into()));
}
}
};
}
};

from_frontend.close();
let _ = sender.close().await;
Expand All @@ -995,7 +994,15 @@ where
R: TransportReceiverT,
S: Stream + Unpin,
{
let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, mut inactivity_check, mut inactivity_stream } = params;
let ReadTaskParams {
receiver,
close_tx,
to_send_task,
manager,
max_buffer_capacity_per_subscription,
mut inactivity_check,
mut inactivity_stream,
} = params;

let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
let res = receiver.receive().await;
Expand Down
Loading
Loading