Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore rate limiting of connections opening #1340

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
extern crate alloc;

use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec, vec::Vec};
use core::{num::NonZeroU32, ops, pin};
use core::{num::NonZeroU32, ops, pin, time::Duration};
use futures_util::FutureExt as _;
use hashbrown::{hash_map::Entry, HashMap};
use itertools::Itertools as _;
Expand Down Expand Up @@ -1098,6 +1098,8 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
num_events_receivers: 1, // Configures the length of `network_event_receivers`
identify_agent_version: network_identify_agent_version,
connections_open_pool_size: 5,
connections_open_pool_restore_delay: Duration::from_secs(1),
chains: vec![network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
Expand Down
60 changes: 57 additions & 3 deletions light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ pub struct Config<TPlat> {

/// List of chains to connect to. Chains are later referred to by their index in this list.
pub chains: Vec<ConfigChain>,

/// Maximum number of connections that the service can open simultaneously. After this value
/// has been reached, a new connection can be opened after each
/// [`Config::connections_open_pool_restore_delay`].
pub connections_open_pool_size: u32,

/// Delay after which the service can open a new connection.
/// The delay is cumulative. If no connection has been opened for example for twice this
/// duration, then two connections can be opened at the same time, up to a maximum of
/// [`Config::connections_open_pool_size`].
pub connections_open_pool_restore_delay: Duration,
}

/// See [`Config::chains`].
Expand Down Expand Up @@ -251,6 +262,10 @@ impl<TPlat: PlatformRef> NetworkService<TPlat> {
},
),
network,
connections_open_pool_size: config.connections_open_pool_size,
connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
num_recent_connection_opening: 0,
next_recent_connection_restore: None,
platform: config.platform.clone(),
event_pending_send: None,
event_senders: either::Left(event_senders),
Expand Down Expand Up @@ -880,6 +895,20 @@ struct BackgroundTask<TPlat: PlatformRef> {
/// All known peers and their addresses.
peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,

/// See [`Config::connections_open_pool_size`].
connections_open_pool_size: u32,

/// See [`Config::connections_open_pool_restore_delay`].
connections_open_pool_restore_delay: Duration,

/// Every time a connection is opened, the value in this field is increased by one. After
/// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
/// one.
num_recent_connection_opening: u32,

/// Delay after which [`BackgroundTask::num_recent_connection_opening`] is decreased by one.
next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,

/// List of nodes that are considered as important for logging purposes.
// TODO: should also detect whenever we fail to open a block announces substream with any of these peers
important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
Expand Down Expand Up @@ -938,6 +967,7 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
Message(ToBackground),
NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
CanAssignSlot(PeerId, ChainId),
NextRecentConnectionRestore,
CanStartConnect(PeerId),
CanOpenGossip(PeerId, ChainId),
MessageToConnection {
Expand All @@ -959,7 +989,9 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
{
WakeUpReason::NetworkEvent(event)
} else if let Some(start_connect) = {
let x = task.network.unconnected_desired().next().cloned();
let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
.then(|| task.network.unconnected_desired().next().cloned())
.flatten();
x
} {
WakeUpReason::CanStartConnect(start_connect)
Expand Down Expand Up @@ -1021,6 +1053,23 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
}
}
};
let next_recent_connection_restore = async {
if task.num_recent_connection_opening != 0
&& task.next_recent_connection_restore.is_none()
{
task.next_recent_connection_restore = Some(Box::pin(
task.platform
.sleep(task.connections_open_pool_restore_delay),
));
}
if let Some(delay) = task.next_recent_connection_restore.as_mut() {
delay.await;
task.next_recent_connection_restore = None;
WakeUpReason::NextRecentConnectionRestore
} else {
future::pending().await
}
};
let finished_sending_event = async {
if let either::Right(event_sending_future) = &mut task.event_senders {
let event_senders = event_sending_future.await;
Expand All @@ -1035,6 +1084,7 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {

message_received
.or(service_event)
.or(next_recent_connection_restore)
.or(finished_sending_event)
.await
};
Expand Down Expand Up @@ -1853,9 +1903,11 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
service::GossipKind::ConsensusTransactions,
);
}
WakeUpReason::NextRecentConnectionRestore => {
task.num_recent_connection_opening =
task.num_recent_connection_opening.saturating_sub(1);
}
WakeUpReason::CanStartConnect(expected_peer_id) => {
// TODO: restore rate limiting

let Some(multiaddr) = task.peering_strategy.addr_to_connected(&expected_peer_id)
else {
// There is no address for that peer in the address book.
Expand Down Expand Up @@ -1921,6 +1973,8 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key()).into_peer_id(),
);

task.num_recent_connection_opening += 1;

let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
async_channel::bounded(8);
let task_name = format!("connection-{}", multiaddr);
Expand Down
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- Smoldot will now only try opening a maximum of five connections simultaneously, and afterwards at most one additional connection per second. This avoids possible situations where a server is accidentally hammered by smoldot, and avoids the traffic potentially looking suspicious to some ISPs. ([#1340](https://github.com/smol-dot/smoldot/pull/1340))

## 2.0.8 - 2023-11-15

### Changed
Expand Down
Loading