From 761d3c8ff210a9138a32a9317d31a798c90e22f9 Mon Sep 17 00:00:00 2001 From: pscott <30843220+pscott@users.noreply.github.com> Date: Wed, 22 Apr 2020 00:35:01 +0200 Subject: [PATCH] Remove TaskManagerBuilder (#5725) * Remove TaskManagerBuilder * Clean up use declaration fo SpawnTaskHandle Co-Authored-By: Pierre Krieger Co-authored-by: Pierre Krieger --- client/service/src/builder.rs | 54 +++++++------- client/service/src/lib.rs | 4 +- client/service/src/task_manager.rs | 116 ++++++----------------------- 3 files changed, 52 insertions(+), 122 deletions(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1cdebe8b14bf3..b819c47bee966 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm}; -use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter}; +use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager}; use crate::status_sinks; use crate::config::{Configuration, KeystoreConfig, PrometheusConfig}; use crate::metrics::MetricsService; @@ -81,7 +81,7 @@ pub struct ServiceBuilder, backend: Arc, - tasks_builder: TaskManagerBuilder, + task_manager: TaskManager, keystore: Arc>, fetcher: Option, select_chain: Option, @@ -145,7 +145,7 @@ type TFullParts = ( TFullClient, Arc>, Arc>, - TaskManagerBuilder, + TaskManager, ); /// Creates a new full client for the given config. @@ -172,9 +172,9 @@ fn new_full_parts( KeystoreConfig::InMemory => Keystore::new_in_memory(), }; - let tasks_builder = { + let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManagerBuilder::new(registry)? + TaskManager::new(config.task_executor.clone(), registry)? }; let executor = NativeExecutor::::new( @@ -213,12 +213,12 @@ fn new_full_parts( fork_blocks, bad_blocks, extensions, - Box::new(tasks_builder.spawn_handle()), + Box::new(task_manager.spawn_handle()), config.prometheus_config.as_ref().map(|config| config.registry.clone()), )? }; - Ok((client, backend, keystore, tasks_builder)) + Ok((client, backend, keystore, task_manager)) } impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { @@ -238,7 +238,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { (), TFullBackend, >, Error> { - let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?; + let (client, backend, keystore, task_manager) = new_full_parts(&config)?; let client = Arc::new(client); @@ -247,7 +247,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { client, backend, keystore, - tasks_builder, + task_manager, fetcher: None, select_chain: None, import_queue: (), @@ -277,9 +277,9 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { (), TLightBackend, >, Error> { - let tasks_builder = { + let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManagerBuilder::new(registry)? + TaskManager::new(config.task_executor.clone(), registry)? }; let keystore = match &config.keystore { @@ -311,7 +311,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { sc_client::light::new_fetch_checker::<_, TBl, _>( light_blockchain.clone(), executor.clone(), - Box::new(tasks_builder.spawn_handle()), + Box::new(task_manager.spawn_handle()), ), ); let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker)); @@ -321,7 +321,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { backend.clone(), config.chain_spec.as_storage_builder(), executor, - Box::new(tasks_builder.spawn_handle()), + Box::new(task_manager.spawn_handle()), config.prometheus_config.as_ref().map(|config| config.registry.clone()), )?); @@ -329,7 +329,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { config, client, backend, - tasks_builder, + task_manager, keystore, fetcher: Some(fetcher.clone()), select_chain: None, @@ -402,7 +402,7 @@ impl config: self.config, client: self.client, backend: self.backend, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, keystore: self.keystore, fetcher: self.fetcher, select_chain, @@ -445,7 +445,7 @@ impl config: self.config, client: self.client, backend: self.backend, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -483,7 +483,7 @@ impl config: self.config, client: self.client, backend: self.backend, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -545,7 +545,7 @@ impl config: self.config, client: self.client, backend: self.backend, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -606,7 +606,7 @@ impl Ok(ServiceBuilder { config: self.config, client: self.client, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, backend: self.backend, keystore: self.keystore, fetcher: self.fetcher, @@ -635,7 +635,7 @@ impl config: self.config, client: self.client, backend: self.backend, - tasks_builder: self.tasks_builder, + task_manager: self.task_manager, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, @@ -745,7 +745,7 @@ ServiceBuilder< marker: _, mut config, client, - tasks_builder, + task_manager, fetcher: on_demand, backend, keystore, @@ -789,7 +789,7 @@ ServiceBuilder< imports_external_transactions: !matches!(config.role, Role::Light), pool: transaction_pool.clone(), client: client.clone(), - executor: tasks_builder.spawn_handle(), + executor: task_manager.spawn_handle(), }); let protocol_id = { @@ -811,7 +811,7 @@ ServiceBuilder< let network_params = sc_network::config::Params { role: config.role.clone(), executor: { - let spawn_handle = tasks_builder.spawn_handle(); + let spawn_handle = task_manager.spawn_handle(); Some(Box::new(move |fut| { spawn_handle.spawn("libp2p-node", fut); })) @@ -845,7 +845,7 @@ ServiceBuilder< _ => None, }; - let spawn_handle = tasks_builder.spawn_handle(); + let spawn_handle = task_manager.spawn_handle(); // Spawn background tasks which were stacked during the // service building. @@ -857,7 +857,7 @@ ServiceBuilder< // block notifications let txpool = Arc::downgrade(&transaction_pool); let offchain = offchain_workers.as_ref().map(Arc::downgrade); - let notifications_spawn_handle = tasks_builder.spawn_handle(); + let notifications_spawn_handle = task_manager.spawn_handle(); let network_state_info: Arc = network.clone(); let is_validator = config.role.is_authority(); @@ -1013,7 +1013,7 @@ ServiceBuilder< chain_type: chain_spec.chain_type().clone(), }; - let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle())); + let subscriptions = sc_rpc::Subscriptions::new(Arc::new(task_manager.spawn_handle())); let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) = (remote_backend.as_ref(), on_demand.as_ref()) { @@ -1145,7 +1145,7 @@ ServiceBuilder< Ok(Service { client, - task_manager: tasks_builder.into_task_manager(config.task_executor), + task_manager, network, network_status_sinks, select_chain, diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 039e0257ab5b5..56fee6b6d7423 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -74,7 +74,7 @@ pub use std::{ops::Deref, result::Result, sync::Arc}; #[doc(hidden)] pub use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder}; pub use sc_tracing::TracingReceiver; -pub use task_manager::{TaskManagerBuilder, SpawnTaskHandle}; +pub use task_manager::SpawnTaskHandle; use task_manager::TaskManager; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -304,8 +304,6 @@ impl Future for } } - this.task_manager.process_receiver(cx); - // The service future never ends. Poll::Pending } diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index fd7fc62ab577f..e6847d0881120 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -15,13 +15,12 @@ use std::{ pin::Pin, - result::Result, sync::Arc, - task::{Poll, Context}, + result::Result, sync::Arc }; use exit_future::Signal; -use log::{debug, error}; +use log::{debug}; use futures::{ - Future, FutureExt, Stream, + Future, FutureExt, future::select, compat::*, task::{Spawn, FutureObj, SpawnError}, @@ -32,88 +31,17 @@ use prometheus_endpoint::{ CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64 }; use sc_client_api::CloneableSpawn; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; mod prometheus_future; /// Type alias for service task executor (usually runtime). pub type ServiceTaskExecutor = Arc + Send>>) + Send + Sync>; -/// Type alias for the task scheduler. -pub type TaskScheduler = TracingUnboundedSender + Send>>>; - -/// Helper struct to setup background tasks execution for service. -pub struct TaskManagerBuilder { - /// A future that resolves when the service has exited, this is useful to - /// make sure any internally spawned futures stop when the service does. - on_exit: exit_future::Exit, - /// A signal that makes the exit future above resolve, fired on service drop. - signal: Option, - /// Sender for futures that must be spawned as background tasks. - to_spawn_tx: TaskScheduler, - /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: TracingUnboundedReceiver + Send>>>, - /// Prometheus metrics where to report the stats about tasks. - metrics: Option, -} - -impl TaskManagerBuilder { - /// New asynchronous task manager setup. - /// - /// If a Prometheus registry is passed, it will be used to report statistics about the - /// service tasks. - pub fn new(prometheus_registry: Option<&Registry>) -> Result { - let (signal, on_exit) = exit_future::signal(); - let (to_spawn_tx, to_spawn_rx) = tracing_unbounded("mpsc_task_manager"); - - let metrics = prometheus_registry.map(Metrics::register).transpose()?; - - Ok(Self { - on_exit, - signal: Some(signal), - to_spawn_tx, - to_spawn_rx, - metrics, - }) - } - - /// Get spawn handle. - /// - /// Tasks spawned through this handle will get scheduled once - /// service is up and running. - pub fn spawn_handle(&self) -> SpawnTaskHandle { - SpawnTaskHandle { - on_exit: self.on_exit.clone(), - sender: self.to_spawn_tx.clone(), - metrics: self.metrics.clone(), - } - } - - /// Convert into actual task manager from initial setup. - pub(crate) fn into_task_manager(self, executor: ServiceTaskExecutor) -> TaskManager { - let TaskManagerBuilder { - on_exit, - signal, - to_spawn_rx, - to_spawn_tx, - metrics, - } = self; - TaskManager { - on_exit, - signal, - to_spawn_tx, - to_spawn_rx, - executor, - metrics, - } - } -} - /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { - sender: TaskScheduler, on_exit: exit_future::Exit, + executor: ServiceTaskExecutor, metrics: Option, } @@ -152,9 +80,7 @@ impl SpawnTaskHandle { } }; - if self.sender.unbounded_send(Box::pin(future)).is_err() { - error!("Failed to send task to spawn over channel"); - } + (self.executor)(Box::pin(future)); } } @@ -188,11 +114,6 @@ pub struct TaskManager { on_exit: exit_future::Exit, /// A signal that makes the exit future above resolve, fired on service drop. signal: Option, - /// Sender for futures that must be spawned as background tasks. - to_spawn_tx: TaskScheduler, - /// Receiver for futures that must be spawned as background tasks. - /// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`. - to_spawn_rx: TracingUnboundedReceiver + Send>>>, /// How to spawn background tasks. executor: ServiceTaskExecutor, /// Prometheus metric where to report the polling times. @@ -200,6 +121,24 @@ pub struct TaskManager { } impl TaskManager { + /// If a Prometheus registry is passed, it will be used to report statistics about the + /// service tasks. + pub(super) fn new( + executor: ServiceTaskExecutor, + prometheus_registry: Option<&Registry> + ) -> Result { + let (signal, on_exit) = exit_future::signal(); + + let metrics = prometheus_registry.map(Metrics::register).transpose()?; + + Ok(Self { + on_exit, + signal: Some(signal), + executor, + metrics, + }) + } + /// Spawn background/async task, which will be aware on exit signal. /// /// See also the documentation of [`SpawnTaskHandler::spawn`]. @@ -210,18 +149,11 @@ impl TaskManager { pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), - sender: self.to_spawn_tx.clone(), + executor: self.executor.clone(), metrics: self.metrics.clone(), } } - /// Process background task receiver. - pub(super) fn process_receiver(&mut self, cx: &mut Context) { - while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut self.to_spawn_rx).poll_next(cx) { - (self.executor)(task_to_spawn); - } - } - /// Clone on exit signal. pub(super) fn on_exit(&self) -> exit_future::Exit { self.on_exit.clone()