Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for transaction_v1_broadcast #1724

Merged
merged 4 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ define_methods! {
sudo_unstable_p2pDiscover(multiaddr: Cow<'a, str>) -> (),
sudo_unstable_version() -> Cow<'a, str>,

transaction_v1_broadcast(transaction: HexString) -> Cow<'a, str>,
transaction_v1_stop(#[rename = "operationId"] operation_id: Cow<'a, str>) -> (),

transactionWatch_unstable_submitAndWatch(transaction: HexString) -> Cow<'a, str>,
transactionWatch_unstable_unwatch(subscription: Cow<'a, str>) -> (),

Expand Down
7 changes: 7 additions & 0 deletions lib/src/json_rpc/service/client_main_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ impl ClientMainTask {
| methods::MethodCall::chain_subscribeNewHeads { .. }
| methods::MethodCall::state_subscribeRuntimeVersion { .. }
| methods::MethodCall::state_subscribeStorage { .. }
| methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
| methods::MethodCall::sudo_network_unstable_watch { .. }
| methods::MethodCall::chainHead_unstable_follow { .. } => {
Expand Down Expand Up @@ -539,6 +540,9 @@ impl ClientMainTask {
methods::MethodCall::author_unwatchExtrinsic { subscription, .. }
| methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. }
| methods::MethodCall::state_unsubscribeStorage { subscription, .. }
| methods::MethodCall::transaction_v1_stop {
operation_id: subscription,
}
| methods::MethodCall::transactionWatch_unstable_unwatch { subscription, .. }
| methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. }
| methods::MethodCall::chainHead_unstable_unfollow {
Expand All @@ -562,6 +566,9 @@ impl ClientMainTask {
methods::MethodCall::state_unsubscribeStorage { .. } => {
methods::Response::state_unsubscribeStorage(true)
}
methods::MethodCall::transaction_v1_stop { .. } => {
methods::Response::transaction_v1_stop(())
}
methods::MethodCall::transactionWatch_unstable_unwatch {
..
} => methods::Response::transactionWatch_unstable_unwatch(()),
Expand Down
172 changes: 146 additions & 26 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ struct Background<TPlat: PlatformRef> {
/// List of all active `state_subscribeRuntimeVersion` subscriptions, indexed by the
/// subscription ID.
runtime_version_subscriptions: hashbrown::HashSet<String, fnv::FnvBuildHasher>,
/// List of all active `author_submitAndWatchExtrinsic` and
/// List of all active `author_submitAndWatchExtrinsic`, `transaction_v1_broadcast`, and
/// `transactionWatch_unstable_submitAndWatch` subscriptions, indexed by the subscription ID.
/// When it comes to `author_submitAndWatchExtrinsic` and
/// `transactionWatch_unstable_submitAndWatch`, transactions are removed from this list when
/// they are dropped from the transactions service. When it comes
/// to `transaction_v1_broadcast`, transactions are left forever until the API user
/// unsubscribes.
transactions_subscriptions: hashbrown::HashMap<String, TransactionWatch, fnv::FnvBuildHasher>,

/// List of all active `state_subscribeStorage` subscriptions, indexed by the subscription ID.
Expand Down Expand Up @@ -419,8 +424,14 @@ struct TransactionWatch {
enum TransactionWatchTy {
/// `author_submitAndWatchExtrinsic`.
Legacy,
/// `transaction_v1_broadcast`.
NewApi {
/// A copy of the body of the transaction is kept, as it might be necessary to re-insert
/// it in the transactions service later, for example if it reports having crashed.
transaction_bytes: Vec<u8>,
},
/// `transactionWatch_unstable_submitAndWatch`.
NewApi,
NewApiWatch,
}

/// See [`Background::state_get_keys_paged_cache`].
Expand Down Expand Up @@ -771,6 +782,8 @@ pub(super) async fn run<TPlat: PlatformRef>(
| methods::MethodCall::rpc_methods { .. }
| methods::MethodCall::sudo_unstable_p2pDiscover { .. }
| methods::MethodCall::sudo_unstable_version { .. }
| methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transaction_v1_stop { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
| methods::MethodCall::transactionWatch_unstable_unwatch { .. }
| methods::MethodCall::sudo_network_unstable_watch { .. }
Expand Down Expand Up @@ -837,7 +850,7 @@ pub(super) async fn run<TPlat: PlatformRef>(

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction.0, 16)
.submit_and_watch_transaction(transaction.0, 16, true)
.await,
);

Expand Down Expand Up @@ -2504,43 +2517,62 @@ pub(super) async fn run<TPlat: PlatformRef>(
.await;
}

methods::MethodCall::transactionWatch_unstable_submitAndWatch {
transaction: methods::HexString(transaction),
} => {
request_parsed @ (methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }) => {
let (transaction, watched) = match request_parsed {
methods::MethodCall::transaction_v1_broadcast {
transaction: methods::HexString(transaction),
} => (transaction, false),
methods::MethodCall::transactionWatch_unstable_submitAndWatch {
transaction: methods::HexString(transaction),
} => (transaction, true),
_ => unreachable!()
};

let subscription_id = {
let mut subscription_id = [0u8; 32];
me.randomness.fill_bytes(&mut subscription_id);
bs58::encode(subscription_id).into_string()
};

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction, 16)
.await,
);

let _prev_value = me.transactions_subscriptions.insert(
subscription_id.clone(),
TransactionWatch {
included_block: None,
num_broadcasted_peers: 0,
ty: TransactionWatchTy::NewApi,
ty: if watched { TransactionWatchTy::NewApiWatch } else {
TransactionWatchTy::NewApi { transaction_bytes: transaction.clone() }
},
},
);
debug_assert!(_prev_value.is_none());

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction, 16, watched)
.await,
);

let _ = me
.responses_tx
.send(
.send(if watched {
methods::Response::transactionWatch_unstable_submitAndWatch(
Cow::Borrowed(&subscription_id),
)
} else {
methods::Response::transaction_v1_broadcast(
Cow::Borrowed(&subscription_id),
)
}
.to_json_response(request_id_json),
)
.await;

// A task is started that will yield when the transactions service
// generates a notification.
// Note that we do that even for `transaction_v1_broadcast`, as it is
// important to pull notifications from the channel in order to not
// clog it.
me.background_tasks.push(Box::pin(async move {
let Some(status) = transaction_updates.as_mut().next().await else {
unreachable!()
Expand All @@ -2553,11 +2585,41 @@ pub(super) async fn run<TPlat: PlatformRef>(
}));
}

methods::MethodCall::transaction_v1_stop { operation_id } => {
let exists = me
.transactions_subscriptions
.get(&*operation_id)
.map_or(false, |sub| {
matches!(sub.ty, TransactionWatchTy::NewApi { .. })
});
if exists {
me.transactions_subscriptions.remove(&*operation_id);
let _ = me
.responses_tx
.send(
methods::Response::transaction_v1_stop(())
.to_json_response(request_id_json),
)
.await;
} else {
let _ = me
.responses_tx
.send(parse::build_error_response(
request_id_json,
json_rpc::parse::ErrorResponse::InvalidParams,
None,
))
.await;
}
}

methods::MethodCall::transactionWatch_unstable_unwatch { subscription } => {
let exists = me
.transactions_subscriptions
.get(&*subscription)
.map_or(false, |sub| matches!(sub.ty, TransactionWatchTy::NewApi));
.map_or(false, |sub| {
matches!(sub.ty, TransactionWatchTy::NewApiWatch)
});
if exists {
me.transactions_subscriptions.remove(&*subscription);
}
Expand Down Expand Up @@ -4785,7 +4847,55 @@ pub(super) async fn run<TPlat: PlatformRef>(
continue;
};

match (drop_reason, transaction_watch.ty) {
match (drop_reason, &transaction_watch.ty) {
(
transactions_service::DropReason::GapInChain
| transactions_service::DropReason::Crashed,
TransactionWatchTy::NewApi { transaction_bytes },
) => {
// In case of `transaction_v1_broadcast`, we re-submit the transaction
// if it was dropped for a temporary reasons.
let mut new_watcher = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction_bytes.clone(), 16, false)
.await,
);

let _prev_value = me
.transactions_subscriptions
.insert(subscription_id.clone(), transaction_watch);
debug_assert!(_prev_value.is_none());

// Push a new background task that waits for the next notification.
me.background_tasks.push(Box::pin(async move {
let Some(status) = new_watcher.as_mut().next().await else {
unreachable!()
};
Event::TransactionEvent {
subscription_id,
event: status,
watcher: new_watcher,
}
}));
}

(
transactions_service::DropReason::Finalized { .. }
| transactions_service::DropReason::Invalid(_)
| transactions_service::DropReason::MaxPendingTransactionsReached
| transactions_service::DropReason::ValidateError(_),
TransactionWatchTy::NewApi { .. },
) => {
// In case of `transaction_v1_broadcast`, the transaction is re-inserted
// in the list, but no new notification-generating task is pushed, making
// the transaction effectively dead and waiting for `transaction_v1_stop`
// to be called to remove it.
let _prev_value = me
.transactions_subscriptions
.insert(subscription_id.clone(), transaction_watch);
debug_assert!(_prev_value.is_none());
}

(transactions_service::DropReason::GapInChain, TransactionWatchTy::Legacy)
| (
transactions_service::DropReason::MaxPendingTransactionsReached,
Expand All @@ -4808,7 +4918,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
)
.await;
}
(transactions_service::DropReason::GapInChain, TransactionWatchTy::NewApi) => {
(
transactions_service::DropReason::GapInChain,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
.send(
Expand All @@ -4825,7 +4938,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::MaxPendingTransactionsReached,
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4843,7 +4956,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::Invalid(error),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4860,7 +4973,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::ValidateError(error),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4875,7 +4988,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
)
.await;
}
(transactions_service::DropReason::Crashed, TransactionWatchTy::NewApi) => {
(
transactions_service::DropReason::Crashed,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
.send(
Expand Down Expand Up @@ -4909,7 +5025,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::Finalized { block_hash, index },
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down Expand Up @@ -4946,6 +5062,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
};

match (event, &transaction_watch.ty) {
(_, TransactionWatchTy::NewApi { .. }) => {
// Events are ignored when it comes to `transaction_v1_broadcast`.
}

(
transactions_service::TransactionStatus::Broadcast(peers),
TransactionWatchTy::Legacy,
Expand All @@ -4965,7 +5085,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::TransactionStatus::Broadcast(peers),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
transaction_watch.num_broadcasted_peers += peers.len();
let _ = me
Expand Down Expand Up @@ -4993,7 +5113,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::TransactionStatus::Validated,
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down Expand Up @@ -5052,7 +5172,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
transactions_service::TransactionStatus::IncludedBlockUpdate {
block_hash: Some((block_hash, index)),
},
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
transaction_watch.included_block = Some(block_hash);
let _ = me
Expand All @@ -5076,7 +5196,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
transactions_service::TransactionStatus::IncludedBlockUpdate {
block_hash: None,
},
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down
Loading
Loading