Skip to content

Commit

Permalink
graph: dedicated usurped/limit_enforced calls added
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 12, 2024
1 parent e798e34 commit 7b58a68
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
48 changes: 27 additions & 21 deletions substrate/client/transaction-pool/src/graph/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,38 +120,44 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
self.fire(tx, |watcher| watcher.future());
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Future)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink/future: send message failed: {:?}", tx, e);
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}

/// Transaction was dropped from the pool because of the limit.
///
/// If the function was actually called due to enforcing limits, the `limits_enforced` flag
/// shall be set to true.
pub fn dropped(&mut self, tx: &H, by: Option<&H>, limits_enforced: bool) {
/// Transaction was dropped from the pool because of enforcing the limit.
pub fn limit_enforced(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped (limit enforced)", tx);
self.fire(tx, |watcher| watcher.limit_enforced());

if let Some(ref sink) = self.dropped_by_limits_sink {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Dropped)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}

/// Transaction was replaced with other extrinsic.
pub fn usurped(&mut self, tx: &H, by: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by);
self.fire(tx, |watcher| match by {
Some(t) => watcher.usurped(t.clone()),
None => watcher.dropped(),
});
self.fire(tx, |watcher| watcher.usurped(by.clone()));

//note: LimitEnforced could be introduced as new status to get rid of this flag.
if let Some(ref sink) = self.dropped_by_limits_sink {
if let Some(t) = by {
if let Err(e) =
sink.unbounded_send((tx.clone(), TransactionStatus::Usurped(t.clone())))
{
trace!(target: LOG_TARGET, "[{:?}] dropped_sink/future: send message failed: {:?}", tx, e);
}
} else if limits_enforced {
if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Dropped)) {
trace!(target: LOG_TARGET, "[{:?}] dropped_sink/future: send message failed: {:?}", tx, e);
}
if let Err(e) =
sink.unbounded_send((tx.clone(), TransactionStatus::Usurped(by.clone())))
{
trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
}

/// Transaction was dropped from the pool because of the failure during the resubmission of
/// revalidate transactions or failure during pruning tags.
pub fn dropped(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Dropped", tx);
self.fire(tx, |watcher| watcher.dropped());
}

/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H) {
trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
Expand Down
8 changes: 4 additions & 4 deletions substrate/client/transaction-pool/src/graph/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl<B: ChainApi> ValidatedPool<B> {
// run notifications
let mut listener = self.listener.write();
for h in &removed {
listener.dropped(h, None, true);
listener.limit_enforced(h);
}

removed
Expand Down Expand Up @@ -453,7 +453,7 @@ impl<B: ChainApi> ValidatedPool<B> {
match final_status {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None, false),
Status::Dropped => listener.dropped(&hash),
Status::Failed => listener.invalid(&hash),
}
}
Expand Down Expand Up @@ -492,7 +492,7 @@ impl<B: ChainApi> ValidatedPool<B> {
fire_events(&mut *listener, promoted);
}
for f in &status.failed {
listener.dropped(f, None, false);
listener.dropped(f);
}
}

Expand Down Expand Up @@ -682,7 +682,7 @@ where
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
failed.iter().for_each(|f| listener.invalid(f));
removed.iter().for_each(|r| listener.dropped(&r.hash, Some(hash), false));
removed.iter().for_each(|r| listener.usurped(&r.hash, hash));
promoted.iter().for_each(|p| listener.ready(p, None));
},
base::Imported::Future { ref hash } => listener.future(hash),
Expand Down
6 changes: 6 additions & 0 deletions substrate/client/transaction-pool/src/graph/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ impl<H: Clone, BH: Clone> Sender<H, BH> {
}

/// Transaction has been dropped from the pool because of the limit.
pub fn limit_enforced(&mut self) {
self.send(TransactionStatus::Dropped);
self.is_finalized = true;
}

/// Transaction has been dropped from the pool.
pub fn dropped(&mut self) {
self.send(TransactionStatus::Dropped);
self.is_finalized = true;
Expand Down

0 comments on commit 7b58a68

Please sign in to comment.