Skip to content

Commit

Permalink
fatp: better support for usurped transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 14, 2024
1 parent 69567b8 commit 414ec3c
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ where
) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
trace!(
target: LOG_TARGET,
"dropped_watcher: handle_event: event:{:?} views:{:?}, ",
event,
"dropped_watcher: handle_event: event:{event:?} from:{block_hash:?} future_views:{:?} ready_views:{:?} stream_map views:{:?}, ",
self.future_transaction_views.get(&event.0),
self.ready_transaction_views.get(&event.0),
self.stream_map.keys().collect::<Vec<_>>(),
);
let (tx_hash, status) = event;
Expand All @@ -219,23 +220,8 @@ where
return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
}
},
// todo:
// 1. usurpued shall be sent unconditionally
// 2. fatp shall act for every usurped message - it should remove tx from every view and
// replace it with new one (also in mempool).
TransactionStatus::Usurped(by) => {
if let Entry::Occupied(mut views_keeping_tx_valid) =
self.ready_transaction_views.entry(tx_hash)
{
views_keeping_tx_valid.get_mut().remove(&block_hash);
if views_keeping_tx_valid.get().is_empty() {
return Some(DroppedTransaction::new_usurped(tx_hash, by))
}
} else {
debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash);
return Some(DroppedTransaction::new_usurped(tx_hash, by))
}
},
TransactionStatus::Usurped(by) =>
return Some(DroppedTransaction::new_usurped(tx_hash, by)),
_ => {},
};
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
api::FullChainApi,
common::log_xt::log_xt_trace,
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::revalidation_worker,
fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
Expand Down Expand Up @@ -201,9 +201,14 @@ where

let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();

let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));

let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
mempool.clone(),
view_store.clone(),
import_notification_sink.clone(),
);

Expand All @@ -220,8 +225,8 @@ where
(
Self {
mempool,
api: pool_api.clone(),
view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
api: pool_api,
view_store,
ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
Expand All @@ -237,14 +242,17 @@ where
)
}

/// Monitors the stream of dropped transactions and removes them from the mempool.
/// Monitors the stream of dropped transactions and removes them from the mempool and
/// view_store.
///
/// This asynchronous task continuously listens for dropped transaction notifications provided
/// within `dropped_stream` and ensures that these transactions are removed from the `mempool`
/// and `import_notification_sink` instances.
/// and `import_notification_sink` instances. For Usurped events, the transaction is also
/// removed from the view_store.
async fn dropped_monitor_task(
mut dropped_stream: StreamOfDropped<ChainApi>,
mempool: Arc<TxMemPool<ChainApi, Block>>,
view_store: Arc<ViewStore<ChainApi, Block>>,
import_notification_sink: MultiViewImportNotificationSink<
Block::Hash,
ExtrinsicHash<ChainApi>,
Expand All @@ -255,10 +263,33 @@ where
log::debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated...");
break;
};
log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification, removing", dropped);
let tx_hash = dropped.tx_hash;
mempool.remove_dropped_transaction(dropped).await;
import_notification_sink.clean_notified_items(&[tx_hash]);
let dropped_tx_hash = dropped.tx_hash;
log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification {:?}, removing", dropped_tx_hash,dropped.reason);
match dropped.reason {
DroppedReason::Usurped(new_tx_hash) => {
if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
view_store
.replace_transaction(
new_tx.source(),
new_tx.tx(),
dropped_tx_hash,
new_tx.is_watched(),
)
.await;
} else {
log::trace!(
target:LOG_TARGET,
"error: dropped_monitor_task: no entry in mempool for new transaction {:?}",
new_tx_hash,
);
}
},
DroppedReason::LimitsEnforced => {},
};

mempool.remove_dropped_transaction(&dropped_tx_hash).await;
view_store.listener.transaction_dropped(dropped);
import_notification_sink.clean_notified_items(&[dropped_tx_hash]);
}
}

Expand Down Expand Up @@ -293,9 +324,13 @@ where

let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();

let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
mempool.clone(),
view_store.clone(),
import_notification_sink.clone(),
);

Expand All @@ -311,8 +346,8 @@ where

Self {
mempool,
api: pool_api.clone(),
view_store: Arc::new(ViewStore::new(pool_api, listener, dropped_stream_controller)),
api: pool_api,
view_store,
ready_poll: Arc::from(Mutex::from(ReadyPoll::new())),
enactment_state: Arc::new(Mutex::new(EnactmentState::new(
best_block_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ where
Self { controllers: Default::default() }
}

/// Returns `true` if the listener contains a stream controller for the specified hash.
pub fn contains_tx(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
self.controllers.read().contains_key(tx_hash)
}

/// Creates an external aggregated stream of events for given transaction.
///
/// This method initializes an `ExternalWatcherContext` for the provided transaction hash, sets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
/// Shall the progress of transaction be watched.
///
/// Was transaction sent with `submit_and_watch`.
fn is_watched(&self) -> bool {
pub(crate) fn is_watched(&self) -> bool {
self.watched
}

Expand Down Expand Up @@ -328,15 +328,13 @@ where
self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
}

/// Removes transactions from the memory pool which are specified by the given list of hashes
/// and send the `Dropped` event to the listeners of these transactions.
/// Removes transaction from the memory pool which are specified by the given list of hashes.
pub(super) async fn remove_dropped_transaction(
&self,
dropped: DroppedTransaction<ExtrinsicHash<ChainApi>>,
) {
dropped: &ExtrinsicHash<ChainApi>,
) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
log::debug!(target: LOG_TARGET, "[{:?}] mempool::remove_dropped_transaction", dropped);
self.transactions.write().remove(&dropped.tx_hash);
self.listener.transaction_dropped(dropped);
self.transactions.write().remove(dropped)
}

/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,4 +454,10 @@ where
);
}
}

/// Returns true if the transaction hash is already imported into the view
pub(super) fn is_imported(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
const IGNORE_BANNED: bool = false;
self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_ok()
}
}
Loading

0 comments on commit 414ec3c

Please sign in to comment.