Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use only one networking service for all chains #1398

Merged
merged 11 commits into from
Nov 24, 2023
33 changes: 33 additions & 0 deletions lib/src/network/basic_peering_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,39 @@ where
}
}

/// Removes all the chain assignments for the given chain.
///
/// If a peer isn't assigned to any chain anymore and doesn't have any connected address,
/// all of its addresses are also removed from the collection.
pub fn remove_chain_peers(&mut self, chain: &TChainId) {
let Some(chain_index) = self.chains_indices.remove(chain) else {
// Chain didn't exist.
return;
};
self.chains.remove(chain_index);

let chain_peers = {
let mut in_chain_and_after_chain = self.peers_chains_by_state.split_off(&(
chain_index,
PeerChainState::Assignable,
usize::min_value(),
));
let mut after_chain = in_chain_and_after_chain.split_off(&(
chain_index + 1,
PeerChainState::Assignable,
usize::min_value(),
));
self.peers_chains_by_state.append(&mut after_chain);
in_chain_and_after_chain
};

for (_, _, peer_id_index) in chain_peers {
let _was_in = self.peers_chains.remove(&(peer_id_index, chain_index));
debug_assert!(_was_in.is_some());
self.try_clean_up_peer_id(peer_id_index);
}
}

/// Inserts a chain-peer combination to the collection, indicating that the given peer belongs
/// to the given chain.
///
Expand Down
5 changes: 4 additions & 1 deletion lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,10 @@ where
SubstreamId::max_value(),
),
)
.find(|(_, _, direction, _, _)| matches!(*direction, SubstreamDirection::Out))
.find(|(_, _, direction, state, _)| {
matches!(*direction, SubstreamDirection::Out)
&& matches!(*state, NotificationsSubstreamState::Open)
})
.is_some()
{
return Err(RemoveChainError::InUse);
Expand Down
5 changes: 2 additions & 3 deletions light-base/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ pub struct DatabaseContentRuntimeCodeHint {
/// The returned string is guaranteed to not exceed `max_size` bytes. A truncated or invalid
/// database is intentionally returned if `max_size` is too low to fit all the information.
pub async fn encode_database<TPlat: platform::PlatformRef>(
network_service: &network_service::NetworkService<TPlat>,
network_service_chain_id: network_service::ChainId,
network_service: &network_service::NetworkServiceChain<TPlat>,
sync_service: &sync_service::SyncService<TPlat>,
runtime_service: &runtime_service::RuntimeService<TPlat>,
genesis_block_hash: &[u8; 32],
Expand All @@ -102,7 +101,7 @@ pub async fn encode_database<TPlat: platform::PlatformRef>(
serde_json::from_str(&encoded).unwrap()
}),
nodes: network_service
.discovered_nodes(network_service_chain_id)
.discovered_nodes()
.await
.map(|(peer_id, addrs)| {
(
Expand Down
5 changes: 1 addition & 4 deletions light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ pub struct StartConfig<'a, TPlat: PlatformRef> {

/// Access to the network, and identifier of the chain from the point of view of the network
/// service.
pub network_service: (
Arc<network_service::NetworkService<TPlat>>,
network_service::ChainId,
),
pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,

/// Service responsible for synchronizing the chain.
pub sync_service: Arc<sync_service::SyncService<TPlat>>,
Expand Down
13 changes: 3 additions & 10 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ struct Background<TPlat: PlatformRef> {
system_version: String,

/// See [`StartConfig::network_service`].
network_service: (
Arc<network_service::NetworkService<TPlat>>,
network_service::ChainId,
),
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,

/// See [`StartConfig::sync_service`].
sync_service: Arc<sync_service::SyncService<TPlat>>,
/// See [`StartConfig::runtime_service`].
Expand Down Expand Up @@ -680,12 +678,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
match PeerId::from_bytes(peer_id_bytes) {
Ok(peer_id) => {
self.network_service
.0
.discover(
self.network_service.1,
iter::once((peer_id, iter::once(addr))),
false,
)
.discover(iter::once((peer_id, iter::once(addr))), false)
.await;
request.respond(methods::Response::sudo_unstable_p2pDiscover(()));
}
Expand Down
3 changes: 1 addition & 2 deletions light-base/src/json_rpc_service/background/getters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
};

let response = crate::database::encode_database(
&self.network_service.0,
self.network_service.1,
&self.network_service,
&self.sync_service,
&self.runtime_service,
&self.genesis_block_hash,
Expand Down
146 changes: 69 additions & 77 deletions light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ pub struct Client<TPlat: platform::PlatformRef, TChain = ()> {
/// Because we use a `SipHasher`, this hashmap isn't created in the `new` function (as this
/// function is `const`) but lazily the first time it is needed.
chains_by_key: Option<HashMap<ChainKey, RunningChain<TPlat>, util::SipHasherBuild>>,

/// All chains share a single networking service created lazily the first time that it
/// is used.
network_service: Option<Arc<network_service::NetworkService<TPlat>>>,
}

struct PublicApiChain<TChain> {
Expand Down Expand Up @@ -278,8 +282,7 @@ struct RunningChain<TPlat: platform::PlatformRef> {
}

struct ChainServices<TPlat: platform::PlatformRef> {
network_service: Arc<network_service::NetworkService<TPlat>>,
network_service_chain_id: network_service::ChainId,
network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
sync_service: Arc<sync_service::SyncService<TPlat>>,
runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
transactions_service: Arc<transactions_service::TransactionsService<TPlat>>,
Expand All @@ -289,7 +292,6 @@ impl<TPlat: platform::PlatformRef> Clone for ChainServices<TPlat> {
fn clone(&self) -> Self {
ChainServices {
network_service: self.network_service.clone(),
network_service_chain_id: self.network_service_chain_id,
sync_service: self.sync_service.clone(),
runtime_service: self.runtime_service.clone(),
transactions_service: self.transactions_service.clone(),
Expand Down Expand Up @@ -355,6 +357,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
platform,
public_api_chains: slab::Slab::new(),
chains_by_key: None,
network_service: None,
}
}

Expand Down Expand Up @@ -722,6 +725,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
start_services(
log_name.clone(),
&self.platform,
&mut self.network_service,
runtime_code_hint,
genesis_block_header,
usize::from(chain_spec.block_number_bytes()),
Expand Down Expand Up @@ -861,14 +865,9 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
self.platform
.spawn_task("network-service-add-initial-topology".into(), {
let network_service = services.network_service.clone();
let network_service_chain_id = services.network_service_chain_id;
async move {
network_service
.discover(network_service_chain_id, known_nodes, false)
.await;
network_service
.discover(network_service_chain_id, bootstrap_nodes, true)
.await;
network_service.discover(known_nodes, false).await;
network_service.discover(bootstrap_nodes, true).await;
}
.boxed()
});
Expand Down Expand Up @@ -897,10 +896,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
service_starter.start(json_rpc_service::StartConfig {
platform: self.platform.clone(),
sync_service: services.sync_service.clone(),
network_service: (
services.network_service.clone(),
services.network_service_chain_id,
),
network_service: services.network_service.clone(),
transactions_service: services.transactions_service.clone(),
runtime_service: services.runtime_service.clone(),
chain_spec: &chain_spec,
Expand Down Expand Up @@ -1085,77 +1081,74 @@ enum StartServicesChainTy<'a, TPlat: platform::PlatformRef> {
fn start_services<TPlat: platform::PlatformRef>(
log_name: String,
platform: &TPlat,
network_service: &mut Option<Arc<network_service::NetworkService<TPlat>>>,
runtime_code_hint: Option<database::DatabaseContentRuntimeCodeHint>,
genesis_block_scale_encoded_header: Vec<u8>,
block_number_bytes: usize,
fork_id: Option<String>,
config: StartServicesChainTy<'_, TPlat>,
network_identify_agent_version: String,
) -> ChainServices<TPlat> {
// The network service is responsible for connecting to the peer-to-peer network.
let (network_service, network_service_chain_ids) =
let network_service = network_service.get_or_insert_with(|| {
network_service::NetworkService::new(network_service::Config {
platform: platform.clone(),
identify_agent_version: network_identify_agent_version,
connections_open_pool_size: 5,
connections_open_pool_restore_delay: Duration::from_secs(1),
chains: vec![network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
grandpa_protocol_finalized_block_height: if let StartServicesChainTy::RelayChain {
chain_information,
} = &config
{
if matches!(
chain_information.as_ref().finality,
chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
) {
Some(chain_information.as_ref().finalized_block_header.number)
} else {
None
}
chains_capacity: 1,
})
});

let network_service_chain = network_service.add_chain(network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
grandpa_protocol_finalized_block_height: if let StartServicesChainTy::RelayChain {
chain_information,
} = &config
{
if matches!(
chain_information.as_ref().finality,
chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
) {
Some(chain_information.as_ref().finalized_block_header.number)
} else {
None
}
} else {
// Parachains never use GrandPa.
None
},
genesis_block_hash: header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
best_block: match &config {
StartServicesChainTy::RelayChain { chain_information } => (
chain_information.as_ref().finalized_block_header.number,
chain_information
.as_ref()
.finalized_block_header
.hash(block_number_bytes),
),
StartServicesChainTy::Parachain {
finalized_block_header,
..
} => {
if let Ok(decoded) = header::decode(finalized_block_header, block_number_bytes) {
(
decoded.number,
header::hash_from_scale_encoded_header(finalized_block_header),
)
} else {
// Parachains never use GrandPa.
None
},
genesis_block_hash: header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
best_block: match &config {
StartServicesChainTy::RelayChain { chain_information } => (
chain_information.as_ref().finalized_block_header.number,
chain_information
.as_ref()
.finalized_block_header
.hash(block_number_bytes),
),
StartServicesChainTy::Parachain {
finalized_block_header,
..
} => {
if let Ok(decoded) =
header::decode(finalized_block_header, block_number_bytes)
{
(
decoded.number,
header::hash_from_scale_encoded_header(finalized_block_header),
)
} else {
(
0,
header::hash_from_scale_encoded_header(
&genesis_block_scale_encoded_header,
),
)
}
}
},
fork_id,
block_number_bytes,
}],
});

let network_service_chain_id = network_service_chain_ids.into_iter().next().unwrap();
(
0,
header::hash_from_scale_encoded_header(&genesis_block_scale_encoded_header),
)
}
}
},
fork_id,
block_number_bytes,
});

let (sync_service, runtime_service) = match config {
StartServicesChainTy::Parachain {
Expand All @@ -1173,7 +1166,7 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
log_name: log_name.clone(),
block_number_bytes,
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
chain_type: sync_service::ConfigChainType::Parachain(
sync_service::ConfigParachain {
finalized_block_header,
Expand Down Expand Up @@ -1209,7 +1202,7 @@ fn start_services<TPlat: platform::PlatformRef>(
log_name: log_name.clone(),
block_number_bytes,
platform: platform.clone(),
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
chain_type: sync_service::ConfigChainType::RelayChain(
sync_service::ConfigRelayChain {
chain_information: chain_information.clone(),
Expand Down Expand Up @@ -1249,16 +1242,15 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
sync_service: sync_service.clone(),
runtime_service: runtime_service.clone(),
network_service: (network_service.clone(), network_service_chain_id),
network_service: network_service_chain.clone(),
max_pending_transactions: NonZeroU32::new(64).unwrap(),
max_concurrent_downloads: NonZeroU32::new(3).unwrap(),
max_concurrent_validations: NonZeroU32::new(2).unwrap(),
},
));

ChainServices {
network_service,
network_service_chain_id,
network_service: network_service_chain,
runtime_service,
sync_service,
transactions_service,
Expand Down
Loading
Loading