Skip to content

Commit

Permalink
dropped_watcher: dropping future transcations improved
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 12, 2024
1 parent a116c20 commit 560db28
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use sc_transaction_pool_api::TransactionStatus;
use sc_utils::mpsc;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap, HashSet,
},
fmt::{self, Debug, Formatter},
pin::Pin,
};
Expand Down Expand Up @@ -98,7 +101,7 @@ where
AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
/// Removes an existing view's stream associated with a specific block hash.
RemoveView(BlockHash<ChainApi>),
/// Removes internal states for given extrinsic hashes.
/// Removes referencing views for given extrinsic hashes.
///
/// Intended to ba called on finalization.
RemoveFinalizedTxs(Vec<ExtrinsicHash<ChainApi>>),
Expand Down Expand Up @@ -129,23 +132,53 @@ where
/// A map that associates the views identified by corresponding block hashes with their streams
/// of dropped-related events. This map is used to keep track of active views and their event
/// streams.
/// todo: rename: view_stream map
stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
/// A receiver for commands to control the state of the stream, allowing the addition and
/// removal of views. This is used to dynamically update which views are being tracked.
command_receiver: CommandReceiver<Command<ChainApi>>,

/// For each transaction hash we keep the set of hashes representing the views that see this
/// transaction as ready or future.
/// transaction as ready or in_block.
///
/// Even if all views referencing a ready transactions are removed, we still want to keep
/// transaction, there can be a fork which sees the transaction as ready.
///
/// Once transaction is dropped, dropping view is removed from the set.
ready_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
/// For each transaction hash we keep the set of hashes representing the views that see this
/// transaction as future.
///
/// Once all views referencing a future transactions are removed, the future can be dropped.
///
/// Once transaction is dropped, dropping view is removed from the set.
transaction_states: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
future_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,

/// Transactions that need to be notified as dropped.
pending_dropped_transactions: Vec<ExtrinsicHash<ChainApi>>,
}

impl<C> MultiViewDropWatcherContext<C>
where
C: graph::ChainApi + 'static,
<<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
{
/// Provides the ready or future `HashSet` containing views referencing given transaction.
fn get_transaction_views(
&mut self,
tx_hash: ExtrinsicHash<C>,
) -> Option<OccupiedEntry<ExtrinsicHash<C>, HashSet<BlockHash<C>>>> {
if let Entry::Occupied(views_keeping_tx_valid) = self.ready_transaction_views.entry(tx_hash)
{
return Some(views_keeping_tx_valid)
}
if let Entry::Occupied(views_keeping_tx_valid) =
self.future_transaction_views.entry(tx_hash)
{
return Some(views_keeping_tx_valid)
}
None
}

/// Processes a `ViewStreamEvent` from a specific view and updates the internal state
/// accordingly.
///
Expand All @@ -164,13 +197,19 @@ where
);
let (tx_hash, status) = event;
match status {
TransactionStatus::Ready | TransactionStatus::Future => {
self.transaction_states.entry(tx_hash).or_default().insert(block_hash);
TransactionStatus::Future => {
self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
},
TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
views.insert(block_hash);
self.ready_transaction_views.insert(tx_hash, views);
} else {
self.ready_transaction_views.entry(tx_hash).or_default().insert(block_hash);
}
},
TransactionStatus::Dropped => {
if let Entry::Occupied(mut views_keeping_tx_valid) =
self.transaction_states.entry(tx_hash)
{
if let Some(mut views_keeping_tx_valid) = self.get_transaction_views(tx_hash) {
views_keeping_tx_valid.get_mut().remove(&block_hash);
if views_keeping_tx_valid.get().is_empty() {
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
Expand All @@ -186,7 +225,7 @@ where
// replace it with new one (also in mempool).
TransactionStatus::Usurped(by) => {
if let Entry::Occupied(mut views_keeping_tx_valid) =
self.transaction_states.entry(tx_hash)
self.ready_transaction_views.entry(tx_hash)
{
views_keeping_tx_valid.get_mut().remove(&block_hash);
if views_keeping_tx_valid.get().is_empty() {
Expand All @@ -202,6 +241,25 @@ where
None
}

/// Gets pending dropped transactions if any.
fn get_pending_dropped_transaction(&mut self) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
while let Some(tx_hash) = self.pending_dropped_transactions.pop() {
// never drop transaction that was seens as ready. It may not have a referencing
// view now, but such fork can appear.
if let Some(_) = self.ready_transaction_views.get(&tx_hash) {
continue
}

if let Some(views) = self.future_transaction_views.get(&tx_hash) {
if views.is_empty() {
self.future_transaction_views.remove(&tx_hash);
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
}
}
}
return None
}

/// Creates a new `StreamOfDropped` and its associated event stream controller.
///
/// This method initializes the internal structures and unfolds the stream of dropped
Expand All @@ -218,13 +276,25 @@ where
let ctx = Self {
stream_map: StreamMap::new(),
command_receiver,
transaction_states: Default::default(),
ready_transaction_views: Default::default(),
future_transaction_views: Default::default(),
pending_dropped_transactions: Default::default(),
};

let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
loop {
if let Some(dropped) = ctx.get_pending_dropped_transaction() {
debug!("dropped_watcher: sending out (pending): {dropped:?}");
return Some((dropped, ctx));
}
tokio::select! {
biased;
Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
},
cmd = ctx.command_receiver.next() => {
match cmd? {
Command::AddView(key,stream) => {
Expand All @@ -234,26 +304,30 @@ where
Command::RemoveView(key) => {
trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::<Vec<_>>());
ctx.stream_map.remove(&key);
ctx.transaction_states.iter_mut().for_each(|(_,state)| {
state.remove(&key);
ctx.ready_transaction_views.iter_mut().for_each(|(tx_hash,views)| {
trace!(target: LOG_TARGET,"[{:?}] dropped_watcher: Command::RemoveView ready views: {:?}",tx_hash, views);
views.remove(&key);
});

ctx.future_transaction_views.iter_mut().for_each(|(tx_hash,views)| {
trace!(target: LOG_TARGET,"[{:?}] dropped_watcher: Command::RemoveView future views: {:?}",tx_hash, views);
views.remove(&key);
if views.is_empty() {
ctx.pending_dropped_transactions.push(*tx_hash);
}
});
},
Command::RemoveFinalizedTxs(xts) => {
log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed");
xts.iter().for_each(|xt| {
ctx.transaction_states.remove(xt);
ctx.ready_transaction_views.remove(xt);
ctx.future_transaction_views.remove(xt);
});

},
}
},

Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
}

}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ where
/// stream map.
fn remove_view(&mut self, block_hash: BlockHash<ChainApi>) {
self.status_stream_map.remove(&block_hash);
self.views_keeping_tx_valid.remove(&block_hash);
trace!(target: LOG_TARGET, "[{:?}] RemoveView view: {:?} views:{:?}", self.tx_hash, block_hash, self.status_stream_map.keys().collect::<Vec<_>>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ where

log::debug!(
target: LOG_TARGET,
"mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} purged:{} took {duration:?}", invalid_hashes.len(),
"mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} invalid_hashes:{} took {duration:?}", invalid_hashes.len(),
);

invalid_hashes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,16 @@ where
log::trace!(target:LOG_TARGET,"handle_finalized: inactive_views: {:?}", inactive_views.keys());
}

log::trace!(target:LOG_TARGET,"handle_finalized: dropped_views: {:?}", dropped_views);

self.listener.remove_stale_controllers();
self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone());

self.listener.remove_view(finalized_hash);
for view in dropped_views {
self.listener.remove_view(view);
self.dropped_stream_controller.remove_view(view);
}
self.listener.remove_stale_controllers();
self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone());

finalized_xts
}
Expand Down

0 comments on commit 560db28

Please sign in to comment.