Skip to content

Commit

Permalink
Fix transactions service being stuck before warp sync finishes (#1110)
Browse files Browse the repository at this point in the history
* Fix transactions service being stuck before warp sync finishes

* PR link
  • Loading branch information
tomaka authored Sep 5, 2023
1 parent 8b364b6 commit d91bd01
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
72 changes: 53 additions & 19 deletions light-base/src/transactions_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use core::{
time::Duration,
};
use futures_channel::mpsc;
use futures_lite::FutureExt as _;
use futures_util::stream::FuturesUnordered;
use futures_util::{future, FutureExt as _, SinkExt as _, StreamExt as _};
use itertools::Itertools as _;
Expand Down Expand Up @@ -352,19 +353,51 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
// service. This happens when there is a gap in the blocks, either intentionally (e.g.
// after a Grandpa warp sync) or because the transactions service was too busy to process
// the new blocks.
let mut subscribe_all = {
let sub_future = async {
Some(
// The buffer size should be large enough so that, if the CPU is busy, it
// doesn't become full before the execution of the transactions service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to
// avoid malicious behaviors. This code is by definition not considered
// malicious.
worker
.runtime_service
.subscribe_all(
"transactions-service",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await,
)
};

// Because `runtime_service.subscribe_all()` might take a long time (potentially
// forever), we need to process messages coming from the foreground in parallel.
let from_foreground = &mut config.from_foreground;
let messages_process = async move {
loop {
match from_foreground.next().await {
Some(ToBackground::SubmitTransaction {
updates_report: Some(mut updates_report),
..
}) => {
let _ = updates_report
.send(TransactionStatus::Dropped(DropReason::GapInChain))
.await;
}
Some(ToBackground::SubmitTransaction { .. }) => {}
None => break None,
}
}
};

match sub_future.or(messages_process).await {
Some(s) => s,
None => return,
}
};

// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the transactions service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let mut subscribe_all = worker
.runtime_service
.subscribe_all(
"transactions-service",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
);
Expand Down Expand Up @@ -491,7 +524,7 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
let (to_execute, result_rx) = validation_future.remote_handle();
worker
.validations_in_progress
.push(to_execute.map(move |()| to_start_validate).boxed());
.push(Box::pin(to_execute.map(move |()| to_start_validate)));
let tx = worker
.pending_transactions
.transaction_user_data_mut(to_start_validate)
Expand Down Expand Up @@ -582,8 +615,9 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
NonZeroU32::new(3).unwrap(),
);

async move { (block_hash, download_future.await.map(|b| b.body.unwrap())) }
.boxed()
Box::pin(
async move { (block_hash, download_future.await.map(|b| b.body.unwrap())) },
)
});

worker
Expand Down Expand Up @@ -756,10 +790,10 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
tx.when_reannounce = now + Duration::from_secs(5);
worker.next_reannounce.push({
let platform = worker.platform.clone();
async move {
Box::pin(async move {
platform.sleep(Duration::from_secs(5)).await;
maybe_reannounce_tx_id
}.boxed()
})
});

// Perform the announce.
Expand Down Expand Up @@ -844,9 +878,9 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
.update_status(TransactionStatus::Validated);

// Schedule this transaction for announcement.
worker.next_reannounce.push(async move {
worker.next_reannounce.push(Box::pin(async move {
maybe_validated_tx_id
}.boxed());
}));

Ok(result)
}
Expand Down
9 changes: 9 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## Unreleased

### Changed

- Transactions submitted through the JSON-RPC server before the warp syncing process is finished will now immediately be dropped. ([#1110](https://github.com/smol-dot/smoldot/pull/1110))

### Fixed

- Fix `Chain.remove()` not actually removing the chain until the warp syncing process is finished (which might never happen if for example bootnodes are misconfigured). ([#1110](https://github.com/smol-dot/smoldot/pull/1110))
- Fix JSON-RPC server not processing requests if many transactions are submitted before the warp syncing process is finished. ([#1110](https://github.com/smol-dot/smoldot/pull/1110))

## 1.0.17 - 2023-08-25

### Changed
Expand Down

0 comments on commit d91bd01

Please sign in to comment.