Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Move spawning tasks from thread pools to Service's TaskManager for bl…
Browse files Browse the repository at this point in the history
…ock importing (#5647)

Co-Authored-By: Pierre Krieger <[email protected]>
  • Loading branch information
pscott and tomaka authored Apr 29, 2020
1 parent d418254 commit d43fb8e
Show file tree
Hide file tree
Showing 22 changed files with 125 additions and 108 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;

Expand All @@ -52,13 +52,16 @@ macro_rules! new_full_start {
grandpa_block_import.clone(), client.clone(),
);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
aura_block_import,
Some(Box::new(grandpa_block_import.clone())),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((grandpa_block_import, grandpa_link));
Expand Down Expand Up @@ -191,7 +194,7 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -205,13 +208,16 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
let finality_proof_request_builder =
finality_proof_import.create_finality_proof_request_builder();

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
grandpa_block_import,
None,
Some(Box::new(finality_proof_import)),
client,
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
10 changes: 8 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
Expand All @@ -68,13 +68,16 @@ macro_rules! new_full_start {
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
Some(Box::new(justification_import)),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((block_import, grandpa_link, babe_link));
Expand Down Expand Up @@ -284,7 +287,7 @@ pub fn new_light(config: Configuration)
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -305,13 +308,16 @@ pub fn new_light(config: Configuration)
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link,
babe_block_import,
None,
Some(Box::new(finality_proof_import)),
client.clone(),
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use names::{Generator, Name};
use sc_service::config::{
WasmExecutionMethod, Role, OffchainWorkerConfig,
Configuration, DatabaseConfig, ExtTransport, KeystoreConfig, NetworkConfiguration,
NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions,
NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions, TaskType
};
use sc_client_api::execution_extensions::ExecutionStrategies;
use sc_service::{ChainSpec, TracingReceiver};
Expand Down Expand Up @@ -385,7 +385,7 @@ pub trait CliConfiguration: Sized {
fn create_configuration<C: SubstrateCli>(
&self,
cli: &C,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
) -> Result<Configuration> {
let is_dev = self.is_dev()?;
let chain_id = self.chain_id(is_dev)?;
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use log::info;
pub use params::*;
use regex::Regex;
pub use runner::*;
use sc_service::{ChainSpec, Configuration};
use sc_service::{ChainSpec, Configuration, TaskType};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
Expand Down Expand Up @@ -177,7 +177,7 @@ pub trait SubstrateCli: Sized {
fn create_configuration<T: CliConfiguration>(
&self,
command: &T,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
) -> error::Result<Configuration> {
command.create_configuration(self, task_executor)
}
Expand Down
25 changes: 17 additions & 8 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::pin_mut;
use futures::select;
use futures::{future, future::FutureExt, Future};
use log::info;
use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand};
use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::fmt::Debug;
Expand Down Expand Up @@ -116,13 +116,22 @@ impl<C: SubstrateCli> Runner<C> {
/// Create a new runtime with the command provided in argument
pub fn new<T: CliConfiguration>(cli: &C, command: &T) -> Result<Runner<C>> {
let tokio_runtime = build_runtime()?;

let task_executor = {
let runtime_handle = tokio_runtime.handle().clone();
Arc::new(move |fut| {
runtime_handle.spawn(fut);
})
};
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = Arc::new(
move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn( async move {
// `spawn_blocking` is looking for the current runtime, and as such has to be called
// from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
}
}
);

Ok(Runner {
config: command.create_configuration(cli, task_executor)?,
Expand Down
7 changes: 5 additions & 2 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
collections::HashMap
};

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use parking_lot::Mutex;
use log::{debug, info, trace};

Expand Down Expand Up @@ -788,13 +788,14 @@ impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I,
}

/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, I, C, P>(
pub fn import_queue<B, I, C, P, F>(
slot_duration: SlotDuration,
block_import: I,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: F,
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
Expand All @@ -804,6 +805,7 @@ pub fn import_queue<B, I, C, P>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
F: Fn(BoxFuture<'static, ()>) -> (),
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
Expand All @@ -818,6 +820,7 @@ pub fn import_queue<B, I, C, P>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
4 changes: 3 additions & 1 deletion client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use log::{debug, info, log, trace, warn};
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
Expand Down Expand Up @@ -1272,6 +1272,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
Expand All @@ -1294,6 +1295,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
6 changes: 4 additions & 2 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.
use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
Expand Down Expand Up @@ -67,7 +67,8 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {

/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> ()
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
Expand All @@ -78,6 +79,7 @@ pub fn import_queue<Block, B>(
Box::new(block_import),
None,
None,
spawner,
)
}

Expand Down
5 changes: 4 additions & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use codec::{Encode, Decode};
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
use futures::future::BoxFuture;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -461,6 +462,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
finality_proof_import: Option<BoxFinalityProofImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
Expand All @@ -477,7 +479,8 @@ pub fn import_queue<B, Transaction, Algorithm>(
verifier,
block_import,
justification_import,
finality_proof_import
finality_proof_import,
spawner,
))
}

Expand Down
4 changes: 4 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ fn build_test_full_node(config: config::NetworkConfiguration)
}
}

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
None,
spawner,
));

let worker = NetworkWorker::new(config::Params {
Expand Down
5 changes: 4 additions & 1 deletion client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
drop(queue);
}
}
8 changes: 8 additions & 0 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down Expand Up @@ -683,11 +687,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down
Loading

0 comments on commit d43fb8e

Please sign in to comment.