Skip to content

Commit

Permalink
Fix race conditions in the handling of notifications in events (#2785)
Browse files Browse the repository at this point in the history
Fix #2740

The handling of `NotificationsInOpenCancel` events was incorrect because it
did not account for the fact that communications between the coordinator and
the connections are asynchronous.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Sep 28, 2022
1 parent 8bb31f9 commit b8b194a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 22 deletions.
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Fixed

- Fix several panics related to cancelling the opening of incoming substreams. ([#2785](https://github.com/paritytech/smoldot/pull/2785))
- Fix old runtimes not being cleaned up properly and runtimes being downloaded multiple times after an on-chain runtime upgrade. ([#2781](https://github.com/paritytech/smoldot/pull/2781))

## 0.6.34 - 2022-09-20
Expand Down
52 changes: 43 additions & 9 deletions src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,18 +1271,32 @@ where
continue;
}

let substream_id = self
// The event might concern a substream that we have already accepted or
// refused. In that situation, either reinterpret the event as
// "NotificationsInClose" or discard it.
if let Some(substream_id) = self
.ingoing_notification_substreams_by_connection
.remove(&(connection_id, inner_substream_id))
.unwrap();
let _was_in = self.ingoing_notification_substreams.remove(&substream_id);
debug_assert!(_was_in.is_some());
{
let _was_in = self.ingoing_notification_substreams.remove(&substream_id);
debug_assert!(_was_in.is_some());

Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
}
} else {
// Substream was refused. As documented, we must confirm the reception of
// the event by sending back a rejection.
self.messages_to_connections.push_back((
connection_id,
CoordinatorToConnectionInner::RejectInNotifications {
substream_id: inner_substream_id,
},
));
continue;
}
}
ConnectionToCoordinatorInner::NotificationIn {
Expand Down Expand Up @@ -1507,6 +1521,26 @@ enum ConnectionToCoordinatorInner {
handshake: Vec<u8>,
},
/// See the corresponding event in [`established::Event`].
///
/// The coordinator should be aware that, due to the asynchronous nature of communications, it
/// might receive this event after having sent a
/// [`CoordinatorToConnectionInner::AcceptInNotifications`] or
/// [`CoordinatorToConnectionInner::RejectInNotifications`]. In that situation, the coordinator
/// should either reinterpret the message as a `NotificationsInClose` (if it had accepted it)
/// or otherwise ignore it (if it had rejected it), apart from sending back the acknowledgment
/// described below.
///
/// The connection should be aware that, due to the asynchronous nature of communications, it
/// might later receive a [`CoordinatorToConnectionInner::AcceptInNotifications`] or
/// [`CoordinatorToConnectionInner::RejectInNotifications`] concerning this substream. In that
/// situation, the connection should ignore this message.
///
/// Because substream IDs can be reused, this introduces an ambiguity in the following sequence
/// of events: send `NotificationsInOpen`, send `NotificationsInOpenCancel`, send
/// `NotificationsInOpen`, receive `AcceptInNotifications`. Does the `AcceptInNotifications`
/// refer to the first `NotificationsInOpen` or to the second?
/// In order to solve this problem, the coordinator must always send back a
/// [`CoordinatorToConnectionInner::RejectInNotifications`] in order to acknowledge a
/// `NotificationsInOpenCancel`.
NotificationsInOpenCancel {
id: established::SubstreamId,
},
Expand Down
44 changes: 37 additions & 7 deletions src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
NotificationsOutErr, OverlayNetwork, PeerId, ShutdownCause, SubstreamId,
};

use alloc::{string::ToString as _, sync::Arc};
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
use core::{
hash::Hash,
iter,
Expand Down Expand Up @@ -85,6 +85,11 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
// TODO: could be user datas in established?
outbound_substreams_reverse:
hashbrown::HashMap<established::SubstreamId, SubstreamId, fnv::FnvBuildHasher>,

/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] is emitted, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
Expand Down Expand Up @@ -214,6 +219,7 @@ where
outbound_substreams_map,
outbound_substreams_reverse,
handshake_finished_message_to_send,
notifications_in_open_cancel_acknowledgments,
..
} => {
if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
Expand Down Expand Up @@ -265,6 +271,7 @@ where
handshake,
}),
Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
notifications_in_open_cancel_acknowledgments.push_back(id);
Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
}
Some(established::Event::NotificationIn { id, notification }) => {
Expand Down Expand Up @@ -459,17 +466,37 @@ where
substream_id,
handshake,
},
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.accept_in_notifications_substream(substream_id, handshake, ());
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.accept_in_notifications_substream(substream_id, handshake, ());
}
}
(
CoordinatorToConnectionInner::RejectInNotifications { substream_id },
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.reject_in_notifications_substream(substream_id);
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.reject_in_notifications_substream(substream_id);
}
}
(
CoordinatorToConnectionInner::StartShutdown { .. },
Expand Down Expand Up @@ -813,6 +840,9 @@ where
),
outbound_substreams_reverse:
hashbrown::HashMap::with_capacity_and_hasher(0, Default::default()),
notifications_in_open_cancel_acknowledgments: VecDeque::with_capacity(
4,
),
};

!handshake_substream_still_open
Expand Down
45 changes: 39 additions & 6 deletions src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ enum SingleStreamConnectionTaskInner<TNow> {
// TODO: could be user datas in established?
outbound_substreams_reverse:
hashbrown::HashMap<established::SubstreamId, SubstreamId, fnv::FnvBuildHasher>,

/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] is emitted, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
Expand Down Expand Up @@ -236,6 +241,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
let inner_substream_id =
Expand All @@ -257,6 +263,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
let inner_substream_id = established.open_notifications_substream(
Expand All @@ -279,6 +286,7 @@ where
established,
outbound_substreams_map,
outbound_substreams_reverse,
..
},
) => {
// It is possible that the remote has closed the outbound notification substream
Expand Down Expand Up @@ -329,17 +337,37 @@ where
substream_id,
handshake,
},
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.accept_in_notifications_substream(substream_id, handshake, ());
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.accept_in_notifications_substream(substream_id, handshake, ());
}
}
(
CoordinatorToConnectionInner::RejectInNotifications { substream_id },
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
..
},
) => {
// TODO: must verify that the substream is still valid
established.reject_in_notifications_substream(substream_id);
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
established.reject_in_notifications_substream(substream_id);
}
}
(
CoordinatorToConnectionInner::StartShutdown { .. },
Expand Down Expand Up @@ -482,6 +510,7 @@ where
established,
mut outbound_substreams_map,
mut outbound_substreams_reverse,
mut notifications_in_open_cancel_acknowledgments,
} => match established.read_write(read_write) {
Ok((connection, event)) => {
if read_write.is_dead() && event.is_none() {
Expand Down Expand Up @@ -555,6 +584,7 @@ where
);
}
Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
notifications_in_open_cancel_acknowledgments.push_back(id);
self.pending_messages.push_back(
ConnectionToCoordinatorInner::NotificationsInOpenCancel { id },
);
Expand Down Expand Up @@ -618,6 +648,7 @@ where
established: connection,
outbound_substreams_map,
outbound_substreams_reverse,
notifications_in_open_cancel_acknowledgments,
};
}
Err(err) => {
Expand Down Expand Up @@ -748,6 +779,8 @@ where
0,
Default::default(),
), // TODO: capacity?
notifications_in_open_cancel_acknowledgments:
VecDeque::with_capacity(4),
};
break;
}
Expand Down

0 comments on commit b8b194a

Please sign in to comment.