Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2618 #2620

Merged
merged 4 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Fix panic that occured when connecting to a peer, then discovering it through the background discovery process, then disconnecting from it. ([#2616](https://github.com/paritytech/smoldot/pull/2616))
- Fix circular dependency between JavaScript modules. ([#2614](https://github.com/paritytech/smoldot/pull/2614))
- Fix panic when a handshake timeout or protocol error happens on a connection at the same time as the local node tries to shut it down. ([#2620](https://github.com/paritytech/smoldot/pull/2620))

## 0.6.29 - 2022-08-09

Expand Down
75 changes: 48 additions & 27 deletions src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
/// message has been sent and is waiting to be acknowledged.
ShutdownWaitingAck {
/// If true, [`MultiStreamConnectionTask::reset`] has been called. This doesn't modify any
/// of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,

/// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
/// been sent to the coordinator. `Some` if the message hasn't been sent yet.
Expand All @@ -70,12 +69,19 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection has finished its shutdown and its shutdown has been acknowledged. There is
/// nothing more to do except stop the connection task.
ShutdownAcked {
/// If true, [`MultiStreamConnectionTask::reset`] has been called. This doesn't modify any
/// of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
/// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
Coordinator,
/// [`MultiStreamConnectionTask::reset`] has been called.
Api,
}

impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId>
where
TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
Expand Down Expand Up @@ -403,7 +409,7 @@ where
self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
start_shutdown_message_to_send: Some(None),
shutdown_finish_message_sent: false,
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator,
};
}
(
Expand All @@ -429,7 +435,7 @@ where
| (
CoordinatorToConnectionInner::StartShutdown,
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api,
..
},
) => {
Expand All @@ -443,19 +449,22 @@ where
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
start_shutdown_message_to_send: start_shutdown_message_sent,
shutdown_finish_message_sent,
was_api_reset: was_reset,
initiator,
},
) => {
debug_assert!(
start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
);
self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: *was_reset,
initiator: *initiator,
};
}
(
CoordinatorToConnectionInner::StartShutdown,
MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. }
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Coordinator,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked { .. },
) => unreachable!(),
(CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
Expand Down Expand Up @@ -546,23 +555,35 @@ where
/// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past.
///
pub fn reset(&mut self) {
// It is illegal to call `reset` a second time. Verify that the user didn't do this.
if let MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
} = self.connection
{
panic!()
match self.connection {
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
initiator: ShutdownInitiator::Api,
..
} => {
// It is illegal to call `reset` a second time.
panic!()
}
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
ref mut initiator, ..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
ref mut initiator, ..
} => {
// Mark the initiator as being the API in order to track proper API usage.
*initiator = ShutdownInitiator::Api;
}
_ => {
self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
shutdown_finish_message_sent: false,
start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
};
}
}

self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
shutdown_finish_message_sent: false,
start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
};
}

/// Immediately destroys the substream with the given identifier.
Expand Down
97 changes: 57 additions & 40 deletions src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,32 @@ enum SingleStreamConnectionTaskInner<TNow> {
/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
/// message has been sent and is waiting to be acknowledged.
ShutdownWaitingAck {
/// If true, [`SingleStreamConnectionTask::reset`] has been called. This doesn't modify
/// any of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},

/// Connection has finished its shutdown and its shutdown has been acknowledged. There is
/// nothing more to do except stop the connection task.
ShutdownAcked {
/// If true, [`SingleStreamConnectionTask::reset`] has been called. This doesn't modify
/// any of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},

/// Temporary state used to satisfy the borrow checker during state transitions.
Poisoned,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
/// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
Coordinator,
/// [`SingleStreamConnectionTask::reset`] has been called.
Api,
/// The shutdown has been initiated due to a protocol error or because the connection has been
/// shut down cleanly by the remote.
Remote,
}

impl<TNow> SingleStreamConnectionTask<TNow>
where
TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
Expand Down Expand Up @@ -332,7 +341,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator,
};
}
(
Expand All @@ -359,7 +368,7 @@ where
| (
CoordinatorToConnectionInner::StartShutdown,
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api | ShutdownInitiator::Remote,
},
) => {
// There might still be some messages coming from the coordinator after the
Expand All @@ -369,17 +378,18 @@ where
}
(
CoordinatorToConnectionInner::ShutdownFinishedAck,
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: was_reset,
},
SingleStreamConnectionTaskInner::ShutdownWaitingAck { initiator },
) => {
self.connection = SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: *was_reset,
initiator: *initiator,
};
}
(
CoordinatorToConnectionInner::StartShutdown,
SingleStreamConnectionTaskInner::ShutdownWaitingAck { .. }
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Coordinator,
..
}
| SingleStreamConnectionTaskInner::ShutdownAcked { .. },
) => unreachable!(),
(CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
Expand All @@ -404,26 +414,33 @@ where
/// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
///
pub fn reset(&mut self) {
// It is illegal to call `reset` a second time. Verify that the user didn't do this.
if let SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
} = self.connection
{
panic!()
match self.connection {
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
initiator: ShutdownInitiator::Api,
} => {
// It is illegal to call `reset` a second time.
panic!()
}
SingleStreamConnectionTaskInner::ShutdownWaitingAck { ref mut initiator }
| SingleStreamConnectionTaskInner::ShutdownAcked { ref mut initiator } => {
// Mark the initiator as being the API in order to track proper API usage.
*initiator = ShutdownInitiator::Api;
}
_ => {
self.pending_messages
.push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
ShutdownCause::RemoteReset,
)));
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
};
}
}

self.pending_messages
.push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
ShutdownCause::RemoteReset,
)));
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
};
}

/// Reads data coming from the connection, updates the internal state machine, and writes data
Expand Down Expand Up @@ -465,7 +482,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand Down Expand Up @@ -586,7 +603,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
}
},
Expand Down Expand Up @@ -619,7 +636,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand All @@ -643,7 +660,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand Down Expand Up @@ -714,10 +731,10 @@ where
}

c @ (SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
}) => {
// The user might legitimately call this function after the connection has
// already shut down. In that case, just do nothing.
Expand All @@ -729,10 +746,10 @@ where
}

SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
initiator: ShutdownInitiator::Api,
} => {
// As documented, it is illegal to call this function after calling `reset()`.
panic!()
Expand Down