Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a limit to the number of inbound substreams #2724

Merged
merged 6 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/fuzz/fuzz_targets/network-connection-encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ libfuzzer_sys::fuzz_target!(|data: &[u8]| {
handshake::Handshake::Success { connection, .. } => connection
.into_connection::<_, (), ()>(Config {
first_out_ping: Duration::new(60, 0),
max_inbound_substreams: 10,
notifications_protocols: Vec::new(),
request_protocols: vec![ConfigRequestResponse {
inbound_allowed: true,
Expand Down
1 change: 1 addition & 0 deletions bin/fuzz/fuzz_targets/network-connection-raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ libfuzzer_sys::fuzz_target!(|data: &[u8]| {
smoldot::libp2p::collection::Network::new(smoldot::libp2p::collection::Config {
randomness_seed: [0; 32],
capacity: 0,
max_inbound_substreams: 10,
notification_protocols: Vec::new(),
request_response_protocols: Vec::new(),
// This timeout doesn't matter as we pass dummy time values.
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Add support for the `system_nodeRoles` JSON-RPC method. ([#2725](https://github.com/paritytech/smoldot/pull/2725))

### Changed

- A limit to the number of substreams a remote can maintain open over a connection is now enforced. ([#2724](https://github.com/paritytech/smoldot/pull/2724))

## 0.6.32 - 2022-09-07

### Fixed
Expand Down
12 changes: 12 additions & 0 deletions src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ pub struct Config {
/// Number of connections containers should initially allocate for.
pub capacity: usize,

/// Maximum number of substreams that each remote can have simultaneously opened.
///
/// > **Note**: This limit is necessary in order to avoid DoS attacks where a remote opens too
/// > many substreams.
pub max_inbound_substreams: usize,

pub notification_protocols: Vec<NotificationProtocolConfig>,

pub request_response_protocols: Vec<ConfigRequestResponse>,
Expand Down Expand Up @@ -217,6 +223,9 @@ pub struct Network<TConn, TNow> {
/// Generator for randomness seeds given to the established connections.
randomness_seeds: ChaCha20Rng,

/// See [`Config::max_inbound_substreams`].
max_inbound_substreams: usize,

/// See [`Config::handshake_timeout`].
handshake_timeout: Duration,

Expand Down Expand Up @@ -320,6 +329,7 @@ where
ingoing_notification_substreams_by_connection: BTreeMap::new(),
randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed),
noise_key: Arc::new(config.noise_key),
max_inbound_substreams: config.max_inbound_substreams,
notification_protocols,
request_response_protocols: config.request_response_protocols.into_iter().collect(), // TODO: stupid overhead
ping_protocol: config.ping_protocol.into(),
Expand Down Expand Up @@ -347,6 +357,7 @@ where
is_initiator,
when_connected + self.handshake_timeout,
self.noise_key.clone(),
self.max_inbound_substreams,
self.notification_protocols.clone(),
self.request_response_protocols.clone(),
self.ping_protocol.clone(),
Expand Down Expand Up @@ -382,6 +393,7 @@ where
let connection_task = MultiStreamConnectionTask::new(
self.randomness_seeds.gen(),
now,
self.max_inbound_substreams,
self.notification_protocols.clone(),
self.request_response_protocols.clone(),
self.ping_protocol.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ where
pub(super) fn new(
randomness_seed: [u8; 32],
now: TNow,
max_inbound_substreams: usize,
notification_protocols: Arc<[OverlayNetwork]>,
request_response_protocols: Arc<[ConfigRequestResponse]>,
ping_protocol: Arc<str>,
Expand All @@ -108,6 +109,7 @@ where
})
.collect(),
request_protocols: request_response_protocols.to_vec(), // TODO: overhead
max_inbound_substreams,
randomness_seed,
ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
ping_interval: Duration::from_secs(20), // TODO: hardcoded
Expand Down
8 changes: 8 additions & 0 deletions src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ enum SingleStreamConnectionTaskInner<TNow> {
/// See [`super::Config::noise_key`].
noise_key: Arc<NoiseKey>,

/// See [`super::Config::max_inbound_substreams`].
max_inbound_substreams: usize,

/// See [`OverlayNetwork`].
notification_protocols: Arc<[OverlayNetwork]>,

Expand Down Expand Up @@ -130,6 +133,7 @@ where
is_initiator: bool,
handshake_timeout: TNow,
noise_key: Arc<NoiseKey>,
max_inbound_substreams: usize,
notification_protocols: Arc<[OverlayNetwork]>,
request_response_protocols: Arc<[ConfigRequestResponse]>,
ping_protocol: Arc<str>,
Expand All @@ -140,6 +144,7 @@ where
randomness_seed,
timeout: handshake_timeout,
noise_key,
max_inbound_substreams,
notification_protocols,
request_response_protocols,
ping_protocol,
Expand Down Expand Up @@ -627,6 +632,7 @@ where
randomness_seed,
timeout,
noise_key,
max_inbound_substreams,
notification_protocols,
request_response_protocols,
ping_protocol,
Expand Down Expand Up @@ -690,6 +696,7 @@ where
randomness_seed,
timeout,
noise_key,
max_inbound_substreams,
notification_protocols,
request_response_protocols,
ping_protocol,
Expand Down Expand Up @@ -718,6 +725,7 @@ where
})
.collect(),
request_protocols: request_response_protocols.to_vec(), // TODO: overhead
max_inbound_substreams,
randomness_seed,
ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
ping_interval: Duration::from_secs(20), // TODO: hardcoded
Expand Down
2 changes: 2 additions & 0 deletions src/libp2p/connection/established.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ pub enum Event<TRqUd, TNotifUd> {
// TODO: this struct isn't zero-cost, but making it zero-cost is kind of hard and annoying
#[derive(Debug, Clone)]
pub struct Config<TNow> {
/// Maximum number of substreams that the remote can have simultaneously opened.
pub max_inbound_substreams: usize,
/// List of request-response protocols supported for incoming substreams.
pub request_protocols: Vec<ConfigRequestResponse>,
/// List of notifications protocols supported for incoming substreams.
Expand Down
4 changes: 4 additions & 0 deletions src/libp2p/connection/established/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct MultiStream<TNow, TSubId, TRqUd, TNotifUd> {
/// unnecessary code being included in the binary and reduces the binary size.
ping_payload_randomness: rand_chacha::ChaCha20Rng,

/// See [`Config::max_inbound_substreams`].
// TODO: not enforced at the moment
_max_inbound_substreams: usize,
/// See [`Config::request_protocols`].
request_protocols: Vec<ConfigRequestResponse>,
/// See [`Config::notifications_protocols`].
Expand Down Expand Up @@ -139,6 +142,7 @@ where
ping_substream: None,
next_ping: config.first_out_ping,
ping_payload_randomness: randomness,
_max_inbound_substreams: config.max_inbound_substreams,
request_protocols: config.request_protocols,
notifications_protocols: config.notifications_protocols,
ping_protocol: config.ping_protocol,
Expand Down
20 changes: 16 additions & 4 deletions src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ struct Inner<TNow, TRqUd, TNotifUd> {
/// unnecessary code being included in the binary and reduces the binary size.
ping_payload_randomness: rand_chacha::ChaCha20Rng,

/// See [`Config::max_inbound_substreams`].
max_inbound_substreams: usize,
/// See [`Config::request_protocols`].
request_protocols: Vec<ConfigRequestResponse>,
/// See [`Config::notifications_protocols`].
Expand Down Expand Up @@ -308,9 +310,20 @@ where
Some(yamux::IncomingDataDetail::IncomingSubstream) => {
debug_assert!(!self.inner.yamux.goaway_queued_or_sent());

self.encryption
.consume_inbound_data(yamux_decode.bytes_read);

// Receive a request from the remote for a new incoming substream.
// These requests are automatically accepted.
// TODO: add a limit to the number of substreams
// These requests are automatically accepted unless the total limit to the
// number of substreams has been reached.
// Note that `num_inbound()` counts substreams that have been closed but not
// yet removed from the state machine. This can affect the actual limit in a
// subtle way. At the time of writing of this comment the limit should be
// properly enforced, however it is not considered problematic if it weren't.
if self.inner.yamux.num_inbound() >= self.inner.max_inbound_substreams {
self.inner.yamux.reject_pending_substream();
continue;
}

let supported_protocols = self
.inner
Expand All @@ -332,8 +345,6 @@ where
.accept_pending_substream(Some(substream::Substream::ingoing(
supported_protocols,
)));
self.encryption
.consume_inbound_data(yamux_decode.bytes_read);
}

Some(
Expand Down Expand Up @@ -1059,6 +1070,7 @@ impl ConnectionPrototype {
outgoing_pings,
next_ping: config.first_out_ping,
ping_payload_randomness: randomness,
max_inbound_substreams: config.max_inbound_substreams,
request_protocols: config.request_protocols,
notifications_protocols: config.notifications_protocols,
ping_protocol: config.ping_protocol,
Expand Down
31 changes: 31 additions & 0 deletions src/libp2p/connection/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub struct Yamux<T> {
/// A `SipHasher` is used in order to avoid hash collision attacks on substream IDs.
substreams: hashbrown::HashMap<NonZeroU32, Substream<T>, SipHasherBuild>,

/// Number of substreams within [`Yamux::substreams`] whose [`Substream::inbound`] is `true`.
num_inbound: usize,

/// `Some` if a `GoAway` frame has been received in the past.
received_goaway: Option<GoAwayErrorCode>,

Expand Down Expand Up @@ -127,6 +130,8 @@ pub struct Yamux<T> {
struct Substream<T> {
/// State of the substream.
state: SubstreamState,
/// `true` if the substream has been opened by the remote.
inbound: bool,
/// Data chosen by the user.
user_data: T,
}
Expand Down Expand Up @@ -282,6 +287,7 @@ impl<T> Yamux<T> {
config.capacity,
SipHasherBuild::new(randomness.gen()),
),
num_inbound: 0,
received_goaway: None,
incoming: Incoming::Header(arrayvec::ArrayVec::new()),
outgoing: Outgoing::Idle,
Expand All @@ -307,6 +313,23 @@ impl<T> Yamux<T> {
self.substreams.is_empty()
}

/// Returns the number of substreams in the Yamux state machine. Includes substreams that are
/// dead but haven't been removed yet.
pub fn len(&self) -> usize {
self.substreams.len()
}

/// Returns the number of inbound substreams in the Yamux state machine. Includes substreams
/// that are dead but haven't been removed yet.
pub fn num_inbound(&self) -> usize {
debug_assert_eq!(
self.num_inbound,
self.substreams.values().filter(|s| s.inbound).count()
);

self.num_inbound
}

/// Opens a new substream.
///
/// This method only modifies the state of `self` and reserves an identifier. No message needs
Expand Down Expand Up @@ -380,6 +403,7 @@ impl<T> Yamux<T> {
write_buffers: Vec::with_capacity(16),
first_write_buffer_offset: 0,
},
inbound: false,
user_data,
});

Expand Down Expand Up @@ -542,6 +566,10 @@ impl<T> Yamux<T> {

let substream = self.substreams.remove(&id_to_remove).unwrap();

if substream.inbound {
self.num_inbound -= 1;
}

Some((
SubstreamId(id_to_remove),
if let SubstreamState::Reset = substream.state {
Expand Down Expand Up @@ -1118,11 +1146,14 @@ impl<T> Yamux<T> {
write_buffers: Vec::new(),
first_write_buffer_offset: 0,
},
inbound: true,
user_data,
},
);
debug_assert!(_was_before.is_none());

self.num_inbound += 1;

self.incoming = if data_frame_size == 0 {
Incoming::Header(arrayvec::ArrayVec::new())
} else {
Expand Down
11 changes: 11 additions & 0 deletions src/libp2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ pub struct Config {
/// Capacity to initially reserve to the list of peers.
pub peers_capacity: usize,

/// Maximum number of substreams that each remote can have simultaneously opened on each
/// connection.
///
/// If there exists multiple connections with the same remote, the limit is enforced for
/// each connection separately.
///
/// > **Note**: This limit is necessary in order to avoid DoS attacks where a remote opens too
/// > many substreams.
pub max_inbound_substreams: usize,

pub notification_protocols: Vec<NotificationProtocolConfig>,

pub request_response_protocols: Vec<ConfigRequestResponse>,
Expand Down Expand Up @@ -206,6 +216,7 @@ where
inner: collection::Network::new(collection::Config {
capacity: config.connections_capacity,
noise_key: config.noise_key,
max_inbound_substreams: config.max_inbound_substreams,
notification_protocols: config.notification_protocols,
request_response_protocols: config.request_response_protocols,
ping_protocol: config.ping_protocol,
Expand Down
12 changes: 12 additions & 0 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,22 @@ where
})
.collect::<Vec<_>>();

// Maximum number that each remote is allowed to open.
// Note that this maximum doesn't have to be precise. There only needs to be *a* limit
// that is not exaggerately large, and this limit shouldn't be too low as to cause
// legitimate substreams to be refused.
// According to the protocol, a remote can only open one substream of each protocol at
// a time. However, we multiply this value by 2 in order to be generous. We also add 1
// to account for the ping protocol.
let max_inbound_substreams = chains.len()
* (1 + REQUEST_RESPONSE_PROTOCOLS_PER_CHAIN + NOTIFICATIONS_PROTOCOLS_PER_CHAIN)
* 2;

ChainNetwork {
inner: peers::Peers::new(peers::Config {
connections_capacity: config.connections_capacity,
peers_capacity: config.peers_capacity,
max_inbound_substreams,
request_response_protocols,
noise_key: config.noise_key,
randomness_seed: randomness.sample(rand::distributions::Standard),
Expand Down