Skip to content

Commit

Permalink
Add support for transaction_v1_broadcast (#1724)
Browse files Browse the repository at this point in the history
* Add support for `transaction_v1_broadcast`

* PR link

* Tweaks

* Change error type for stop
  • Loading branch information
tomaka authored Mar 15, 2024
1 parent f06c591 commit 8517939
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 37 deletions.
3 changes: 3 additions & 0 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ define_methods! {
sudo_unstable_p2pDiscover(multiaddr: Cow<'a, str>) -> (),
sudo_unstable_version() -> Cow<'a, str>,

transaction_v1_broadcast(transaction: HexString) -> Cow<'a, str>,
transaction_v1_stop(#[rename = "operationId"] operation_id: Cow<'a, str>) -> (),

transactionWatch_unstable_submitAndWatch(transaction: HexString) -> Cow<'a, str>,
transactionWatch_unstable_unwatch(subscription: Cow<'a, str>) -> (),

Expand Down
7 changes: 7 additions & 0 deletions lib/src/json_rpc/service/client_main_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ impl ClientMainTask {
| methods::MethodCall::chain_subscribeNewHeads { .. }
| methods::MethodCall::state_subscribeRuntimeVersion { .. }
| methods::MethodCall::state_subscribeStorage { .. }
| methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
| methods::MethodCall::sudo_network_unstable_watch { .. }
| methods::MethodCall::chainHead_unstable_follow { .. } => {
Expand Down Expand Up @@ -539,6 +540,9 @@ impl ClientMainTask {
methods::MethodCall::author_unwatchExtrinsic { subscription, .. }
| methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. }
| methods::MethodCall::state_unsubscribeStorage { subscription, .. }
| methods::MethodCall::transaction_v1_stop {
operation_id: subscription,
}
| methods::MethodCall::transactionWatch_unstable_unwatch { subscription, .. }
| methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. }
| methods::MethodCall::chainHead_unstable_unfollow {
Expand All @@ -562,6 +566,9 @@ impl ClientMainTask {
methods::MethodCall::state_unsubscribeStorage { .. } => {
methods::Response::state_unsubscribeStorage(true)
}
methods::MethodCall::transaction_v1_stop { .. } => {
methods::Response::transaction_v1_stop(())
}
methods::MethodCall::transactionWatch_unstable_unwatch {
..
} => methods::Response::transactionWatch_unstable_unwatch(()),
Expand Down
172 changes: 146 additions & 26 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ struct Background<TPlat: PlatformRef> {
/// List of all active `state_subscribeRuntimeVersion` subscriptions, indexed by the
/// subscription ID.
runtime_version_subscriptions: hashbrown::HashSet<String, fnv::FnvBuildHasher>,
/// List of all active `author_submitAndWatchExtrinsic` and
/// List of all active `author_submitAndWatchExtrinsic`, `transaction_v1_broadcast`, and
/// `transactionWatch_unstable_submitAndWatch` subscriptions, indexed by the subscription ID.
/// When it comes to `author_submitAndWatchExtrinsic` and
/// `transactionWatch_unstable_submitAndWatch`, transactions are removed from this list when
/// they are dropped from the transactions service. When it comes
/// to `transaction_v1_broadcast`, transactions are left forever until the API user
/// unsubscribes.
transactions_subscriptions: hashbrown::HashMap<String, TransactionWatch, fnv::FnvBuildHasher>,

/// List of all active `state_subscribeStorage` subscriptions, indexed by the subscription ID.
Expand Down Expand Up @@ -419,8 +424,14 @@ struct TransactionWatch {
enum TransactionWatchTy {
/// `author_submitAndWatchExtrinsic`.
Legacy,
/// `transaction_v1_broadcast`.
NewApi {
/// A copy of the body of the transaction is kept, as it might be necessary to re-insert
/// it in the transactions service later, for example if it reports having crashed.
transaction_bytes: Vec<u8>,
},
/// `transactionWatch_unstable_submitAndWatch`.
NewApi,
NewApiWatch,
}

/// See [`Background::state_get_keys_paged_cache`].
Expand Down Expand Up @@ -771,6 +782,8 @@ pub(super) async fn run<TPlat: PlatformRef>(
| methods::MethodCall::rpc_methods { .. }
| methods::MethodCall::sudo_unstable_p2pDiscover { .. }
| methods::MethodCall::sudo_unstable_version { .. }
| methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transaction_v1_stop { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
| methods::MethodCall::transactionWatch_unstable_unwatch { .. }
| methods::MethodCall::sudo_network_unstable_watch { .. }
Expand Down Expand Up @@ -837,7 +850,7 @@ pub(super) async fn run<TPlat: PlatformRef>(

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction.0, 16)
.submit_and_watch_transaction(transaction.0, 16, true)
.await,
);

Expand Down Expand Up @@ -2504,43 +2517,62 @@ pub(super) async fn run<TPlat: PlatformRef>(
.await;
}

methods::MethodCall::transactionWatch_unstable_submitAndWatch {
transaction: methods::HexString(transaction),
} => {
request_parsed @ (methods::MethodCall::transaction_v1_broadcast { .. }
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }) => {
let (transaction, watched) = match request_parsed {
methods::MethodCall::transaction_v1_broadcast {
transaction: methods::HexString(transaction),
} => (transaction, false),
methods::MethodCall::transactionWatch_unstable_submitAndWatch {
transaction: methods::HexString(transaction),
} => (transaction, true),
_ => unreachable!()
};

let subscription_id = {
let mut subscription_id = [0u8; 32];
me.randomness.fill_bytes(&mut subscription_id);
bs58::encode(subscription_id).into_string()
};

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction, 16)
.await,
);

let _prev_value = me.transactions_subscriptions.insert(
subscription_id.clone(),
TransactionWatch {
included_block: None,
num_broadcasted_peers: 0,
ty: TransactionWatchTy::NewApi,
ty: if watched { TransactionWatchTy::NewApiWatch } else {
TransactionWatchTy::NewApi { transaction_bytes: transaction.clone() }
},
},
);
debug_assert!(_prev_value.is_none());

let mut transaction_updates = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction, 16, watched)
.await,
);

let _ = me
.responses_tx
.send(
.send(if watched {
methods::Response::transactionWatch_unstable_submitAndWatch(
Cow::Borrowed(&subscription_id),
)
} else {
methods::Response::transaction_v1_broadcast(
Cow::Borrowed(&subscription_id),
)
}
.to_json_response(request_id_json),
)
.await;

// A task is started that will yield when the transactions service
// generates a notification.
// Note that we do that even for `transaction_v1_broadcast`, as it is
// important to pull notifications from the channel in order to not
// clog it.
me.background_tasks.push(Box::pin(async move {
let Some(status) = transaction_updates.as_mut().next().await else {
unreachable!()
Expand All @@ -2553,11 +2585,41 @@ pub(super) async fn run<TPlat: PlatformRef>(
}));
}

methods::MethodCall::transaction_v1_stop { operation_id } => {
let exists = me
.transactions_subscriptions
.get(&*operation_id)
.map_or(false, |sub| {
matches!(sub.ty, TransactionWatchTy::NewApi { .. })
});
if exists {
me.transactions_subscriptions.remove(&*operation_id);
let _ = me
.responses_tx
.send(
methods::Response::transaction_v1_stop(())
.to_json_response(request_id_json),
)
.await;
} else {
let _ = me
.responses_tx
.send(parse::build_error_response(
request_id_json,
json_rpc::parse::ErrorResponse::InvalidParams,
None,
))
.await;
}
}

methods::MethodCall::transactionWatch_unstable_unwatch { subscription } => {
let exists = me
.transactions_subscriptions
.get(&*subscription)
.map_or(false, |sub| matches!(sub.ty, TransactionWatchTy::NewApi));
.map_or(false, |sub| {
matches!(sub.ty, TransactionWatchTy::NewApiWatch)
});
if exists {
me.transactions_subscriptions.remove(&*subscription);
}
Expand Down Expand Up @@ -4785,7 +4847,55 @@ pub(super) async fn run<TPlat: PlatformRef>(
continue;
};

match (drop_reason, transaction_watch.ty) {
match (drop_reason, &transaction_watch.ty) {
(
transactions_service::DropReason::GapInChain
| transactions_service::DropReason::Crashed,
TransactionWatchTy::NewApi { transaction_bytes },
) => {
// In case of `transaction_v1_broadcast`, we re-submit the transaction
// if it was dropped for a temporary reasons.
let mut new_watcher = Box::pin(
me.transactions_service
.submit_and_watch_transaction(transaction_bytes.clone(), 16, false)
.await,
);

let _prev_value = me
.transactions_subscriptions
.insert(subscription_id.clone(), transaction_watch);
debug_assert!(_prev_value.is_none());

// Push a new background task that waits for the next notification.
me.background_tasks.push(Box::pin(async move {
let Some(status) = new_watcher.as_mut().next().await else {
unreachable!()
};
Event::TransactionEvent {
subscription_id,
event: status,
watcher: new_watcher,
}
}));
}

(
transactions_service::DropReason::Finalized { .. }
| transactions_service::DropReason::Invalid(_)
| transactions_service::DropReason::MaxPendingTransactionsReached
| transactions_service::DropReason::ValidateError(_),
TransactionWatchTy::NewApi { .. },
) => {
// In case of `transaction_v1_broadcast`, the transaction is re-inserted
// in the list, but no new notification-generating task is pushed, making
// the transaction effectively dead and waiting for `transaction_v1_stop`
// to be called to remove it.
let _prev_value = me
.transactions_subscriptions
.insert(subscription_id.clone(), transaction_watch);
debug_assert!(_prev_value.is_none());
}

(transactions_service::DropReason::GapInChain, TransactionWatchTy::Legacy)
| (
transactions_service::DropReason::MaxPendingTransactionsReached,
Expand All @@ -4808,7 +4918,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
)
.await;
}
(transactions_service::DropReason::GapInChain, TransactionWatchTy::NewApi) => {
(
transactions_service::DropReason::GapInChain,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
.send(
Expand All @@ -4825,7 +4938,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::MaxPendingTransactionsReached,
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4843,7 +4956,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::Invalid(error),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4860,7 +4973,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::ValidateError(error),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand All @@ -4875,7 +4988,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
)
.await;
}
(transactions_service::DropReason::Crashed, TransactionWatchTy::NewApi) => {
(
transactions_service::DropReason::Crashed,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
.send(
Expand Down Expand Up @@ -4909,7 +5025,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::DropReason::Finalized { block_hash, index },
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down Expand Up @@ -4946,6 +5062,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
};

match (event, &transaction_watch.ty) {
(_, TransactionWatchTy::NewApi { .. }) => {
// Events are ignored when it comes to `transaction_v1_broadcast`.
}

(
transactions_service::TransactionStatus::Broadcast(peers),
TransactionWatchTy::Legacy,
Expand All @@ -4965,7 +5085,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::TransactionStatus::Broadcast(peers),
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
transaction_watch.num_broadcasted_peers += peers.len();
let _ = me
Expand Down Expand Up @@ -4993,7 +5113,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
}
(
transactions_service::TransactionStatus::Validated,
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down Expand Up @@ -5052,7 +5172,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
transactions_service::TransactionStatus::IncludedBlockUpdate {
block_hash: Some((block_hash, index)),
},
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
transaction_watch.included_block = Some(block_hash);
let _ = me
Expand All @@ -5076,7 +5196,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
transactions_service::TransactionStatus::IncludedBlockUpdate {
block_hash: None,
},
TransactionWatchTy::NewApi,
TransactionWatchTy::NewApiWatch,
) => {
let _ = me
.responses_tx
Expand Down
Loading

0 comments on commit 8517939

Please sign in to comment.