diff --git a/prdoc/pr_6262.prdoc b/prdoc/pr_6262.prdoc new file mode 100644 index 000000000000..8ad99bc6ad28 --- /dev/null +++ b/prdoc/pr_6262.prdoc @@ -0,0 +1,10 @@ +title: "Size limits implemented for fork aware transaction pool" + +doc: + - audience: Node Dev + description: | + Size limits are now obeyed in fork aware transaction pool + +crates: + - name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index 2dd5836c570f..ecae21395c91 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -68,15 +68,7 @@ where AddView(BlockHash, ViewStream), /// Removes an existing view's stream associated with a specific block hash. RemoveView(BlockHash), - /// Adds initial views for given extrinsics hashes. - /// - /// This message should be sent when the external submission of a transaction occures. It - /// provides the list of initial views for given extrinsics hashes. - /// The dropped notification is not sent if it comes from the initial views. It allows to keep - /// transaction in the mempool, even if all the views are full at the time of submitting - /// transaction to the pool. - AddInitialViews(Vec>, BlockHash), - /// Removes all initial views for given extrinsic hashes. + /// Removes internal states for given extrinsic hashes. /// /// Intended to ba called on finalization. RemoveFinalizedTxs(Vec>), @@ -90,7 +82,6 @@ where match self { Command::AddView(..) => write!(f, "AddView"), Command::RemoveView(..) => write!(f, "RemoveView"), - Command::AddInitialViews(..) => write!(f, "AddInitialViews"), Command::RemoveFinalizedTxs(..) => write!(f, "RemoveFinalizedTxs"), } } @@ -118,13 +109,6 @@ where /// /// Once transaction is dropped, dropping view is removed from the set. transaction_states: HashMap, HashSet>>, - - /// The list of initial view for every extrinsic. - /// - /// Dropped notifications from initial views will be silenced. This allows to accept the - /// transaction into the mempool, even if all the views are full at the time of submitting new - /// transaction. - initial_views: HashMap, HashSet>>, } impl MultiViewDropWatcherContext @@ -164,15 +148,7 @@ where .iter() .all(|h| !self.stream_map.contains_key(h)) { - return self - .initial_views - .get(&tx_hash) - .map(|list| !list.contains(&block_hash)) - .unwrap_or(true) - .then(|| { - debug!("[{:?}] dropped_watcher: removing tx", tx_hash); - tx_hash - }) + return Some(tx_hash) } } else { debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash); @@ -201,7 +177,6 @@ where stream_map: StreamMap::new(), command_receiver, transaction_states: Default::default(), - initial_views: Default::default(), }; let stream_map = futures::stream::unfold(ctx, |mut ctx| async move { @@ -217,17 +192,13 @@ where Command::RemoveView(key) => { trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::>()); ctx.stream_map.remove(&key); - }, - Command::AddInitialViews(xts,block_hash) => { - log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: xt initial view added {block_hash:?}"); - xts.into_iter().for_each(|xt| { - ctx.initial_views.entry(xt).or_default().insert(block_hash); + ctx.transaction_states.iter_mut().for_each(|(_,state)| { + state.remove(&key); }); }, Command::RemoveFinalizedTxs(xts) => { log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed"); xts.iter().for_each(|xt| { - ctx.initial_views.remove(xt); ctx.transaction_states.remove(xt); }); @@ -291,34 +262,13 @@ where }); } - /// Adds the initial view for the given transactions hashes. - /// - /// This message should be called when the external submission of a transaction occures. It - /// provides the list of initial views for given extrinsics hashes. - /// - /// The dropped notification is not sent if it comes from the initial views. It allows to keep - /// transaction in the mempool, even if all the views are full at the time of submitting - /// transaction to the pool. - pub fn add_initial_views( - &self, - xts: impl IntoIterator> + Clone, - block_hash: BlockHash, - ) { - let _ = self - .controller - .unbounded_send(Command::AddInitialViews(xts.into_iter().collect(), block_hash)) - .map_err(|e| { - trace!(target: LOG_TARGET, "dropped_watcher: add_initial_views_ send message failed: {e}"); - }); - } - - /// Removes all initial views for finalized transactions. + /// Removes status info for finalized transactions. pub fn remove_finalized_txs(&self, xts: impl IntoIterator> + Clone) { let _ = self .controller .unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect())) .map_err(|e| { - trace!(target: LOG_TARGET, "dropped_watcher: remove_initial_views send message failed: {e}"); + trace!(target: LOG_TARGET, "dropped_watcher: remove_finalized_txs send message failed: {e}"); }); } } @@ -471,63 +421,4 @@ mod dropped_watcher_tests { let handle = tokio::spawn(async move { output_stream.take(1).collect::>().await }); assert_eq!(handle.await.unwrap(), vec![tx_hash]); } - - #[tokio::test] - async fn test06() { - sp_tracing::try_init_simple(); - let (watcher, mut output_stream) = MultiViewDroppedWatcher::new(); - assert!(output_stream.next().now_or_never().is_none()); - - let block_hash0 = H256::repeat_byte(0x01); - let block_hash1 = H256::repeat_byte(0x02); - let tx_hash = H256::repeat_byte(0x0b); - - let view_stream0 = futures::stream::iter(vec![ - (tx_hash, TransactionStatus::Future), - (tx_hash, TransactionStatus::InBlock((block_hash1, 0))), - ]) - .boxed(); - watcher.add_view(block_hash0, view_stream0); - assert!(output_stream.next().now_or_never().is_none()); - - let view_stream1 = futures::stream::iter(vec![ - (tx_hash, TransactionStatus::Ready), - (tx_hash, TransactionStatus::Dropped), - ]) - .boxed(); - - watcher.add_view(block_hash1, view_stream1); - watcher.add_initial_views(vec![tx_hash], block_hash1); - assert!(output_stream.next().now_or_never().is_none()); - } - - #[tokio::test] - async fn test07() { - sp_tracing::try_init_simple(); - let (watcher, mut output_stream) = MultiViewDroppedWatcher::new(); - assert!(output_stream.next().now_or_never().is_none()); - - let block_hash0 = H256::repeat_byte(0x01); - let block_hash1 = H256::repeat_byte(0x02); - let tx_hash = H256::repeat_byte(0x0b); - - let view_stream0 = futures::stream::iter(vec![ - (tx_hash, TransactionStatus::Future), - (tx_hash, TransactionStatus::InBlock((block_hash1, 0))), - ]) - .boxed(); - watcher.add_view(block_hash0, view_stream0); - watcher.add_initial_views(vec![tx_hash], block_hash0); - assert!(output_stream.next().now_or_never().is_none()); - - let view_stream1 = futures::stream::iter(vec![ - (tx_hash, TransactionStatus::Ready), - (tx_hash, TransactionStatus::Dropped), - ]) - .boxed(); - watcher.add_view(block_hash1, view_stream1); - - let handle = tokio::spawn(async move { output_stream.take(1).collect::>().await }); - assert_eq!(handle.await.unwrap(), vec![tx_hash]); - } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 7e72b44adf38..a342d35b2844 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -45,7 +45,6 @@ use futures::{ use parking_lot::Mutex; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ - error::{Error, IntoPoolError}, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, }; @@ -193,6 +192,7 @@ where listener.clone(), Default::default(), mempool_max_transactions_count, + ready_limits.total_bytes + future_limits.total_bytes, )); let (dropped_stream_controller, dropped_stream) = @@ -283,6 +283,7 @@ where listener.clone(), metrics.clone(), TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count), + options.ready.total_bytes + options.future.total_bytes, )); let (dropped_stream_controller, dropped_stream) = @@ -599,48 +600,36 @@ where log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count()); log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at"); let xts = xts.into_iter().map(Arc::from).collect::>(); - let mempool_result = self.mempool.extend_unwatched(source, &xts); + let mempool_results = self.mempool.extend_unwatched(source, &xts); if view_store.is_empty() { - return future::ready(Ok(mempool_result)).boxed() + return future::ready(Ok(mempool_results)).boxed() } - let (hashes, to_be_submitted): (Vec>, Vec>) = - mempool_result - .iter() - .zip(xts) - .filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt))) - .unzip(); + let to_be_submitted = mempool_results + .iter() + .zip(xts) + .filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt)) + .collect::>(); self.metrics .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _)); let mempool = self.mempool.clone(); async move { - let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await; + let results_map = view_store.submit(source, to_be_submitted.into_iter()).await; let mut submission_results = reduce_multiview_result(results_map).into_iter(); - Ok(mempool_result + Ok(mempool_results .into_iter() .map(|result| { result.and_then(|xt_hash| { - let result = submission_results + submission_results .next() - .expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed."); - result.or_else(|error| { - let error = error.into_pool_error(); - match error { - Ok( - // The transaction is still in mempool it may get included into the view for the next block. - Error::ImmediatelyDropped - ) => Ok(xt_hash), - Ok(e) => { - mempool.remove(xt_hash); - Err(e.into()) - }, - Err(e) => Err(e), - } - }) + .expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.") + .inspect_err(|_| + mempool.remove(xt_hash) + ) }) }) .collect::>()) @@ -692,26 +681,10 @@ where let view_store = self.view_store.clone(); let mempool = self.mempool.clone(); async move { - let result = view_store.submit_and_watch(at, source, xt).await; - let result = result.or_else(|(e, maybe_watcher)| { - let error = e.into_pool_error(); - match (error, maybe_watcher) { - ( - Ok( - // The transaction is still in mempool it may get included into the - // view for the next block. - Error::ImmediatelyDropped, - ), - Some(watcher), - ) => Ok(watcher), - (Ok(e), _) => { - mempool.remove(xt_hash); - Err(e.into()) - }, - (Err(e), _) => Err(e), - } - }); - result + view_store + .submit_and_watch(at, source, xt) + .await + .inspect_err(|_| mempool.remove(xt_hash)) } .boxed() } @@ -1056,7 +1029,7 @@ where future::join_all(results).await } - /// Updates the given view with the transaction from the internal mempol. + /// Updates the given view with the transactions from the internal mempol. /// /// All transactions from the mempool (excluding those which are either already imported or /// already included in blocks since recently finalized block) are submitted to the @@ -1139,12 +1112,9 @@ where // out the invalid event, and remove transaction. if self.view_store.is_empty() { for result in watched_results { - match result { - Err(tx_hash) => { - self.view_store.listener.invalidate_transactions(&[tx_hash]); - self.mempool.remove(tx_hash); - }, - Ok(_) => {}, + if let Err(tx_hash) = result { + self.view_store.listener.invalidate_transactions(&[tx_hash]); + self.mempool.remove(tx_hash); } } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 989c7e8ef356..86c07008c3f3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -30,12 +30,11 @@ use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::Mult use crate::{ common::log_xt::log_xt_trace, graph, - graph::{ExtrinsicFor, ExtrinsicHash}, + graph::{tracked_map::Size, ExtrinsicFor, ExtrinsicHash}, LOG_TARGET, }; use futures::FutureExt; use itertools::Itertools; -use parking_lot::RwLock; use sc_transaction_pool_api::TransactionSource; use sp_blockchain::HashAndNumber; use sp_runtime::{ @@ -43,7 +42,7 @@ use sp_runtime::{ transaction_validity::{InvalidTransaction, TransactionValidityError}, }; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::HashMap, sync::{atomic, atomic::AtomicU64, Arc}, time::Instant, }; @@ -72,6 +71,8 @@ where watched: bool, /// Extrinsic actual body. tx: ExtrinsicFor, + /// Size of the extrinsics actual body. + bytes: usize, /// Transaction source. source: TransactionSource, /// When the transaction was revalidated, used to periodically revalidate the mem pool buffer. @@ -99,13 +100,13 @@ where } /// Creates a new instance of wrapper for unwatched transaction. - fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor) -> Self { - Self { watched: false, tx, source, validated_at: AtomicU64::new(0) } + fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor, bytes: usize) -> Self { + Self { watched: false, tx, source, validated_at: AtomicU64::new(0), bytes } } /// Creates a new instance of wrapper for watched transaction. - fn new_watched(source: TransactionSource, tx: ExtrinsicFor) -> Self { - Self { watched: true, tx, source, validated_at: AtomicU64::new(0) } + fn new_watched(source: TransactionSource, tx: ExtrinsicFor, bytes: usize) -> Self { + Self { watched: true, tx, source, validated_at: AtomicU64::new(0), bytes } } /// Provides a clone of actual transaction body. @@ -121,10 +122,18 @@ where } } +impl Size for Arc> +where + Block: BlockT, + ChainApi: graph::ChainApi + 'static, +{ + fn size(&self) -> usize { + self.bytes + } +} + type InternalTxMemPoolMap = - HashMap, Arc>>; -type InternalTxMemPoolMapEntry<'a, ChainApi, Block> = - Entry<'a, ExtrinsicHash, Arc>>; + graph::tracked_map::TrackedMap, Arc>>; /// An intermediary transactions buffer. /// @@ -153,13 +162,16 @@ where /// /// The key is the hash of the transaction, and the value is a wrapper /// structure, which contains the mempool specific details of the transaction. - transactions: RwLock>, + transactions: InternalTxMemPoolMap, /// Prometheus's metrics endpoint. metrics: PrometheusMetrics, /// Indicates the maximum number of transactions that can be maintained in the memory pool. max_transactions_count: usize, + + /// Maximal size of encodings of all transactions in the memory pool. + max_transactions_total_bytes: usize, } impl TxMemPool @@ -175,19 +187,32 @@ where listener: Arc>, metrics: PrometheusMetrics, max_transactions_count: usize, + max_transactions_total_bytes: usize, ) -> Self { - Self { api, listener, transactions: Default::default(), metrics, max_transactions_count } + Self { + api, + listener, + transactions: Default::default(), + metrics, + max_transactions_count, + max_transactions_total_bytes, + } } /// Creates a new `TxMemPool` instance for testing purposes. #[allow(dead_code)] - fn new_test(api: Arc, max_transactions_count: usize) -> Self { + fn new_test( + api: Arc, + max_transactions_count: usize, + max_transactions_total_bytes: usize, + ) -> Self { Self { api, listener: Arc::from(MultiViewListener::new()), transactions: Default::default(), metrics: Default::default(), max_transactions_count, + max_transactions_total_bytes, } } @@ -200,28 +225,42 @@ where } /// Returns a tuple with the count of unwatched and watched transactions in the memory pool. - pub(super) fn unwatched_and_watched_count(&self) -> (usize, usize) { + pub fn unwatched_and_watched_count(&self) -> (usize, usize) { let transactions = self.transactions.read(); let watched_count = transactions.values().filter(|t| t.is_watched()).count(); (transactions.len() - watched_count, watched_count) } + /// Returns the number of bytes used by all extrinsics in the the pool. + #[cfg(test)] + pub fn bytes(&self) -> usize { + return self.transactions.bytes() + } + + /// Returns true if provided values would exceed defined limits. + fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool { + length > self.max_transactions_count || + current_total_bytes > self.max_transactions_total_bytes + } + /// Attempts to insert a transaction into the memory pool, ensuring it does not /// exceed the maximum allowed transaction count. fn try_insert( &self, - current_len: usize, - entry: InternalTxMemPoolMapEntry<'_, ChainApi, Block>, hash: ExtrinsicHash, tx: TxInMemPool, ) -> Result, ChainApi::Error> { - //todo: obey size limits [#5476] - let result = match (current_len < self.max_transactions_count, entry) { - (true, Entry::Vacant(v)) => { - v.insert(Arc::from(tx)); + let bytes = self.transactions.bytes(); + let mut transactions = self.transactions.write(); + let result = match ( + !self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes), + transactions.contains_key(&hash), + ) { + (true, false) => { + transactions.insert(hash, Arc::from(tx)); Ok(hash) }, - (_, Entry::Occupied(_)) => + (_, true) => Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()), (false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()), }; @@ -239,17 +278,11 @@ where source: TransactionSource, xts: &[ExtrinsicFor], ) -> Vec, ChainApi::Error>> { - let mut transactions = self.transactions.write(); let result = xts .iter() .map(|xt| { - let hash = self.api.hash_and_length(&xt).0; - self.try_insert( - transactions.len(), - transactions.entry(hash), - hash, - TxInMemPool::new_unwatched(source, xt.clone()), - ) + let (hash, length) = self.api.hash_and_length(&xt); + self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length)) }) .collect::>(); result @@ -262,14 +295,8 @@ where source: TransactionSource, xt: ExtrinsicFor, ) -> Result, ChainApi::Error> { - let mut transactions = self.transactions.write(); - let hash = self.api.hash_and_length(&xt).0; - self.try_insert( - transactions.len(), - transactions.entry(hash), - hash, - TxInMemPool::new_watched(source, xt.clone()), - ) + let (hash, length) = self.api.hash_and_length(&xt); + self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length)) } /// Removes transactions from the memory pool which are specified by the given list of hashes @@ -324,12 +351,11 @@ where let start = Instant::now(); let (count, input) = { - let transactions = self.transactions.read(); + let transactions = self.transactions.clone_map(); ( transactions.len(), transactions - .clone() .into_iter() .filter(|xt| { let finalized_block_number = finalized_block.number.into().as_u64(); @@ -417,8 +443,8 @@ where #[cfg(test)] mod tx_mem_pool_tests { use super::*; - use crate::common::tests::TestApi; - use substrate_test_runtime::{AccountId, Extrinsic, Transfer, H256}; + use crate::{common::tests::TestApi, graph::ChainApi}; + use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256}; use substrate_test_runtime_client::AccountKeyring::*; fn uxt(nonce: u64) -> Extrinsic { crate::common::tests::uxt(Transfer { @@ -433,7 +459,7 @@ mod tx_mem_pool_tests { fn extend_unwatched_obeys_limit() { let max = 10; let api = Arc::from(TestApi::default()); - let mempool = TxMemPool::new_test(api, max); + let mempool = TxMemPool::new_test(api, max, usize::MAX); let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::>(); @@ -450,7 +476,7 @@ mod tx_mem_pool_tests { sp_tracing::try_init_simple(); let max = 10; let api = Arc::from(TestApi::default()); - let mempool = TxMemPool::new_test(api, max); + let mempool = TxMemPool::new_test(api, max, usize::MAX); let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::>(); xts.push(xts.iter().last().unwrap().clone()); @@ -467,7 +493,7 @@ mod tx_mem_pool_tests { fn push_obeys_limit() { let max = 10; let api = Arc::from(TestApi::default()); - let mempool = TxMemPool::new_test(api, max); + let mempool = TxMemPool::new_test(api, max, usize::MAX); let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::>(); @@ -492,7 +518,7 @@ mod tx_mem_pool_tests { fn push_detects_already_imported() { let max = 10; let api = Arc::from(TestApi::default()); - let mempool = TxMemPool::new_test(api, 2 * max); + let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX); let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::>(); let xt0 = xts.iter().last().unwrap().clone(); @@ -517,7 +543,7 @@ mod tx_mem_pool_tests { fn count_works() { let max = 100; let api = Arc::from(TestApi::default()); - let mempool = TxMemPool::new_test(api, max); + let mempool = TxMemPool::new_test(api, max, usize::MAX); let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::>(); @@ -532,4 +558,39 @@ mod tx_mem_pool_tests { assert!(results.iter().all(Result::is_ok)); assert_eq!(mempool.unwatched_and_watched_count(), (10, 5)); } + + fn large_uxt(x: usize) -> Extrinsic { + ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build() + } + + #[test] + fn push_obeys_size_limit() { + sp_tracing::try_init_simple(); + let max = 10; + let api = Arc::from(TestApi::default()); + //size of large extrinsic is: 1129 + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129); + + let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + + let xt = Arc::from(large_uxt(98)); + let result = mempool.push_watched(TransactionSource::External, xt); + assert!(matches!( + result.unwrap_err(), + sc_transaction_pool_api::error::Error::ImmediatelyDropped + )); + + let xt = Arc::from(large_uxt(99)); + let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]); + assert!(matches!( + result.pop().unwrap().unwrap_err(), + sc_transaction_pool_api::error::Error::ImmediatelyDropped + )); + } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 413fca223242..f23dcedd5bfd 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -91,7 +91,6 @@ where &self, source: TransactionSource, xts: impl IntoIterator> + Clone, - xts_hashes: impl IntoIterator> + Clone, ) -> HashMap, ChainApi::Error>>> { let submit_futures = { let active_views = self.active_views.read(); @@ -100,9 +99,7 @@ where .map(|(_, view)| { let view = view.clone(); let xts = xts.clone(); - self.dropped_stream_controller - .add_initial_views(xts_hashes.clone(), view.at.hash); - async move { (view.at.hash, view.submit_many(source, xts.clone()).await) } + async move { (view.at.hash, view.submit_many(source, xts).await) } }) .collect::>() }; @@ -127,11 +124,7 @@ where let result = active_views .iter() - .map(|view| { - self.dropped_stream_controller - .add_initial_views(std::iter::once(tx_hash), view.at.hash); - view.submit_local(xt.clone()) - }) + .map(|view| view.submit_local(xt.clone())) .find_or_first(Result::is_ok); if let Some(Err(err)) = result { @@ -154,10 +147,10 @@ where _at: Block::Hash, source: TransactionSource, xt: ExtrinsicFor, - ) -> Result, (ChainApi::Error, Option>)> { + ) -> Result, ChainApi::Error> { let tx_hash = self.api.hash_and_length(&xt).0; let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else { - return Err((PoolError::AlreadyImported(Box::new(tx_hash)).into(), None)) + return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into()) }; let submit_and_watch_futures = { let active_views = self.active_views.read(); @@ -166,8 +159,6 @@ where .map(|(_, view)| { let view = view.clone(); let xt = xt.clone(); - self.dropped_stream_controller - .add_initial_views(std::iter::once(tx_hash), view.at.hash); async move { match view.submit_and_watch(source, xt).await { Ok(watcher) => { @@ -191,7 +182,7 @@ where if let Some(Err(err)) = maybe_error { log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err); - return Err((err, Some(external_watcher))); + return Err(err); }; Ok(external_watcher) diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs index c1225d7356d9..d93898b1b22a 100644 --- a/substrate/client/transaction-pool/src/graph/mod.rs +++ b/substrate/client/transaction-pool/src/graph/mod.rs @@ -31,7 +31,7 @@ mod listener; mod pool; mod ready; mod rotator; -mod tracked_map; +pub(crate) mod tracked_map; mod validated_pool; pub mod base_pool; diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs index 9e92dffc9f96..6c3bbbf34b55 100644 --- a/substrate/client/transaction-pool/src/graph/tracked_map.rs +++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs @@ -18,7 +18,7 @@ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{ - collections::HashMap, + collections::{hash_map::Iter, HashMap}, sync::{ atomic::{AtomicIsize, Ordering as AtomicOrdering}, Arc, @@ -101,20 +101,30 @@ impl<'a, K, V> TrackedMapReadAccess<'a, K, V> where K: Eq + std::hash::Hash, { - /// Returns true if map contains key. + /// Returns true if the map contains given key. pub fn contains_key(&self, key: &K) -> bool { self.inner_guard.contains_key(key) } - /// Returns reference to the contained value by key, if exists. + /// Returns the reference to the contained value by key, if exists. pub fn get(&self, key: &K) -> Option<&V> { self.inner_guard.get(key) } - /// Returns iterator over all values. + /// Returns an iterator over all values. pub fn values(&self) -> std::collections::hash_map::Values { self.inner_guard.values() } + + /// Returns the number of elements in the map. + pub fn len(&self) -> usize { + self.inner_guard.len() + } + + /// Returns an iterator over all key-value pairs. + pub fn iter(&self) -> Iter<'_, K, V> { + self.inner_guard.iter() + } } pub struct TrackedMapWriteAccess<'a, K, V> { @@ -149,10 +159,20 @@ where val } + /// Returns `true` if the inner map contains a value for the specified key. + pub fn contains_key(&self, key: &K) -> bool { + self.inner_guard.contains_key(key) + } + /// Returns mutable reference to the contained value by key, if exists. pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { self.inner_guard.get_mut(key) } + + /// Returns the number of elements in the map. + pub fn len(&mut self) -> usize { + self.inner_guard.len() + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs index 63af729b8b73..15f2b7f79c14 100644 --- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs +++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs @@ -186,9 +186,9 @@ macro_rules! assert_pool_status { #[macro_export] macro_rules! assert_ready_iterator { - ($hash:expr, $pool:expr, [$( $xt:expr ),+]) => {{ + ($hash:expr, $pool:expr, [$( $xt:expr ),*]) => {{ let ready_iterator = $pool.ready_at($hash).now_or_never().unwrap(); - let expected = vec![ $($pool.api().hash_and_length(&$xt).0),+]; + let expected = vec![ $($pool.api().hash_and_length(&$xt).0),*]; let output: Vec<_> = ready_iterator.collect(); log::debug!(target:LOG_TARGET, "expected: {:#?}", expected); log::debug!(target:LOG_TARGET, "output: {:#?}", output); diff --git a/substrate/client/transaction-pool/tests/fatp_limits.rs b/substrate/client/transaction-pool/tests/fatp_limits.rs index 6fd5f93ed070..03792fd89dfa 100644 --- a/substrate/client/transaction-pool/tests/fatp_limits.rs +++ b/substrate/client/transaction-pool/tests/fatp_limits.rs @@ -19,6 +19,7 @@ //! Tests of limits for fork-aware transaction pool. pub mod fatp_common; + use fatp_common::{ finalized_block_event, invalid_hash, new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE, }; @@ -27,6 +28,7 @@ use sc_transaction_pool::ChainApi; use sc_transaction_pool_api::{ error::Error as TxPoolError, MaintainedTransactionPool, TransactionPool, TransactionStatus, }; +use std::thread::sleep; use substrate_test_runtime_client::AccountKeyring::*; use substrate_test_runtime_transaction_pool::uxt; @@ -92,25 +94,103 @@ fn fatp_limits_ready_count_works() { //charlie was not included into view: assert_pool_status!(header01.hash(), &pool, 2, 0); assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); + //todo: can we do better? We don't have API to check if event was processed internally. + let mut counter = 0; + while pool.mempool_len().0 == 3 { + sleep(std::time::Duration::from_millis(1)); + counter = counter + 1; + if counter > 20 { + assert!(false, "timeout"); + } + } + assert_eq!(pool.mempool_len().0, 2); //branch with alice transactions: let header02b = api.push_block(2, vec![xt1.clone(), xt2.clone()], true); let event = new_best_block_event(&pool, Some(header01.hash()), header02b.hash()); block_on(pool.maintain(event)); - assert_eq!(pool.mempool_len().0, 3); - //charlie was resubmitted from mmepool into the view: - assert_pool_status!(header02b.hash(), &pool, 1, 0); - assert_ready_iterator!(header02b.hash(), pool, [xt0]); + assert_eq!(pool.mempool_len().0, 2); + assert_pool_status!(header02b.hash(), &pool, 0, 0); + assert_ready_iterator!(header02b.hash(), pool, []); //branch with alice/charlie transactions shall also work: let header02a = api.push_block(2, vec![xt0.clone(), xt1.clone()], true); + api.set_nonce(header02a.hash(), Alice.into(), 201); let event = new_best_block_event(&pool, Some(header02b.hash()), header02a.hash()); block_on(pool.maintain(event)); - assert_eq!(pool.mempool_len().0, 3); - assert_pool_status!(header02a.hash(), &pool, 1, 0); + assert_eq!(pool.mempool_len().0, 2); + // assert_pool_status!(header02a.hash(), &pool, 1, 0); assert_ready_iterator!(header02a.hash(), pool, [xt2]); } +#[test] +fn fatp_limits_ready_count_works_for_submit_at() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 200); + api.set_nonce(api.genesis_hash(), Charlie.into(), 500); + + let header01 = api.push_block(1, vec![], true); + + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Charlie, 500); + let xt1 = uxt(Alice, 200); + let xt2 = uxt(Alice, 201); + + let results = block_on(pool.submit_at( + header01.hash(), + SOURCE, + vec![xt0.clone(), xt1.clone(), xt2.clone()], + )) + .unwrap(); + + assert!(matches!(results[0].as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped)); + assert!(results[1].as_ref().is_ok()); + assert!(results[2].as_ref().is_ok()); + assert_eq!(pool.mempool_len().0, 2); + //charlie was not included into view: + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); +} + +#[test] +fn fatp_limits_ready_count_works_for_submit_and_watch() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 500); + + let header01 = api.push_block(1, vec![], true); + + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Charlie, 500); + let xt1 = uxt(Alice, 200); + let xt2 = uxt(Bob, 300); + api.set_priority(&xt0, 2); + api.set_priority(&xt1, 2); + api.set_priority(&xt2, 1); + + let result0 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())); + let result1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())); + let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ()); + + assert!(matches!(result2.unwrap_err().0, TxPoolError::ImmediatelyDropped)); + assert!(result0.is_ok()); + assert!(result1.is_ok()); + assert_eq!(pool.mempool_len().1, 2); + //charlie was not included into view: + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]); +} + #[test] fn fatp_limits_future_count_works() { sp_tracing::try_init_simple(); @@ -131,29 +211,33 @@ fn fatp_limits_future_count_works() { let xt2 = uxt(Alice, 201); let xt3 = uxt(Alice, 202); - let submissions = vec![ - pool.submit_one(header01.hash(), SOURCE, xt1.clone()), - pool.submit_one(header01.hash(), SOURCE, xt2.clone()), - pool.submit_one(header01.hash(), SOURCE, xt3.clone()), - ]; + block_on(pool.submit_one(header01.hash(), SOURCE, xt1.clone())).unwrap(); + block_on(pool.submit_one(header01.hash(), SOURCE, xt2.clone())).unwrap(); + block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone())).unwrap(); - let results = block_on(futures::future::join_all(submissions)); - assert!(results.iter().all(Result::is_ok)); //charlie was not included into view due to limits: assert_pool_status!(header01.hash(), &pool, 0, 2); + //todo: can we do better? We don't have API to check if event was processed internally. + let mut counter = 0; + while pool.mempool_len().0 != 2 { + sleep(std::time::Duration::from_millis(1)); + counter = counter + 1; + if counter > 20 { + assert!(false, "timeout"); + } + } let header02 = api.push_block(2, vec![xt0], true); api.set_nonce(header02.hash(), Alice.into(), 201); //redundant let event = new_best_block_event(&pool, Some(header01.hash()), header02.hash()); block_on(pool.maintain(event)); - //charlie was resubmitted from mmepool into the view: - assert_pool_status!(header02.hash(), &pool, 2, 1); - assert_eq!(pool.mempool_len().0, 3); + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().0, 2); } #[test] -fn fatp_limits_watcher_mempool_prevents_dropping() { +fn fatp_limits_watcher_mempool_doesnt_prevent_dropping() { sp_tracing::try_init_simple(); let builder = TestPoolBuilder::new(); @@ -169,23 +253,15 @@ fn fatp_limits_watcher_mempool_prevents_dropping() { let xt1 = uxt(Bob, 300); let xt2 = uxt(Alice, 200); - let submissions = vec![ - pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), - ]; - let mut submissions = block_on(futures::future::join_all(submissions)); - let xt2_watcher = submissions.remove(2).unwrap(); - let xt1_watcher = submissions.remove(1).unwrap(); - let xt0_watcher = submissions.remove(0).unwrap(); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); assert_pool_status!(header01.hash(), &pool, 2, 0); - let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(1).collect::>(); - + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); log::debug!("xt0_status: {:#?}", xt0_status); - - assert_eq!(xt0_status, vec![TransactionStatus::Ready]); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::>(); assert_eq!(xt1_status, vec![TransactionStatus::Ready]); @@ -214,28 +290,23 @@ fn fatp_limits_watcher_non_intial_view_drops_transaction() { let xt1 = uxt(Charlie, 400); let xt2 = uxt(Bob, 300); - let submissions = vec![ - pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), - ]; - let mut submissions = block_on(futures::future::join_all(submissions)); - let xt2_watcher = submissions.remove(2).unwrap(); - let xt1_watcher = submissions.remove(1).unwrap(); - let xt0_watcher = submissions.remove(0).unwrap(); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + + // make sure tx0 is actually dropped before checking iterator + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); let header02 = api.push_block_with_parent(header01.hash(), vec![], true); block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); assert_pool_status!(header02.hash(), &pool, 2, 0); - assert_ready_iterator!(header02.hash(), pool, [xt2, xt0]); - - let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(1).collect::>(); - assert_eq!(xt0_status, vec![TransactionStatus::Ready]); + assert_ready_iterator!(header02.hash(), pool, [xt1, xt2]); - let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::>(); - assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::>(); + assert_eq!(xt1_status, vec![TransactionStatus::Ready]); let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::>(); assert_eq!(xt2_status, vec![TransactionStatus::Ready]); @@ -259,32 +330,19 @@ fn fatp_limits_watcher_finalized_transaction_frees_ready_space() { let xt1 = uxt(Charlie, 400); let xt2 = uxt(Bob, 300); - let submissions = vec![ - pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), - ]; - let mut submissions = block_on(futures::future::join_all(submissions)); - let xt2_watcher = submissions.remove(2).unwrap(); - let xt1_watcher = submissions.remove(1).unwrap(); - let xt0_watcher = submissions.remove(0).unwrap(); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + let header02 = api.push_block_with_parent(header01.hash(), vec![xt0.clone()], true); block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); assert_pool_status!(header02.hash(), &pool, 2, 0); assert_ready_iterator!(header02.hash(), pool, [xt1, xt2]); - let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(3).collect::>(); - assert_eq!( - xt0_status, - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header02.hash(), 0)), - TransactionStatus::Finalized((header02.hash(), 0)) - ] - ); - let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::>(); assert_eq!(xt1_status, vec![TransactionStatus::Ready]); @@ -311,43 +369,275 @@ fn fatp_limits_watcher_view_can_drop_transcation() { let xt2 = uxt(Bob, 300); let xt3 = uxt(Alice, 200); - let submissions = vec![ - pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), - ]; - let mut submissions = block_on(futures::future::join_all(submissions)); - let xt2_watcher = submissions.remove(2).unwrap(); - let xt1_watcher = submissions.remove(1).unwrap(); - let xt0_watcher = submissions.remove(0).unwrap(); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped,]); assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); - let header02 = api.push_block_with_parent(header01.hash(), vec![xt0.clone()], true); + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); - let submission = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())); - let xt3_watcher = submission.unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::>(); + assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); assert_pool_status!(header02.hash(), pool, 2, 0); assert_ready_iterator!(header02.hash(), pool, [xt2, xt3]); - let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(3).collect::>(); - assert_eq!( - xt0_status, - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header02.hash(), 0)), - TransactionStatus::Finalized((header02.hash(), 0)) - ] + let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::>(); + assert_eq!(xt2_status, vec![TransactionStatus::Ready]); + + let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(1).collect::>(); + assert_eq!(xt3_status, vec![TransactionStatus::Ready]); +} + +#[test] +fn fatp_limits_watcher_empty_and_full_view_immediately_drops() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + api.set_nonce(api.genesis_hash(), Eve.into(), 600); + api.set_nonce(api.genesis_hash(), Ferdie.into(), 700); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + + let xt3 = uxt(Dave, 500); + let xt4 = uxt(Eve, 600); + let xt5 = uxt(Ferdie, 700); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 2); + + let header02e = api.push_block_with_parent( + header01.hash(), + vec![xt0.clone(), xt1.clone(), xt2.clone()], + true, ); + api.set_nonce(header02e.hash(), Alice.into(), 201); + api.set_nonce(header02e.hash(), Bob.into(), 301); + api.set_nonce(header02e.hash(), Charlie.into(), 401); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02e.hash()))); + + assert_pool_status!(header02e.hash(), &pool, 0, 0); + + let header02f = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02f.hash()))); + assert_pool_status!(header02f.hash(), &pool, 2, 0); + assert_ready_iterator!(header02f.hash(), pool, [xt1, xt2]); + + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + let result5 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).map(|_| ()); + + //xt5 hits internal mempool limit + assert!(matches!(result5.unwrap_err().0, TxPoolError::ImmediatelyDropped)); + + assert_pool_status!(header02e.hash(), &pool, 2, 0); + assert_ready_iterator!(header02e.hash(), pool, [xt3, xt4]); + assert_eq!(pool.mempool_len().1, 4); let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::>(); - assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_eq!( + xt1_status, + vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 1))] + ); - let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::>(); - assert_eq!(xt2_status, vec![TransactionStatus::Ready]); + let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::>(); + assert_eq!( + xt2_status, + vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 2))] + ); let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(1).collect::>(); assert_eq!(xt3_status, vec![TransactionStatus::Ready]); + let xt4_status = futures::executor::block_on_stream(xt4_watcher).take(1).collect::>(); + assert_eq!(xt4_status, vec![TransactionStatus::Ready]); +} + +#[test] +fn fatp_limits_watcher_empty_and_full_view_drops_with_event() { + // it is almost copy of fatp_limits_watcher_empty_and_full_view_immediately_drops, but the + // mempool_count limit is set to 5 (vs 4). + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(5).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + api.set_nonce(api.genesis_hash(), Eve.into(), 600); + api.set_nonce(api.genesis_hash(), Ferdie.into(), 700); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + + let xt3 = uxt(Dave, 500); + let xt4 = uxt(Eve, 600); + let xt5 = uxt(Ferdie, 700); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + + let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::>(); + assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 2); + + let header02e = api.push_block_with_parent( + header01.hash(), + vec![xt0.clone(), xt1.clone(), xt2.clone()], + true, + ); + api.set_nonce(header02e.hash(), Alice.into(), 201); + api.set_nonce(header02e.hash(), Bob.into(), 301); + api.set_nonce(header02e.hash(), Charlie.into(), 401); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02e.hash()))); + + assert_pool_status!(header02e.hash(), &pool, 0, 0); + + let header02f = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02f.hash()))); + assert_pool_status!(header02f.hash(), &pool, 2, 0); + assert_ready_iterator!(header02f.hash(), pool, [xt1, xt2]); + + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + let xt5_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap(); + + assert_pool_status!(header02e.hash(), &pool, 2, 0); + assert_ready_iterator!(header02e.hash(), pool, [xt4, xt5]); + + let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(2).collect::>(); + assert_eq!(xt3_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]); + + //xt5 got dropped + assert_eq!(pool.mempool_len().1, 4); + + let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::>(); + assert_eq!( + xt1_status, + vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 1))] + ); + + let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::>(); + assert_eq!( + xt2_status, + vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 2))] + ); + + let xt4_status = futures::executor::block_on_stream(xt4_watcher).take(1).collect::>(); + assert_eq!(xt4_status, vec![TransactionStatus::Ready]); + + let xt5_status = futures::executor::block_on_stream(xt5_watcher).take(1).collect::>(); + assert_eq!(xt5_status, vec![TransactionStatus::Ready]); +} + +fn large_uxt(x: usize) -> substrate_test_runtime::Extrinsic { + substrate_test_runtime::ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build() +} + +#[test] +fn fatp_limits_ready_size_works() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_ready_bytes_size(3390).with_future_bytes_size(0).build(); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = large_uxt(0); + let xt1 = large_uxt(1); + let xt2 = large_uxt(2); + + let submissions = vec![ + pool.submit_one(header01.hash(), SOURCE, xt0.clone()), + pool.submit_one(header01.hash(), SOURCE, xt1.clone()), + pool.submit_one(header01.hash(), SOURCE, xt2.clone()), + ]; + + let results = block_on(futures::future::join_all(submissions)); + assert!(results.iter().all(Result::is_ok)); + //charlie was not included into view: + assert_pool_status!(header01.hash(), &pool, 3, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2]); + + let xt3 = large_uxt(3); + let result3 = block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone())); + assert!(matches!(result3.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped)); +} + +#[test] +fn fatp_limits_future_size_works() { + sp_tracing::try_init_simple(); + const UXT_SIZE: usize = 137; + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder + .with_ready_bytes_size(UXT_SIZE) + .with_future_bytes_size(3 * UXT_SIZE) + .build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 200); + api.set_nonce(api.genesis_hash(), Charlie.into(), 500); + + let header01 = api.push_block(1, vec![], true); + + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Bob, 201); + let xt1 = uxt(Charlie, 501); + let xt2 = uxt(Alice, 201); + let xt3 = uxt(Alice, 202); + assert_eq!(api.hash_and_length(&xt0).1, UXT_SIZE); + assert_eq!(api.hash_and_length(&xt1).1, UXT_SIZE); + assert_eq!(api.hash_and_length(&xt2).1, UXT_SIZE); + assert_eq!(api.hash_and_length(&xt3).1, UXT_SIZE); + + let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt0.clone())).unwrap(); + let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt1.clone())).unwrap(); + let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt2.clone())).unwrap(); + let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone())).unwrap(); + + //todo: can we do better? We don't have API to check if event was processed internally. + let mut counter = 0; + while pool.mempool_len().0 == 4 { + sleep(std::time::Duration::from_millis(1)); + counter = counter + 1; + if counter > 20 { + assert!(false, "timeout"); + } + } + assert_pool_status!(header01.hash(), &pool, 0, 3); + assert_eq!(pool.mempool_len().0, 3); }