Skip to content

Commit

Permalink
fatp: TimedTransactionSource added
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 12, 2024
1 parent c566dad commit f10590f
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 178 deletions.
4 changes: 2 additions & 2 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ fn uxt(transfer: TransferData) -> Extrinsic {
}

fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
let source = TransactionSource::External;
let source = TimedTransactionSource::new_external(false);
let mut futures = Vec::new();
let mut tags = Vec::new();
let at = HashAndNumber {
Expand All @@ -171,7 +171,7 @@ fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {

tags.push(to_tag(nonce, AccountId::from_h256(H256::from_low_u64_be(1))));

futures.push(pool.submit_one(&at, source, xt));
futures.push(pool.submit_one(&at, source.clone(), xt));
}

let res = block_on(futures::future::join_all(futures.into_iter()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{
import_notification_sink::MultiViewImportNotificationSink,
metrics::MetricsLink as PrometheusMetrics,
multi_view_listener::MultiViewListener,
tx_mem_pool::{TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
view::View,
view_store::ViewStore,
};
Expand All @@ -32,7 +32,11 @@ use crate::{
common::log_xt::log_xt_trace,
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::revalidation_worker,
graph::{self, base_pool::Transaction, ExtrinsicFor, ExtrinsicHash, IsValidator, Options},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
ExtrinsicFor, ExtrinsicHash, IsValidator, Options,
},
PolledIterator, ReadyIteratorFor, LOG_TARGET,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -604,32 +608,38 @@ where
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_results)).boxed()
return future::ready(Ok(mempool_results
.into_iter()
.map(|r| r.map(|r| r.hash))
.collect::<Vec<_>>()))
.boxed()
}

let to_be_submitted = mempool_results
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt))
.filter_map(|(result, xt)| {
result.as_ref().ok().map(|insertion| (insertion.source.clone(), xt))
})
.collect::<Vec<_>>();

self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

let mempool = self.mempool.clone();
async move {
let results_map = view_store.submit(source, to_be_submitted.into_iter()).await;
let results_map = view_store.submit(to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();

Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|xt_hash| {
result.and_then(|insertion| {
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.inspect_err(|_|
mempool.remove(xt_hash)
mempool.remove(insertion.hash)
)
})
})
Expand Down Expand Up @@ -672,18 +682,19 @@ where
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
let xt = Arc::from(xt);
let xt_hash = match self.mempool.push_watched(source, xt.clone()) {
Ok(xt_hash) => xt_hash,
Err(e) => return future::ready(Err(e)).boxed(),
};
let InsertionInfo { hash: xt_hash, source: timed_source } =
match self.mempool.push_watched(source, xt.clone()) {
Ok(result) => result,
Err(e) => return future::ready(Err(e)).boxed(),
};

self.metrics.report(|metrics| metrics.submitted_transactions.inc());

let view_store = self.view_store.clone();
let mempool = self.mempool.clone();
async move {
view_store
.submit_and_watch(at, source, xt)
.submit_and_watch(at, timed_source, xt)
.await
.inspect_err(|_| mempool.remove(xt_hash))
}
Expand Down Expand Up @@ -816,12 +827,12 @@ where
) -> Result<Self::Hash, Self::Error> {
log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
let xt = Arc::from(xt);
let result = self
let InsertionInfo { hash: xt_hash, .. } = self
.mempool
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

self.view_store.submit_local(xt).or_else(|_| Ok(result))
self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash))
}
}

Expand Down Expand Up @@ -929,6 +940,9 @@ where
let start = Instant::now();
let watched_xts = self.register_listeners(&mut view).await;
let duration = start.elapsed();
// sync the transactions statuses and referencing views in all the listeners with newly
// cloned view.
view.pool.validated_pool().retrigger_notifications();
log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}");

// 2. Handle transactions from the tree route. Pruning transactions from the view first
Expand Down Expand Up @@ -1061,46 +1075,33 @@ where
let mut all_submitted_count = 0;
if !xts.is_empty() {
let unwatched_count = xts.len();
let mut buckets = HashMap::<TransactionSource, Vec<ExtrinsicFor<ChainApi>>>::default();
xts.into_iter()
let xts_with_src = xts
.into_iter()
.filter(|(hash, _)| !view.pool.validated_pool().pool.read().is_imported(hash))
.filter(|(hash, _)| !included_xts.contains(&hash))
.map(|(_, tx)| (tx.source(), tx.tx()))
.for_each(|(source, tx)| buckets.entry(source).or_default().push(tx));
.map(|(_, tx)| (tx.source(), tx.tx()));

for (source, xts) in buckets {
all_submitted_count += xts.len();
let _ = view.submit_many(source, xts).await;
}
let results = view.submit_many(xts_with_src).await;
all_submitted_count = results.len();
log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} unwatched {}/{}", view.at.hash, all_submitted_count, unwatched_count);
}

let watched_submitted_count = watched_xts.len();

let mut buckets = HashMap::<
TransactionSource,
Vec<(ExtrinsicHash<ChainApi>, ExtrinsicFor<ChainApi>)>,
>::default();
watched_xts
let watched_xts_filtered = watched_xts
.into_iter()
.filter(|(hash, _)| !included_xts.contains(&hash))
.map(|(tx_hash, tx)| (tx.source(), tx_hash, tx.tx()))
.for_each(|(source, tx_hash, tx)| {
buckets.entry(source).or_default().push((tx_hash, tx))
});
.map(|(tx_hash, tx)| (tx_hash, tx.source(), tx.tx()))
.collect::<Vec<_>>();

let mut watched_results = Vec::default();
for (source, watched_xts) in buckets {
let hashes = watched_xts.iter().map(|i| i.0).collect::<Vec<_>>();
let results = view
.submit_many(source, watched_xts.into_iter().map(|i| i.1))
.await
.into_iter()
.zip(hashes)
.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
.collect::<Vec<_>>();
watched_results.extend(results);
}
let watched_submitted_count = watched_xts_filtered.len();

let hashes = watched_xts_filtered.iter().map(|i| i.0).collect::<Vec<_>>();
let watched_results = view
.submit_many(watched_xts_filtered.into_iter().map(|i| (i.1, i.2)))
.await
.into_iter()
.zip(hashes)
.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
.collect::<Vec<_>>();

log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {:?} watched {}/{}", view.at.hash, watched_submitted_count, self.mempool_len().1);

Expand Down Expand Up @@ -1191,7 +1192,14 @@ where
})
.map(|(tx_hash, tx)| {
//find arc if tx is known
self.mempool.get_by_hash(tx_hash).unwrap_or_else(|| Arc::from(tx))
self.mempool
.get_by_hash(tx_hash)
.map(|tx| (tx.source(), tx.tx()))
.unwrap_or_else(|| {
// These transactions are coming from retracted blocks, we
// should simply consider them external.
(TimedTransactionSource::new_external(true), Arc::from(tx))
})
}),
);

Expand All @@ -1200,16 +1208,7 @@ where
});
}

let _ = view
.pool
.resubmit_at(
&hash_and_number,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
)
.await;
let _ = view.pool.resubmit_at(&hash_and_number, resubmit_transactions).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ mod tests {
use crate::{
common::tests::{uxt, TestApi},
fork_aware_txpool::view::FinishRevalidationLocalChannels,
TimedTransactionSource,
};
use futures::executor::block_on;
use sc_transaction_pool_api::TransactionSource;
use substrate_test_runtime::{AccountId, Transfer, H256};
use substrate_test_runtime_client::AccountKeyring::Alice;
#[test]
Expand All @@ -212,9 +212,10 @@ mod tests {
nonce: 0,
});

let _ = block_on(
view.submit_many(TransactionSource::External, std::iter::once(uxt.clone().into())),
);
let _ = block_on(view.submit_many(std::iter::once((
TimedTransactionSource::new_external(false),
uxt.clone().into(),
))));
assert_eq!(api.validation_requests().len(), 1);

let (finish_revalidation_request_tx, finish_revalidation_request_rx) =
Expand Down
Loading

0 comments on commit f10590f

Please sign in to comment.