Skip to content

Commit

Permalink
Return an error if a request exceeds the maximum allowed size, instea…
Browse files Browse the repository at this point in the history
…d of panicking (#318)

* Move request size maximum check in collection.rs

* Propagate the maximum request size error through API layers

* CHANGELOG

* Typo and CHANGELOG change
  • Loading branch information
tomaka authored Mar 20, 2023
1 parent f41897d commit 124f57e
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 67 deletions.
33 changes: 26 additions & 7 deletions lib/src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ where
protocol_index: usize,
request_data: impl Into<Vec<u8>>,
timeout: TNow,
) -> SubstreamId {
) -> Result<SubstreamId, StartRequestError> {
let connection = match self.connections.get(&target) {
Some(c) => c,
None => panic!(),
Expand All @@ -607,10 +607,21 @@ where
InnerConnectionState::Established
));

assert!(self
.request_response_protocols
.get(protocol_index)
.is_some());
let request_data = request_data.into();

// We check the size limit before sending the message. This size limit is the only reason
// why start a request can fail. By checking it here, we are guaranteed that the request
// start will succeed in the background.
if request_data.len()
> self
.request_response_protocols
.get(protocol_index)
.unwrap_or_else(|| panic!())
.inbound_config
.max_size()
{
return Err(StartRequestError::RequestTooLarge);
}

let substream_id = self.next_substream_id;
self.next_substream_id.0 += 1;
Expand All @@ -622,13 +633,13 @@ where
target,
CoordinatorToConnectionInner::StartRequest {
protocol_index,
request_data: request_data.into(),
request_data,
timeout,
substream_id,
},
));

substream_id
Ok(substream_id)
}

/// Start opening a notifications substream.
Expand Down Expand Up @@ -1493,6 +1504,13 @@ impl<TConn, TNow> ops::IndexMut<ConnectionId> for Network<TConn, TNow> {
}
}

/// Error potentially returned when starting a request.
#[derive(Debug, Clone, derive_more::Display)]
pub enum StartRequestError {
/// Size of the request is over maximum allowed by the protocol.
RequestTooLarge,
}

/// See [`Network::connection_state`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConnectionState {
Expand Down Expand Up @@ -1619,6 +1637,7 @@ enum CoordinatorToConnectionInner<TNow> {

StartRequest {
protocol_index: usize,
/// The size of the data is guaranteed to fit in the maximum allowed.
request_data: Vec<u8>,
timeout: TNow,
/// Id of the substream assigned by the coordinator.
Expand Down
12 changes: 10 additions & 2 deletions lib/src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,16 @@ where
..
},
) => {
let inner_substream_id =
established.add_request(protocol_index, request_data, timeout, substream_id);
let inner_substream_id = match established.add_request(
protocol_index,
request_data,
timeout,
substream_id,
) {
Ok(s) => s,
// The maximum request size is checked before sending the message.
Err(established::AddRequestError::RequestTooLarge) => unreachable!(),
};
let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
debug_assert!(_prev_value.is_none());
let _prev_value =
Expand Down
12 changes: 10 additions & 2 deletions lib/src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,16 @@ where
..
},
) => {
let inner_substream_id =
established.add_request(protocol_index, request_data, timeout, substream_id);
let inner_substream_id = match established.add_request(
protocol_index,
request_data,
timeout,
substream_id,
) {
Ok(s) => s,
// The maximum request size is checked before sending the message.
Err(established::AddRequestError::RequestTooLarge) => unreachable!(),
};
let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
debug_assert!(_prev_value.is_none());
let _prev_value =
Expand Down
19 changes: 19 additions & 0 deletions lib/src/libp2p/connection/established.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,18 @@ pub enum ConfigRequestResponseIn {
},
}

impl ConfigRequestResponseIn {
/// Returns the maximum allowed size of a request.
///
/// Returns `0` for [`ConfigRequestResponseIn::Empty`].
pub fn max_size(&self) -> usize {
match self {
ConfigRequestResponseIn::Empty => 0,
ConfigRequestResponseIn::Payload { max_size } => *max_size,
}
}
}

/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ConfigNotifications {
Expand All @@ -254,3 +266,10 @@ pub struct ConfigNotifications {
/// Maximum size, in bytes, of a notification that can be received.
pub max_notification_size: usize,
}

/// Error potentially returned when starting a request.
#[derive(Debug, Clone, derive_more::Display)]
pub enum AddRequestError {
/// Size of the request is over maximum allowed by the protocol.
RequestTooLarge,
}
16 changes: 9 additions & 7 deletions lib/src/libp2p/connection/established/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// TODO: needs docs

use super::{
super::super::read_write::ReadWrite, substream, Config, ConfigNotifications,
super::super::read_write::ReadWrite, substream, AddRequestError, Config, ConfigNotifications,
ConfigRequestResponse, ConfigRequestResponseIn, Event, SubstreamId, SubstreamIdInner,
};
use crate::util::{self, protobuf};
Expand Down Expand Up @@ -700,16 +700,18 @@ where
request: Vec<u8>,
timeout: TNow,
user_data: TRqUd,
) -> SubstreamId {
) -> Result<SubstreamId, AddRequestError> {
let has_length_prefix = match self.request_protocols[protocol_index].inbound_config {
ConfigRequestResponseIn::Payload { max_size } => {
// TODO: turn this assert into something that can't panic?
assert!(request.len() <= max_size);
if request.len() > max_size {
return Err(AddRequestError::RequestTooLarge);
}
true
}
ConfigRequestResponseIn::Empty => {
// TODO: turn this assert into something that can't panic?
assert!(request.is_empty());
if !request.is_empty() {
return Err(AddRequestError::RequestTooLarge);
}
false
}
};
Expand Down Expand Up @@ -738,7 +740,7 @@ where

// TODO: ? do this? substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size

SubstreamId(SubstreamIdInner::MultiStream(substream_id))
Ok(SubstreamId(SubstreamIdInner::MultiStream(substream_id)))
}

/// Returns the user data associated to a notifications substream.
Expand Down
18 changes: 10 additions & 8 deletions lib/src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
use super::{
super::{super::read_write::ReadWrite, noise, yamux},
substream::{self, RespondInRequestError},
Config, ConfigNotifications, ConfigRequestResponse, ConfigRequestResponseIn, Event,
SubstreamId, SubstreamIdInner,
AddRequestError, Config, ConfigNotifications, ConfigRequestResponse, ConfigRequestResponseIn,
Event, SubstreamId, SubstreamIdInner,
};

use alloc::{boxed::Box, string::String, vec, vec::Vec};
Expand Down Expand Up @@ -794,16 +794,18 @@ where
request: Vec<u8>,
timeout: TNow,
user_data: TRqUd,
) -> SubstreamId {
) -> Result<SubstreamId, AddRequestError> {
let has_length_prefix = match self.inner.request_protocols[protocol_index].inbound_config {
ConfigRequestResponseIn::Payload { max_size } => {
// TODO: turn this assert into something that can't panic?
assert!(request.len() <= max_size);
if request.len() > max_size {
return Err(AddRequestError::RequestTooLarge);
}
true
}
ConfigRequestResponseIn::Empty => {
// TODO: turn this assert into something that can't panic?
assert!(request.is_empty());
if !request.is_empty() {
return Err(AddRequestError::RequestTooLarge);
}
false
}
};
Expand All @@ -830,7 +832,7 @@ where
.saturating_add(64),
);

SubstreamId(SubstreamIdInner::SingleStream(substream.id()))
Ok(SubstreamId(SubstreamIdInner::SingleStream(substream.id())))
}

/// Returns the user data associated to a notifications substream.
Expand Down
32 changes: 16 additions & 16 deletions lib/src/libp2p/connection/established/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ fn successful_request() {

let mut connections = perform_handshake(256, 256, config.clone(), config);

let substream_id =
connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ());
let substream_id = connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ())
.unwrap();

let (connections_update, event) = connections.run_until_event();
connections = connections_update;
Expand Down Expand Up @@ -336,10 +336,10 @@ fn refused_request() {

let mut connections = perform_handshake(256, 256, config.clone(), config);

let substream_id =
connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ());
let substream_id = connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ())
.unwrap();

let (connections_update, event) = connections.run_until_event();
connections = connections_update;
Expand Down Expand Up @@ -393,10 +393,10 @@ fn request_protocol_not_supported() {

let mut connections = perform_handshake(256, 256, alice_config, bob_config);

let substream_id =
connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ());
let substream_id = connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ())
.unwrap();

let (_, event) = connections.run_until_event();
match event {
Expand Down Expand Up @@ -429,10 +429,10 @@ fn request_timeout() {

let mut connections = perform_handshake(256, 256, config.clone(), config);

let substream_id =
connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ());
let substream_id = connections
.alice
.add_request(0, b"request payload".to_vec(), Duration::from_secs(5), ())
.unwrap();

let (connections_update, event) = connections.run_until_event();
connections = connections_update;
Expand Down
9 changes: 5 additions & 4 deletions lib/src/libp2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ pub use collection::{
ConfigRequestResponse, ConfigRequestResponseIn, ConnectionId, ConnectionToCoordinator,
CoordinatorToConnection, MultiStreamConnectionTask, MultiStreamHandshakeKind,
NotificationProtocolConfig, NotificationsInClosedErr, NotificationsOutErr, ReadWrite,
RequestError, SingleStreamConnectionTask, SingleStreamHandshakeKind, SubstreamId,
RequestError, SingleStreamConnectionTask, SingleStreamHandshakeKind, StartRequestError,
SubstreamId,
};

/// Configuration for a [`Peers`].
Expand Down Expand Up @@ -1758,18 +1759,18 @@ where
protocol_index: usize,
request_data: Vec<u8>,
timeout: TNow,
) -> OutRequestId {
) -> Result<OutRequestId, StartRequestError> {
let target_connection_id = match self.connection_id_for_peer(target) {
Some(id) => id,
None => panic!(), // As documented.
};

OutRequestId(self.inner.start_request(
Ok(OutRequestId(self.inner.start_request(
target_connection_id,
protocol_index,
request_data,
timeout,
))
)?))
}

/// Returns `true` if if it possible to send requests (i.e. through [`Peers::start_request`])
Expand Down
2 changes: 1 addition & 1 deletion lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use crate::libp2p::{
peers::{
ConnectionId, ConnectionToCoordinator, CoordinatorToConnection, InRequestId, InboundError,
MultiStreamConnectionTask, MultiStreamHandshakeKind, OutRequestId,
SingleStreamConnectionTask, SingleStreamHandshakeKind,
SingleStreamConnectionTask, SingleStreamHandshakeKind, StartRequestError,
},
};

Expand Down
Loading

0 comments on commit 124f57e

Please sign in to comment.