Skip to content

Commit

Permalink
fatxpool: size limits implemented (#6262)
Browse files Browse the repository at this point in the history
This PR adds size-limits to the fork-aware transaction pool.

**Review Notes**
- Existing
[`TrackedMap`](https://github.com/paritytech/polkadot-sdk/blob/58fd5ae4ce883f42c360e3ad4a5df7d2258b42fe/substrate/client/transaction-pool/src/graph/tracked_map.rs#L33-L41)
is used in internal mempool to track the size of extrinsics:

https://github.com/paritytech/polkadot-sdk/blob/58fd5ae4ce883f42c360e3ad4a5df7d2258b42fe/substrate/client/transaction-pool/src/graph/tracked_map.rs#L33-L41

- In this PR, I also removed the logic that kept transactions in the
`tx_mem_pool` if they were immediately dropped by the views. Initially,
I implemented this as an improvement: if there was available space in
the _mempool_ and all views dropped the transaction upon submission, the
transaction would still be retained in the _mempool_.

However, upon further consideration, I decided to remove this
functionality to reduce unnecessary complexity. Now, when all views drop
a transaction during submission, it is automatically rejected, with the
`submit/submit_and_watch` call returning `ImmediatelyDropped`.


Closes: #5476

---------

Co-authored-by: Sebastian Kunert <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent b601d57 commit ace62f1
Show file tree
Hide file tree
Showing 9 changed files with 556 additions and 323 deletions.
10 changes: 10 additions & 0 deletions prdoc/pr_6262.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "Size limits implemented for fork aware transaction pool"

doc:
- audience: Node Dev
description: |
Size limits are now obeyed in fork aware transaction pool

crates:
- name: sc-transaction-pool
bump: minor
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,7 @@ where
AddView(BlockHash<C>, ViewStream<C>),
/// Removes an existing view's stream associated with a specific block hash.
RemoveView(BlockHash<C>),
/// Adds initial views for given extrinsics hashes.
///
/// This message should be sent when the external submission of a transaction occures. It
/// provides the list of initial views for given extrinsics hashes.
/// The dropped notification is not sent if it comes from the initial views. It allows to keep
/// transaction in the mempool, even if all the views are full at the time of submitting
/// transaction to the pool.
AddInitialViews(Vec<ExtrinsicHash<C>>, BlockHash<C>),
/// Removes all initial views for given extrinsic hashes.
/// Removes internal states for given extrinsic hashes.
///
/// Intended to ba called on finalization.
RemoveFinalizedTxs(Vec<ExtrinsicHash<C>>),
Expand All @@ -90,7 +82,6 @@ where
match self {
Command::AddView(..) => write!(f, "AddView"),
Command::RemoveView(..) => write!(f, "RemoveView"),
Command::AddInitialViews(..) => write!(f, "AddInitialViews"),
Command::RemoveFinalizedTxs(..) => write!(f, "RemoveFinalizedTxs"),
}
}
Expand Down Expand Up @@ -118,13 +109,6 @@ where
///
/// Once transaction is dropped, dropping view is removed from the set.
transaction_states: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,

/// The list of initial view for every extrinsic.
///
/// Dropped notifications from initial views will be silenced. This allows to accept the
/// transaction into the mempool, even if all the views are full at the time of submitting new
/// transaction.
initial_views: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,
}

impl<C> MultiViewDropWatcherContext<C>
Expand Down Expand Up @@ -164,15 +148,7 @@ where
.iter()
.all(|h| !self.stream_map.contains_key(h))
{
return self
.initial_views
.get(&tx_hash)
.map(|list| !list.contains(&block_hash))
.unwrap_or(true)
.then(|| {
debug!("[{:?}] dropped_watcher: removing tx", tx_hash);
tx_hash
})
return Some(tx_hash)
}
} else {
debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash);
Expand Down Expand Up @@ -201,7 +177,6 @@ where
stream_map: StreamMap::new(),
command_receiver,
transaction_states: Default::default(),
initial_views: Default::default(),
};

let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
Expand All @@ -217,17 +192,13 @@ where
Command::RemoveView(key) => {
trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::<Vec<_>>());
ctx.stream_map.remove(&key);
},
Command::AddInitialViews(xts,block_hash) => {
log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: xt initial view added {block_hash:?}");
xts.into_iter().for_each(|xt| {
ctx.initial_views.entry(xt).or_default().insert(block_hash);
ctx.transaction_states.iter_mut().for_each(|(_,state)| {
state.remove(&key);
});
},
Command::RemoveFinalizedTxs(xts) => {
log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed");
xts.iter().for_each(|xt| {
ctx.initial_views.remove(xt);
ctx.transaction_states.remove(xt);
});

Expand Down Expand Up @@ -291,34 +262,13 @@ where
});
}

/// Adds the initial view for the given transactions hashes.
///
/// This message should be called when the external submission of a transaction occures. It
/// provides the list of initial views for given extrinsics hashes.
///
/// The dropped notification is not sent if it comes from the initial views. It allows to keep
/// transaction in the mempool, even if all the views are full at the time of submitting
/// transaction to the pool.
pub fn add_initial_views(
&self,
xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone,
block_hash: BlockHash<C>,
) {
let _ = self
.controller
.unbounded_send(Command::AddInitialViews(xts.into_iter().collect(), block_hash))
.map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: add_initial_views_ send message failed: {e}");
});
}

/// Removes all initial views for finalized transactions.
/// Removes status info for finalized transactions.
pub fn remove_finalized_txs(&self, xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone) {
let _ = self
.controller
.unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect()))
.map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: remove_initial_views send message failed: {e}");
trace!(target: LOG_TARGET, "dropped_watcher: remove_finalized_txs send message failed: {e}");
});
}
}
Expand Down Expand Up @@ -471,63 +421,4 @@ mod dropped_watcher_tests {
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![tx_hash]);
}

#[tokio::test]
async fn test06() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
assert!(output_stream.next().now_or_never().is_none());

let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);

let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
assert!(output_stream.next().now_or_never().is_none());

let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();

watcher.add_view(block_hash1, view_stream1);
watcher.add_initial_views(vec![tx_hash], block_hash1);
assert!(output_stream.next().now_or_never().is_none());
}

#[tokio::test]
async fn test07() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
assert!(output_stream.next().now_or_never().is_none());

let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);

let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
watcher.add_initial_views(vec![tx_hash], block_hash0);
assert!(output_stream.next().now_or_never().is_none());

let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash1, view_stream1);

let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![tx_hash]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use futures::{
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
error::{Error, IntoPoolError},
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
};
Expand Down Expand Up @@ -193,6 +192,7 @@ where
listener.clone(),
Default::default(),
mempool_max_transactions_count,
ready_limits.total_bytes + future_limits.total_bytes,
));

let (dropped_stream_controller, dropped_stream) =
Expand Down Expand Up @@ -283,6 +283,7 @@ where
listener.clone(),
metrics.clone(),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
options.ready.total_bytes + options.future.total_bytes,
));

let (dropped_stream_controller, dropped_stream) =
Expand Down Expand Up @@ -599,48 +600,36 @@ where
log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_result = self.mempool.extend_unwatched(source, &xts);
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_result)).boxed()
return future::ready(Ok(mempool_results)).boxed()
}

let (hashes, to_be_submitted): (Vec<TxHash<Self>>, Vec<ExtrinsicFor<ChainApi>>) =
mempool_result
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt)))
.unzip();
let to_be_submitted = mempool_results
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt))
.collect::<Vec<_>>();

self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

let mempool = self.mempool.clone();
async move {
let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await;
let results_map = view_store.submit(source, to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();

Ok(mempool_result
Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|xt_hash| {
let result = submission_results
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
result.or_else(|error| {
let error = error.into_pool_error();
match error {
Ok(
// The transaction is still in mempool it may get included into the view for the next block.
Error::ImmediatelyDropped
) => Ok(xt_hash),
Ok(e) => {
mempool.remove(xt_hash);
Err(e.into())
},
Err(e) => Err(e),
}
})
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.inspect_err(|_|
mempool.remove(xt_hash)
)
})
})
.collect::<Vec<_>>())
Expand Down Expand Up @@ -692,26 +681,10 @@ where
let view_store = self.view_store.clone();
let mempool = self.mempool.clone();
async move {
let result = view_store.submit_and_watch(at, source, xt).await;
let result = result.or_else(|(e, maybe_watcher)| {
let error = e.into_pool_error();
match (error, maybe_watcher) {
(
Ok(
// The transaction is still in mempool it may get included into the
// view for the next block.
Error::ImmediatelyDropped,
),
Some(watcher),
) => Ok(watcher),
(Ok(e), _) => {
mempool.remove(xt_hash);
Err(e.into())
},
(Err(e), _) => Err(e),
}
});
result
view_store
.submit_and_watch(at, source, xt)
.await
.inspect_err(|_| mempool.remove(xt_hash))
}
.boxed()
}
Expand Down Expand Up @@ -1056,7 +1029,7 @@ where
future::join_all(results).await
}

/// Updates the given view with the transaction from the internal mempol.
/// Updates the given view with the transactions from the internal mempol.
///
/// All transactions from the mempool (excluding those which are either already imported or
/// already included in blocks since recently finalized block) are submitted to the
Expand Down Expand Up @@ -1139,12 +1112,9 @@ where
// out the invalid event, and remove transaction.
if self.view_store.is_empty() {
for result in watched_results {
match result {
Err(tx_hash) => {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
},
Ok(_) => {},
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
}
}
}
Expand Down
Loading

0 comments on commit ace62f1

Please sign in to comment.