Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Aug 12, 2022
1 parent babb637 commit 6ef550f
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 67 deletions.
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Fixed

- Fix circular dependency between JavaScript modules. ([#2614](https://github.com/paritytech/smoldot/pull/2614))
- Fix panic when a handshake timeout or protocol error happens on a connection at the same time as the local node tries to shut it down.

## 0.6.29 - 2022-08-09

Expand Down
77 changes: 50 additions & 27 deletions src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
/// message has been sent and is waiting to be acknowledged.
ShutdownWaitingAck {
/// If true, [`MultiStreamConnectionTask::reset`] has been called. This doesn't modify any
/// of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,

/// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
/// been sent to the coordinator. `Some` if the message hasn't been sent yet.
Expand All @@ -70,12 +69,21 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection has finished its shutdown and its shutdown has been acknowledged. There is
/// nothing more to do except stop the connection task.
ShutdownAcked {
/// If true, [`MultiStreamConnectionTask::reset`] has been called. This doesn't modify any
/// of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
/// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
Coordinator,
/// [`MultiStreamConnectionTask::reset`] has been called.
Api,
/// The shutdown has been initiated due to a protocol error.
Remote,
}

impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId>
where
TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
Expand Down Expand Up @@ -403,7 +411,7 @@ where
self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
start_shutdown_message_to_send: Some(None),
shutdown_finish_message_sent: false,
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator,
};
}
(
Expand All @@ -429,7 +437,7 @@ where
| (
CoordinatorToConnectionInner::StartShutdown,
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api | ShutdownInitiator::Remote,
..
},
) => {
Expand All @@ -443,19 +451,22 @@ where
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
start_shutdown_message_to_send: start_shutdown_message_sent,
shutdown_finish_message_sent,
was_api_reset: was_reset,
initiator,
},
) => {
debug_assert!(
start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
);
self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: *was_reset,
initiator: *initiator,
};
}
(
CoordinatorToConnectionInner::StartShutdown,
MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. }
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Coordinator,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked { .. },
) => unreachable!(),
(CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
Expand Down Expand Up @@ -546,23 +557,35 @@ where
/// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past.
///
pub fn reset(&mut self) {
// It is illegal to call `reset` a second time. Verify that the user didn't do this.
if let MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
} = self.connection
{
panic!()
match self.connection {
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
initiator: ShutdownInitiator::Api,
..
} => {
// It is illegal to call `reset` a second time.
panic!()
}
MultiStreamConnectionTaskInner::ShutdownWaitingAck {
ref mut initiator, ..
}
| MultiStreamConnectionTaskInner::ShutdownAcked {
ref mut initiator, ..
} => {
// Mark the initiator as being the API in order to track proper API usage.
*initiator = ShutdownInitiator::Api;
}
_ => {
self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
shutdown_finish_message_sent: false,
start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
};
}
}

self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
shutdown_finish_message_sent: false,
start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
};
}

/// Immediately destroys the substream with the given identifier.
Expand Down
97 changes: 57 additions & 40 deletions src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,32 @@ enum SingleStreamConnectionTaskInner<TNow> {
/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
/// message has been sent and is waiting to be acknowledged.
ShutdownWaitingAck {
/// If true, [`SingleStreamConnectionTask::reset`] has been called. This doesn't modify
/// any of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},

/// Connection has finished its shutdown and its shutdown has been acknowledged. There is
/// nothing more to do except stop the connection task.
ShutdownAcked {
/// If true, [`SingleStreamConnectionTask::reset`] has been called. This doesn't modify
/// any of the behavior but is used to make sure that the API is used correctly.
was_api_reset: bool,
/// What has initiated the shutdown.
initiator: ShutdownInitiator,
},

/// Temporary state used to satisfy the borrow checker during state transitions.
Poisoned,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
/// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
Coordinator,
/// [`SingleStreamConnectionTask::reset`] has been called.
Api,
/// The shutdown has been initiated due to a protocol error or because the connection has been
/// shut down cleanly by the remote.
Remote,
}

impl<TNow> SingleStreamConnectionTask<TNow>
where
TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
Expand Down Expand Up @@ -332,7 +341,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator,
};
}
(
Expand All @@ -359,7 +368,7 @@ where
| (
CoordinatorToConnectionInner::StartShutdown,
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api | ShutdownInitiator::Remote,
},
) => {
// There might still be some messages coming from the coordinator after the
Expand All @@ -369,17 +378,18 @@ where
}
(
CoordinatorToConnectionInner::ShutdownFinishedAck,
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: was_reset,
},
SingleStreamConnectionTaskInner::ShutdownWaitingAck { initiator },
) => {
self.connection = SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: *was_reset,
initiator: *initiator,
};
}
(
CoordinatorToConnectionInner::StartShutdown,
SingleStreamConnectionTaskInner::ShutdownWaitingAck { .. }
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Coordinator,
..
}
| SingleStreamConnectionTaskInner::ShutdownAcked { .. },
) => unreachable!(),
(CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
Expand All @@ -404,26 +414,33 @@ where
/// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
///
pub fn reset(&mut self) {
// It is illegal to call `reset` a second time. Verify that the user didn't do this.
if let SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
} = self.connection
{
panic!()
match self.connection {
SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
initiator: ShutdownInitiator::Api,
} => {
// It is illegal to call `reset` a second time.
panic!()
}
SingleStreamConnectionTaskInner::ShutdownWaitingAck { ref mut initiator }
| SingleStreamConnectionTaskInner::ShutdownAcked { ref mut initiator } => {
// Mark the initiator as being the API in order to track proper API usage.
*initiator = ShutdownInitiator::Api;
}
_ => {
self.pending_messages
.push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
ShutdownCause::RemoteReset,
)));
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
initiator: ShutdownInitiator::Api,
};
}
}

self.pending_messages
.push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
ShutdownCause::RemoteReset,
)));
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
};
}

/// Reads data coming from the connection, updates the internal state machine, and writes data
Expand Down Expand Up @@ -465,7 +482,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand Down Expand Up @@ -586,7 +603,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
}
},
Expand Down Expand Up @@ -619,7 +636,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand All @@ -643,7 +660,7 @@ where
self.pending_messages
.push_back(ConnectionToCoordinatorInner::ShutdownFinished);
self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Remote,
};
return;
}
Expand Down Expand Up @@ -714,10 +731,10 @@ where
}

c @ (SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: false,
initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
}) => {
// The user might legitimately call this function after the connection has
// already shut down. In that case, just do nothing.
Expand All @@ -729,10 +746,10 @@ where
}

SingleStreamConnectionTaskInner::ShutdownWaitingAck {
was_api_reset: true,
initiator: ShutdownInitiator::Api,
}
| SingleStreamConnectionTaskInner::ShutdownAcked {
was_api_reset: true,
initiator: ShutdownInitiator::Api,
} => {
// As documented, it is illegal to call this function after calling `reset()`.
panic!()
Expand Down

0 comments on commit 6ef550f

Please sign in to comment.