Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Move spawning tasks from thread pools to Service's TaskManager for block importing #5647

Merged
merged 25 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
037989c
Move tasks from tx pool to Service for basic_queue
Apr 15, 2020
894c74f
Add spawner to tests
Apr 15, 2020
b8fae5d
Update tests
Apr 15, 2020
05c6c6e
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 22, 2020
5ae2652
Change usage of TaskManagerBuilder to TaskManager
Apr 22, 2020
f51dc2b
Renove dbg!
Apr 22, 2020
225e6c1
Revert exposing TaskManager in public API
Apr 22, 2020
5b9975e
Move client and import queue inside the import closure
Apr 22, 2020
58351b6
Move client inside the export closure
Apr 22, 2020
0edfdce
Move comments outside of closure
Apr 22, 2020
13e0f23
Add spawn_blocking method to SpawnTaskHandle; WIP
Apr 23, 2020
7dc8aa9
Use futures::executor::block_on instead of || wrapping the future
Apr 23, 2020
786811c
Add comments to code
pscott Apr 24, 2020
b891cbe
Remove useless block on task_executor declaration
Apr 24, 2020
ddd28a1
Add spawn_inner private function
Apr 24, 2020
497c540
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 24, 2020
a69a69f
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 25, 2020
e347a80
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
1abcf8c
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
2ac200d
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
3857f7a
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
1989c75
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 28, 2020
7b9f700
Add Send to block_announce_validator_builder
Apr 28, 2020
c93ecf2
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 29, 2020
e7ef065
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 29, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;

Expand All @@ -52,13 +52,16 @@ macro_rules! new_full_start {
grandpa_block_import.clone(), client.clone(),
);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| task_manager.spawn("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
aura_block_import,
Some(Box::new(grandpa_block_import.clone())),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((grandpa_block_import, grandpa_link));
Expand Down Expand Up @@ -195,7 +198,7 @@ pub fn new_light(config: Configuration)
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -209,13 +212,16 @@ pub fn new_light(config: Configuration)
let finality_proof_request_builder =
finality_proof_import.create_finality_proof_request_builder();

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| task_manager.spawn("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
grandpa_block_import,
None,
Some(Box::new(finality_proof_import)),
client,
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
10 changes: 8 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
Expand All @@ -76,13 +76,16 @@ macro_rules! new_full_start {
client.clone(),
)?;

let spawner = |future| task_manager.spawn("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
Some(Box::new(justification_import)),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((block_import, grandpa_link, babe_link));
Expand Down Expand Up @@ -321,7 +324,7 @@ pub fn new_light(config: Configuration)
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -342,13 +345,16 @@ pub fn new_light(config: Configuration)
client.clone(),
)?;

let spawner = |future| task_manager.spawn("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link,
babe_block_import,
None,
Some(Box::new(finality_proof_import)),
client.clone(),
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
7 changes: 5 additions & 2 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
collections::HashMap
};

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use parking_lot::Mutex;
use log::{debug, info, trace};

Expand Down Expand Up @@ -788,13 +788,14 @@ impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I,
}

/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, I, C, P>(
pub fn import_queue<B, I, C, P, F>(
slot_duration: SlotDuration,
block_import: I,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: F,
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
Expand All @@ -804,6 +805,7 @@ pub fn import_queue<B, I, C, P>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
F: Fn(BoxFuture<'static, ()>) -> (),
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
Expand All @@ -818,6 +820,7 @@ pub fn import_queue<B, I, C, P>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
4 changes: 3 additions & 1 deletion client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use log::{debug, info, log, trace, warn};
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
Expand Down Expand Up @@ -1203,6 +1203,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
Expand All @@ -1225,6 +1226,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
6 changes: 4 additions & 2 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
Expand Down Expand Up @@ -67,7 +67,8 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {

/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> ()
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
Expand All @@ -78,6 +79,7 @@ pub fn import_queue<Block, B>(
Box::new(block_import),
None,
None,
spawner,
)
}

Expand Down
5 changes: 4 additions & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use codec::{Encode, Decode};
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
use futures::future::BoxFuture;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -459,6 +460,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
block_import: BoxBlockImport<B, Transaction>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
Expand All @@ -475,7 +477,8 @@ pub fn import_queue<B, Transaction, Algorithm>(
verifier,
block_import,
None,
None
None,
spawner
))
}

Expand Down
4 changes: 4 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ fn build_test_full_node(config: config::NetworkConfiguration)
}
}

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
None,
spawner,
));

let worker = NetworkWorker::new(config::Params {
Expand Down
5 changes: 4 additions & 1 deletion client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
drop(queue);
}
}
8 changes: 8 additions & 0 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down Expand Up @@ -670,11 +674,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down
14 changes: 9 additions & 5 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
/// Defines which import queue to use.
pub fn with_import_queue<UImpQu>(
self,
builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>)
builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>, &TaskManager)
pscott marked this conversation as resolved.
Show resolved Hide resolved
-> Result<UImpQu, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, TFprb, TFpp,
TExPool, TRpc, Backend>, Error>
Expand All @@ -438,7 +438,8 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
&self.config,
self.client.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
self.transaction_pool.clone(),
&self.task_manager,
)?;

Ok(ServiceBuilder {
Expand Down Expand Up @@ -528,6 +529,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
&TaskManager,
) -> Result<(UImpQu, Option<UFprb>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TExPool, TRpc, Backend>, Error>
Expand All @@ -538,7 +540,8 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.backend.clone(),
self.fetcher.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
self.transaction_pool.clone(),
&self.task_manager,
)?;

Ok(ServiceBuilder {
Expand Down Expand Up @@ -570,12 +573,13 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
&TaskManager,
) -> Result<(UImpQu, UFprb), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx|
builder(cfg, cl, b, f, sc, tx)
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb|
builder(cfg, cl, b, f, sc, tx, tb)
.map(|(q, f)| (q, Some(f)))
)
}
Expand Down
2 changes: 2 additions & 0 deletions client/service/src/chain_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl<
// This makes it possible either to interleave other operations in-between the block imports,
// or to stop the operation completely.
let import = future::poll_fn(move |cx| {
dbg!("import");
// Start by reading the number of blocks if not done so already.
let count = match count {
Some(c) => c,
Expand Down Expand Up @@ -190,6 +191,7 @@ impl<

} else {
// Polling the import queue will re-schedule the task when ready.
dbg!("PENDING");
return std::task::Poll::Pending;
}
});
Expand Down
8 changes: 4 additions & 4 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub struct TaskManager {
impl TaskManager {
/// If a Prometheus registry is passed, it will be used to report statistics about the
/// service tasks.
pub(super) fn new(
pub fn new(
pscott marked this conversation as resolved.
Show resolved Hide resolved
executor: ServiceTaskExecutor,
prometheus_registry: Option<&Registry>
) -> Result<Self, PrometheusError> {
Expand All @@ -142,11 +142,11 @@ impl TaskManager {
/// Spawn background/async task, which will be aware on exit signal.
///
/// See also the documentation of [`SpawnTaskHandler::spawn`].
pub(super) fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_handle().spawn(name, task)
}

pub(super) fn spawn_handle(&self) -> SpawnTaskHandle {
pub fn spawn_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
on_exit: self.on_exit.clone(),
executor: self.executor.clone(),
Expand All @@ -155,7 +155,7 @@ impl TaskManager {
}

/// Clone on exit signal.
pub(super) fn on_exit(&self) -> exit_future::Exit {
pub fn on_exit(&self) -> exit_future::Exit {
self.on_exit.clone()
}
}
Expand Down
Loading