Fix race conditions in the handling of notifications in events #2785

Merged
merged 5 commits into from
Sep 28, 2022
Changes from 3 commits
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
@@ -6,6 +6,10 @@

- Removed the `version` field of the struct returned by the `rpc_methods` function. This is technically a breaking change, but it has been introduced in a minor version bump because it is very insubstantial. ([#2756](https://github.com/paritytech/smoldot/pull/2756))

### Fixed

- Fix several panics related to cancelling the opening of incoming substreams. ([#2785](https://github.com/paritytech/smoldot/pull/2785))

## 0.6.34 - 2022-09-20

### Added
52 changes: 43 additions & 9 deletions src/libp2p/collection.rs
@@ -1271,18 +1271,32 @@ where
continue;
}

let substream_id = self
// The event might concern a substream that we have already accepted or
// refused. In that situation, either reinterpret the event as
// "NotificationsInClose" or discard it.
if let Some(substream_id) = self
.ingoing_notification_substreams_by_connection
.remove(&(connection_id, inner_substream_id))
.unwrap();
let _was_in = self.ingoing_notification_substreams.remove(&substream_id);
debug_assert!(_was_in.is_some());
{
let _was_in = self.ingoing_notification_substreams.remove(&substream_id);
debug_assert!(_was_in.is_some());

Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
}
} else {
// Substream was refused. As documented, we must confirm the reception of
// the event by sending back a rejection.
self.messages_to_connections.push_back((
connection_id,
CoordinatorToConnectionInner::RejectInNotifications {
substream_id: inner_substream_id,
},
));
continue;
}
}
ConnectionToCoordinatorInner::NotificationIn {
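
The coordinator-side branch in the hunk above can be modelled in isolation. The following is a minimal sketch, not smoldot's actual code: `Coordinator`, `ToConnection`, `Event`, the integer ID aliases, the field `ingoing_by_connection` and the method `on_open_cancel` are hypothetical stand-ins. Only the two-way decision is taken from the diff: a substream that is still tracked yields a close event, and an untracked one gets a rejection sent back as acknowledgment.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical stand-ins for smoldot's real ID types.
type ConnectionId = u32;
type InnerSubstreamId = u32;
type SubstreamId = u64;

/// Simplified coordinator-to-connection message (assumption: only the variant used here).
#[derive(Debug, PartialEq)]
enum ToConnection {
    RejectInNotifications { substream_id: InnerSubstreamId },
}

/// Simplified event reported to the API user.
#[derive(Debug, PartialEq)]
enum Event {
    NotificationsInClose { substream_id: SubstreamId },
}

#[derive(Default)]
struct Coordinator {
    /// Inbound substreams the coordinator still tracks, keyed by connection and inner ID.
    ingoing_by_connection: HashMap<(ConnectionId, InnerSubstreamId), SubstreamId>,
    /// Messages waiting to be delivered to connection tasks.
    messages_to_connections: VecDeque<(ConnectionId, ToConnection)>,
}

impl Coordinator {
    /// Handles a cancelled inbound substream opening reported by a connection.
    fn on_open_cancel(
        &mut self,
        connection_id: ConnectionId,
        inner_substream_id: InnerSubstreamId,
    ) -> Option<Event> {
        if let Some(substream_id) = self
            .ingoing_by_connection
            .remove(&(connection_id, inner_substream_id))
        {
            // Still tracked: report the cancellation as a close.
            Some(Event::NotificationsInClose { substream_id })
        } else {
            // No longer tracked (for example already refused): acknowledge with a rejection.
            self.messages_to_connections.push_back((
                connection_id,
                ToConnection::RejectInNotifications {
                    substream_id: inner_substream_id,
                },
            ));
            None
        }
    }
}
```

Using `if let` on the result of `remove` instead of `unwrap()` is what turns the former panic into an ordinary branch in this model.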
@@ -1507,6 +1521,26 @@ enum ConnectionToCoordinatorInner {
handshake: Vec<u8>,
},
/// See the corresponding event in [`established::Event`].
///
/// The coordinator should be aware that, due to the asynchronous nature of communications, it
/// might receive this event after having sent a
/// [`CoordinatorToConnectionInner::AcceptInNotifications`] or
/// [`CoordinatorToConnectionInner::RejectInNotifications`]. In that situation, the coordinator
/// should either reinterpret the message as a `NotificationsInClose` (if it had accepted it)
/// or ignore it (if it had rejected it).
///
/// The connection should be aware that, due to the asynchronous nature of communications, it
/// might later receive a [`CoordinatorToConnectionInner::AcceptInNotifications`] or
/// [`CoordinatorToConnectionInner::RejectInNotifications`] concerning this substream. In that
/// situation, the connection should ignore this message.
///
/// Because substream IDs can be reused, this introduces an ambiguity in the following sequence
/// of events: send `NotificationsInOpen`, send `NotificationsInOpenCancel`, send
/// `NotificationsInOpen`, receive `AcceptInNotifications`. Does the `AcceptInNotifications`
/// refer to the first `NotificationsInOpen` or to the second?
/// In order to solve this problem, the coordinator must always send back a
/// [`CoordinatorToConnectionInner::RejectInNotifications`] in order to acknowledge a
/// `NotificationsInOpenCancel`.
NotificationsInOpenCancel {
id: established::SubstreamId,
},
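
The ambiguous sequence described in the doc comment above can be traced with a small model of the connection side. This is a hedged sketch under stated assumptions: `ConnectionTask`, `Outcome`, `on_open_cancel_emitted`, `on_accept_or_reject` and `replay_ambiguous_sequence` are hypothetical names, and substream IDs are plain integers. It follows the idea behind the `notifications_in_open_cancel_acknowledgments` queue: each emitted `NotificationsInOpenCancel` records one pending acknowledgment, and the next accept or reject for that ID is consumed as that acknowledgment instead of being applied.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for `established::SubstreamId`.
type SubstreamId = u32;

/// What the connection does with an accept/reject received from the coordinator.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The message acknowledged an earlier cancel and was dropped.
    Dismissed,
    /// The message concerns a live opening request and is applied.
    Applied,
}

#[derive(Default)]
struct ConnectionTask {
    /// One entry per cancel that the coordinator has not acknowledged yet.
    cancel_acknowledgments: VecDeque<SubstreamId>,
}

impl ConnectionTask {
    /// Called when the connection emits a cancel for `id`.
    fn on_open_cancel_emitted(&mut self, id: SubstreamId) {
        self.cancel_acknowledgments.push_back(id);
    }

    /// Called when an accept or a reject for `id` arrives from the coordinator.
    fn on_accept_or_reject(&mut self, id: SubstreamId) -> Outcome {
        if let Some(idx) = self.cancel_acknowledgments.iter().position(|s| *s == id) {
            // Consume exactly one pending acknowledgment for this ID.
            let _ = self.cancel_acknowledgments.remove(idx);
            Outcome::Dismissed
        } else {
            Outcome::Applied
        }
    }
}

/// Replays: open(5), cancel(5), open(5) again, then two coordinator replies.
fn replay_ambiguous_sequence() -> (Outcome, Outcome) {
    let mut task = ConnectionTask::default();
    task.on_open_cancel_emitted(5); // the first opening of ID 5 was cancelled
    // ID 5 is then reused by a second opening request; nothing is recorded for it.
    let first_reply = task.on_accept_or_reject(5); // acknowledges the cancel
    let second_reply = task.on_accept_or_reject(5); // targets the second opening
    (first_reply, second_reply)
}
```

In this model the first reply for ID 5 is dismissed and the second is applied, which resolves the ambiguity only as long as the coordinator acknowledges every cancel.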
44 changes: 37 additions & 7 deletions src/libp2p/collection/multi_stream.rs
@@ -25,7 +25,7 @@ use super::{
NotificationsOutErr, OverlayNetwork, PeerId, ShutdownCause, SubstreamId,
};

use alloc::{string::ToString as _, sync::Arc};
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
use core::{
hash::Hash,
iter,
@@ -73,6 +73,11 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
// TODO: could be user datas in established?
outbound_substreams_reverse:
hashbrown::HashMap<established::SubstreamId, SubstreamId, fnv::FnvBuildHasher>,

/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] is emitted, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
@@ -198,6 +203,7 @@ where
outbound_substreams_map,
outbound_substreams_reverse,
handshake_finished_message_to_send,
notifications_in_open_cancel_acknowledgments,
..
} => {
if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
@@ -249,6 +255,7 @@ where
handshake,
}),
Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
notifications_in_open_cancel_acknowledgments.push_back(id);
Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
}
Some(established::Event::NotificationIn { id, notification }) => {
@@ -443,17 +450,37 @@ where
substream_id,
handshake,
},
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.accept_in_notifications_substream(substream_id, handshake, ());
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.accept_in_notifications_substream(substream_id, handshake, ());
}
}
(
CoordinatorToConnectionInner::RejectInNotifications { substream_id },
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.reject_in_notifications_substream(substream_id);
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.reject_in_notifications_substream(substream_id);
}
}
(
CoordinatorToConnectionInner::StartShutdown { .. },
@@ -778,6 +805,9 @@ where
),
outbound_substreams_reverse:
hashbrown::HashMap::with_capacity_and_hasher(0, Default::default()),
notifications_in_open_cancel_acknowledgments: VecDeque::with_capacity(
4,
),
};

!handshake_substream_still_open
45 changes: 39 additions & 6 deletions src/libp2p/collection/single_stream.rs
@@ -92,6 +92,11 @@ enum SingleStreamConnectionTaskInner<TNow> {
// TODO: could be user datas in established?
outbound_substreams_reverse:
hashbrown::HashMap<established::SubstreamId, SubstreamId, fnv::FnvBuildHasher>,

/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] is emitted, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
@@ -236,6 +241,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
let inner_substream_id =
@@ -257,6 +263,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
let inner_substream_id = established.open_notifications_substream(
@@ -279,6 +286,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
// It is possible that the remote has closed the outbound notification substream
@@ -329,17 +337,37 @@ where
substream_id,
handshake,
},
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.accept_in_notifications_substream(substream_id, handshake, ());
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.accept_in_notifications_substream(substream_id, handshake, ());
}
}
(
CoordinatorToConnectionInner::RejectInNotifications { substream_id },
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.reject_in_notifications_substream(substream_id);
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.reject_in_notifications_substream(substream_id);
}
}
(
CoordinatorToConnectionInner::StartShutdown { .. },
@@ -482,6 +510,7 @@ where
established,
mut outbound_substreams_map,
mut outbound_substreams_reverse,
mut notifications_in_open_cancel_acknowledgments,
} => match established.read_write(read_write) {
Ok((connection, event)) => {
if read_write.is_dead() && event.is_none() {
@@ -555,6 +584,7 @@ where
);
}
Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
notifications_in_open_cancel_acknowledgments.push_back(id);
self.pending_messages.push_back(
ConnectionToCoordinatorInner::NotificationsInOpenCancel { id },
);
@@ -618,6 +648,7 @@ where
established: connection,
outbound_substreams_map,
outbound_substreams_reverse,
notifications_in_open_cancel_acknowledgments,
};
}
Err(err) => {
@@ -748,6 +779,8 @@ where
0,
Default::default(),
), // TODO: capacity?
notifications_in_open_cancel_acknowledgments:
VecDeque::with_capacity(4),
};
break;
}