diff --git a/Cargo.lock b/Cargo.lock index 5125e46ca89a..3b0242b39c5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2624,9 +2624,9 @@ dependencies = [ [[package]] name = "clap-num" -version = "1.1.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e063d263364859dc54fb064cedb7c122740cd4733644b14b176c097f51e8ab7" +checksum = "488557e97528174edaa2ee268b23a809e0c598213a4bbcb4f34575a46fda147e" dependencies = [ "num-traits", ] @@ -2844,10 +2844,11 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "colored" -version = "2.1.0" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" +checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6" dependencies = [ + "is-terminal", "lazy_static", "windows-sys 0.48.0", ] diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 975abbca4b68..86d9a726d7be 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -62,6 +62,9 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. pub subscription_max_ongoing_operations: usize, + /// Stop all subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + pub max_lagging_distance: usize, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, @@ -88,6 +91,10 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; +/// Stop all subscriptions if the distance between the leaves and the current finalized +/// block is larger than this value. +const MAX_LAGGING_DISTANCE: usize = 128; + /// The maximum number of `chainHead_follow` subscriptions per connection. const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; @@ -97,6 +104,7 @@ impl Default for ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, + max_lagging_distance: MAX_LAGGING_DISTANCE, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, } @@ -116,6 +124,9 @@ pub struct ChainHead, Block: BlockT, Client> { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, + /// Stop all subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + max_lagging_distance: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -140,6 +151,7 @@ impl, Block: BlockT, Client> ChainHead { backend, ), operation_max_storage_items: config.operation_max_storage_items, + max_lagging_distance: config.max_lagging_distance, _phantom: PhantomData, } } @@ -187,6 +199,7 @@ where let subscriptions = self.subscriptions.clone(); let backend = self.backend.clone(); let client = self.client.clone(); + let max_lagging_distance = self.max_lagging_distance; let fut = async move { // Ensure the current connection ID has enough space to accept a new subscription. @@ -207,8 +220,8 @@ where let Some(sub_data) = reserved_subscription.insert_subscription(sub_id.clone(), with_runtime) else { - // Inserting the subscription can only fail if the JsonRPSee - // generated a duplicate subscription ID. + // Inserting the subscription can only fail if the JsonRPSee generated a duplicate + // subscription ID. debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; @@ -222,9 +235,13 @@ where subscriptions, with_runtime, sub_id.clone(), + max_lagging_distance, ); - - chain_head_follow.generate_events(sink, sub_data).await; + let result = chain_head_follow.generate_events(sink, sub_data).await; + if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { + debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id); + reserved_subscription.stop_all_subscriptions(); + } debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); }; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 90cc62a36fa9..0d87a45c07e2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -41,12 +41,14 @@ use sp_api::CallApiAt; use sp_blockchain::{ Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info, }; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use sp_runtime::{ + traits::{Block as BlockT, Header as HeaderT, NumberFor}, + SaturatedConversion, Saturating, +}; use std::{ collections::{HashSet, VecDeque}, sync::Arc, }; - /// The maximum number of finalized blocks provided by the /// `Initialized` event. const MAX_FINALIZED_BLOCKS: usize = 16; @@ -67,6 +69,9 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { sub_id: String, /// The best reported block by this subscription. best_block_cache: Option, + /// Stop all subscriptions if the distance between the leaves and the current finalized + /// block is larger than this value. + max_lagging_distance: usize, } impl, Block: BlockT, Client> ChainHeadFollower { @@ -77,8 +82,17 @@ impl, Block: BlockT, Client> ChainHeadFollower, with_runtime: bool, sub_id: String, + max_lagging_distance: usize, ) -> Self { - Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None } + Self { + client, + backend, + sub_handle, + with_runtime, + sub_id, + best_block_cache: None, + max_lagging_distance, + } } } @@ -186,6 +200,35 @@ where } } + /// Check the distance between the provided blocks does not exceed a + /// a reasonable range. + /// + /// When the blocks are too far apart (potentially millions of blocks): + /// - Tree route is expensive to calculate. + /// - The RPC layer will not be able to generate the `NewBlock` events for all blocks. + /// + /// This edge-case can happen for parachains where the relay chain syncs slower to + /// the head of the chain than the parachain node that is synced already. + fn distace_within_reason( + &self, + block: Block::Hash, + finalized: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let Some(block_num) = self.client.number(block)? else { + return Err(SubscriptionManagementError::BlockHashAbsent) + }; + let Some(finalized_num) = self.client.number(finalized)? else { + return Err(SubscriptionManagementError::BlockHashAbsent) + }; + + let distance: usize = block_num.saturating_sub(finalized_num).saturated_into(); + if distance > self.max_lagging_distance { + return Err(SubscriptionManagementError::BlockDistanceTooLarge); + } + + Ok(()) + } + /// Get the in-memory blocks of the client, starting from the provided finalized hash. /// /// The reported blocks are pinned by this function. @@ -198,6 +241,13 @@ where let mut pruned_forks = HashSet::new(); let mut finalized_block_descendants = Vec::new(); let mut unique_descendants = HashSet::new(); + + // Ensure all leaves are within a reasonable distance from the finalized block, + // before traversing the tree. + for leaf in &leaves { + self.distace_within_reason(*leaf, finalized)?; + } + for leaf in leaves { let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?; @@ -542,7 +592,8 @@ where mut to_ignore: HashSet, sink: SubscriptionSink, rx_stop: oneshot::Receiver<()>, - ) where + ) -> Result<(), SubscriptionManagementError> + where EventStream: Stream> + Unpin, { let mut stream_item = stream.next(); @@ -576,7 +627,7 @@ where ); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + return Err(err) }, }; @@ -591,7 +642,8 @@ where let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + // No need to propagate this error further, the client disconnected. + return Ok(()) } } @@ -605,6 +657,7 @@ where // - the client disconnected. let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; + Ok(()) } /// Generate the block events for the `chainHead_follow` method. @@ -612,7 +665,7 @@ where &mut self, sink: SubscriptionSink, sub_data: InsertedSubscriptionData, - ) { + ) -> Result<(), SubscriptionManagementError> { // Register for the new block and finalized notifications. let stream_import = self .client @@ -640,7 +693,7 @@ where ); let msg = to_sub_message(&sink, &FollowEvent::::Stop); let _ = sink.send(msg).await; - return + return Err(err) }, }; @@ -650,6 +703,6 @@ where let stream = stream::once(futures::future::ready(initial)).chain(merged); self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) - .await; + .await } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs index 2c22e51ca4dc..91ce26db22a5 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs @@ -41,6 +41,9 @@ pub enum SubscriptionManagementError { /// The unpin method was called with duplicate hashes. #[error("Duplicate hashes")] DuplicateHashes, + /// The distance between the leaves and the current finalized block is too large. + #[error("Distance too large")] + BlockDistanceTooLarge, /// Custom error. #[error("Subscription error {0}")] Custom(String), @@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError { (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | (Self::SubscriptionAbsent, Self::SubscriptionAbsent) | (Self::DuplicateHashes, Self::DuplicateHashes) => true, + (Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true, (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, _ => false, } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 1ebee3c80fc8..0e5ccb91d39a 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -560,6 +560,7 @@ pub struct SubscriptionsInner> { max_ongoing_operations: usize, /// Map the subscription ID to internal details of the subscription. subs: HashMap>, + /// Backend pinning / unpinning blocks. /// /// The `Arc` is handled one level-above, but substrate exposes the backend as Arc. @@ -623,6 +624,15 @@ impl> SubscriptionsInner { } } + /// All active subscriptions are removed. + pub fn stop_all_subscriptions(&mut self) { + let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); + + for sub_id in to_remove { + self.remove_subscription(&sub_id); + } + } + /// Ensure that a new block could be pinned. /// /// If the global number of blocks has been reached this method @@ -878,6 +888,30 @@ mod tests { (backend, client) } + fn produce_blocks( + mut client: Arc>>, + num_blocks: usize, + ) -> Vec<::Hash> { + let mut blocks = Vec::with_capacity(num_blocks); + let mut parent_hash = client.chain_info().genesis_hash; + + for i in 0..num_blocks { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(i as u64) + .build() + .unwrap() + .build() + .unwrap() + .block; + parent_hash = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + blocks.push(block.header.hash()); + } + + blocks + } + #[test] fn block_state_machine_register_unpin() { let mut state = BlockStateMachine::new(); @@ -1003,37 +1037,10 @@ mod tests { #[test] fn unpin_duplicate_hashes() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); @@ -1102,18 +1109,10 @@ mod tests { #[test] fn subscription_check_block() { - let (backend, mut client) = init_backend(); - - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash = block.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 1); + let hash = hashes[0]; let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); @@ -1140,17 +1139,10 @@ mod tests { #[test] fn subscription_ref_count() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 1); + let hash = hashes[0]; let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); @@ -1190,37 +1182,10 @@ mod tests { #[test] fn subscription_remove_subscription() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); @@ -1256,37 +1221,10 @@ mod tests { #[test] fn subscription_check_limits() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2. let mut subs = @@ -1328,37 +1266,10 @@ mod tests { #[test] fn subscription_check_limits_with_duration() { - let (backend, mut client) = init_backend(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_1 = block.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_1) - .with_parent_block_number(1) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_2 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let block = BlockBuilderBuilder::new(&*client) - .on_parent_block(hash_2) - .with_parent_block_number(2) - .build() - .unwrap() - .build() - .unwrap() - .block; - let hash_3 = block.header.hash(); - futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. let mut subs = @@ -1456,6 +1367,39 @@ mod tests { assert_eq!(permit_three.num_ops, 1); } + #[test] + fn stop_all_subscriptions() { + let (backend, client) = init_backend(); + + let hashes = produce_blocks(client, 3); + let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]); + + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + // Pin all blocks for the first subscription. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true); + + // Pin only block 2 for the second subscription. + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1); + assert_eq!(subs.global_blocks.len(), 3); + + // Stop all active subscriptions. + subs.stop_all_subscriptions(); + assert!(subs.global_blocks.is_empty()); + } + #[test] fn reserved_subscription_cleans_resources() { let builder = TestClientBuilder::new(); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 5b016af1aa49..f266c9d8b34f 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -233,6 +233,15 @@ impl> ReservedSubscription { }, } } + + /// Stop all active subscriptions. + /// + /// For all active subscriptions, the internal data is discarded, blocks are unpinned and the + /// `Stop` event will be generated. + pub fn stop_all_subscriptions(&self) { + let mut inner = self.inner.write(); + inner.stop_all_subscriptions() + } } impl> Drop for ReservedSubscription { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index c3f10a201c58..c2bff7c50d5e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -63,6 +63,7 @@ const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; const MAX_PAGINATION_LIMIT: usize = 5; +const MAX_LAGGING_DISTANCE: usize = 128; const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; const INVALID_HASH: [u8; 32] = [1; 32]; @@ -88,6 +89,7 @@ pub async fn run_server() -> std::net::SocketAddr { subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, max_follow_subscriptions_per_connection: 1, + max_lagging_distance: MAX_LAGGING_DISTANCE, }, ) .into_rpc(); @@ -148,6 +150,7 @@ async fn setup_api() -> ( subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -199,6 +202,8 @@ async fn follow_subscription_produces_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -268,6 +273,8 @@ async fn follow_with_runtime() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -581,6 +588,8 @@ async fn call_runtime_without_flag() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1240,6 +1249,8 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1329,6 +1340,8 @@ async fn follow_generates_initial_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1485,6 +1498,8 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1562,6 +1577,8 @@ async fn follow_with_unpin() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1674,6 +1691,8 @@ async fn unpin_duplicate_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1777,6 +1796,8 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -1931,6 +1952,8 @@ async fn follow_prune_best_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2117,6 +2140,8 @@ async fn follow_forks_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2277,6 +2302,8 @@ async fn follow_report_multiple_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2523,6 +2550,8 @@ async fn pin_block_references() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2661,6 +2690,8 @@ async fn follow_finalized_before_new_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2776,6 +2807,8 @@ async fn ensure_operation_limits_works() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -2881,6 +2914,8 @@ async fn check_continue_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -3064,6 +3099,8 @@ async fn stop_storage_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) @@ -3351,6 +3388,88 @@ async fn storage_closest_merkle_value() { ); } +#[tokio::test] +async fn chain_head_stop_all_subscriptions() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead to stop all subscriptions on lagging distance of 5 blocks. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_lagging_distance: 5, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + + // Import 6 blocks in total to trigger the suspension distance. + let mut parent_hash = client.chain_info().genesis_hash; + for i in 0..6 { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(i) + .build() + .unwrap() + .build() + .unwrap() + .block; + + let hash = block.hash(); + parent_hash = hash; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + } + + let mut second_sub = + api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Lagging detected, the stop event is delivered immediately. + assert_matches!( + get_next_event::>(&mut second_sub).await, + FollowEvent::Stop + ); + + // Ensure that all subscriptions are stopped. + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + + // Other subscriptions cannot be started until the suspension period is over. + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Should receive the stop event immediately. + assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); + + // For the next subscription, lagging distance must be smaller. + client.finalize_block(parent_hash, None).unwrap(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); +} + #[tokio::test] async fn chain_head_single_connection_context() { let server_addr = run_server().await; @@ -3500,12 +3619,14 @@ async fn chain_head_limit_reached() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: 1, }, ) .into_rpc(); let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Initialized must always be reported first. let _event: FollowEvent = get_next_event(&mut sub).await;