Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure we're connected before starting requests #2297

Merged
merged 4 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use smoldot::{
identity::keystore,
informant::HashDisplay,
libp2p,
network::{self, protocol::BlockData, service::BlocksRequestError},
network::{self, protocol::BlockData},
sync::all,
};
use std::{
Expand Down Expand Up @@ -323,7 +323,10 @@ struct SyncBackground {
'static,
(
all::RequestId,
Result<Result<Vec<BlockData>, BlocksRequestError>, future::Aborted>,
Result<
Result<Vec<BlockData>, network_service::BlocksRequestError>,
future::Aborted,
>,
),
>,
>,
Expand Down
48 changes: 39 additions & 9 deletions bin/full-node/src/run/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,20 @@ impl NetworkService {
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
let result = self.inner.guarded.lock().await.network.send_block_announce(
&target,
chain_index,
scale_encoded_header,
is_best,
);
) -> Result<(), QueueNotificationError> {
let mut guarded = self.inner.guarded.lock().await;

// The call to `send_block_announce` below panics if we have no active connection.
// TODO: not the correct check; must make sure that we have a substream open
if !guarded.network.has_established_connection(&target) {
return Err(QueueNotificationError::NoConnection);
}

let result = guarded
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.map_err(QueueNotificationError::Queue);

self.inner.wake_up_main_background_task.notify(1);
result
}
Expand All @@ -547,7 +554,7 @@ impl NetworkService {
target: PeerId, // TODO: by value?
chain_index: usize,
config: protocol::BlocksRequestConfig,
) -> Result<Vec<protocol::BlockData>, service::BlocksRequestError> {
) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
let _jaeger_span = self.inner.jaeger_service.outgoing_block_request_span(
&self.inner.local_peer_id,
&target,
Expand All @@ -564,6 +571,11 @@ impl NetworkService {
let rx = {
let mut guarded = self.inner.guarded.lock().await;

// The call to `start_blocks_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(BlocksRequestError::NoConnection);
}

let (tx, rx) = oneshot::channel();

let request_id = guarded.network.start_blocks_request(
Expand All @@ -580,7 +592,7 @@ impl NetworkService {
rx
};

rx.await.unwrap()
rx.await.unwrap().map_err(BlocksRequestError::Request)
}
}

Expand All @@ -602,6 +614,24 @@ pub enum InitError {
BadListenMultiaddr(Multiaddr),
}

/// Error returned by [`NetworkService::blocks_request`].
///
/// Distinguishes a request that was never started, because there is no established
/// connection with the target, from a request that was started and then failed.
#[derive(Debug, derive_more::Display)]
pub enum BlocksRequestError {
/// No established connection with the target.
///
/// Returned before any request is started, when `has_established_connection` reports
/// `false` for the target.
NoConnection,
/// Error during the request. Wraps the `service::BlocksRequestError` produced once the
/// request has actually been started.
Request(service::BlocksRequestError),
}

/// Error returned by [`NetworkService::send_block_announce`].
///
/// Distinguishes an announce that was never attempted, because there is no established
/// connection with the target, from an announce whose queuing failed.
#[derive(Debug, derive_more::Display)]
pub enum QueueNotificationError {
/// No established connection with the target.
///
/// Returned before trying to queue the notification, when `has_established_connection`
/// reports `false` for the target. A TODO at the call site notes that this is not yet the
/// correct check: what actually matters is whether a substream is open.
NoConnection,
/// Error during the queuing. Wraps the `peers::QueueNotificationError` returned by the
/// underlying `send_block_announce` call.
Queue(peers::QueueNotificationError),
}

async fn background_task(inner: Arc<Inner>, mut event_senders: Vec<mpsc::Sender<Event>>) {
loop {
// In order to guarantee that waking up `wake_up_background` will run an entirely
Expand Down
113 changes: 98 additions & 15 deletions bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,15 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
config: protocol::BlocksRequestConfig,
timeout: Duration,
) -> Result<Vec<protocol::BlockData>, service::BlocksRequestError> {
) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_blocks_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(BlocksRequestError::NoConnection);
}

match &config.start {
protocol::BlocksRequestConfigStart::Hash(hash) => {
log::debug!(
Expand Down Expand Up @@ -449,7 +454,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(BlocksRequestError::Request)
}

/// Sends a grandpa warp sync request to the given peer.
Expand All @@ -460,10 +465,16 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
begin_hash: [u8; 32],
timeout: Duration,
) -> Result<protocol::GrandpaWarpSyncResponse, service::GrandpaWarpSyncRequestError> {
) -> Result<protocol::GrandpaWarpSyncResponse, GrandpaWarpSyncRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_grandpa_warp_sync_request` below panics if we have no
// active connection.
if !guarded.network.has_established_connection(&target) {
return Err(GrandpaWarpSyncRequestError::NoConnection);
}

log::debug!(
target: "network", "Connection({}) <= GrandpaWarpSyncRequest(chain={}, start={})",
target, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
Expand Down Expand Up @@ -509,7 +520,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(GrandpaWarpSyncRequestError::Request)
}

pub async fn set_local_best_block(
Expand Down Expand Up @@ -557,10 +568,16 @@ impl<TPlat: Platform> NetworkService<TPlat> {
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]>>>,
timeout: Duration,
) -> Result<Vec<Vec<u8>>, service::StorageProofRequestError> {
) -> Result<Vec<Vec<u8>>, StorageProofRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_storage_proof_request` below panics if we have no active
// connection.
if !guarded.network.has_established_connection(&target) {
return Err(StorageProofRequestError::NoConnection);
}

log::debug!(
target: "network",
"Connection({}) <= StorageProofRequest(chain={}, block={})",
Expand Down Expand Up @@ -608,7 +625,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(StorageProofRequestError::Request)
}

/// Sends a call proof request to the given peer.
Expand All @@ -621,10 +638,15 @@ impl<TPlat: Platform> NetworkService<TPlat> {
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::CallProofRequestConfig<'a, impl Iterator<Item = impl AsRef<[u8]>>>,
timeout: Duration,
) -> Result<Vec<Vec<u8>>, service::CallProofRequestError> {
) -> Result<Vec<Vec<u8>>, CallProofRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_call_proof_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(CallProofRequestError::NoConnection);
}

log::debug!(
target: "network",
"Connection({}) <= CallProofRequest({}, {}, {})",
Expand Down Expand Up @@ -673,7 +695,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(CallProofRequestError::Request)
}

/// Announces transaction to the peers we are connected to.
Expand Down Expand Up @@ -719,14 +741,19 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
let result = self
.shared
.guarded
.lock()
.await
) -> Result<(), QueueNotificationError> {
let mut guarded = self.shared.guarded.lock().await;

// The call to `send_block_announce` below panics if we have no active connection.
// TODO: not the correct check; must make sure that we have a substream open
if !guarded.network.has_established_connection(&target) {
return Err(QueueNotificationError::NoConnection);
}

let result = guarded
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best);
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.map_err(QueueNotificationError::Queue);

self.shared.wake_up_main_background_task.notify(1);

Expand Down Expand Up @@ -807,6 +834,62 @@ pub enum Event {
},
}

/// Error returned by [`NetworkService::blocks_request`].
///
/// Distinguishes a request that was never started, because there is no established
/// connection with the target, from a request that was started and then failed.
#[derive(Debug, derive_more::Display)]
pub enum BlocksRequestError {
/// No established connection with the target.
///
/// Returned before any request is started, when `has_established_connection` reports
/// `false` for the target.
NoConnection,
/// Error during the request. Wraps the `service::BlocksRequestError` obtained as the
/// outcome of a request that has actually been started.
Request(service::BlocksRequestError),
}

/// Error returned by [`NetworkService::grandpa_warp_sync_request`].
///
/// Distinguishes a request that was never started, because there is no established
/// connection with the target, from a request that was started and then failed.
#[derive(Debug, derive_more::Display)]
pub enum GrandpaWarpSyncRequestError {
/// No established connection with the target.
///
/// Returned before any request is started, when `has_established_connection` reports
/// `false` for the target.
NoConnection,
/// Error during the request. Wraps the `service::GrandpaWarpSyncRequestError` obtained as
/// the outcome of a request that has actually been started.
Request(service::GrandpaWarpSyncRequestError),
}

/// Error returned by [`NetworkService::storage_proof_request`].
///
/// Distinguishes a request that was never started, because there is no established
/// connection with the target, from a request that was started and then failed.
#[derive(Debug, derive_more::Display, Clone)]
pub enum StorageProofRequestError {
/// No established connection with the target.
///
/// Returned before any request is started, when `has_established_connection` reports
/// `false` for the target.
NoConnection,
/// Error during the request. Wraps the `service::StorageProofRequestError` obtained as the
/// outcome of a request that has actually been started.
Request(service::StorageProofRequestError),
}

/// Error returned by [`NetworkService::call_proof_request`].
///
/// Distinguishes a request that was never started, because there is no established
/// connection with the target, from a request that was started and then failed.
#[derive(Debug, derive_more::Display, Clone)]
pub enum CallProofRequestError {
/// No established connection with the target.
///
/// Returned before any request is started, when `has_established_connection` reports
/// `false` for the target.
NoConnection,
/// Error during the request. Wraps the `service::CallProofRequestError` obtained as the
/// outcome of a request that has actually been started.
Request(service::CallProofRequestError),
}

impl CallProofRequestError {
/// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
/// issue.
pub fn is_network_problem(&self) -> bool {
match self {
CallProofRequestError::Request(err) => err.is_network_problem(),
CallProofRequestError::NoConnection => true,
}
}
}

/// Error returned by [`NetworkService::send_block_announce`].
///
/// Distinguishes an announce that was never attempted, because there is no established
/// connection with the target, from an announce whose queuing failed.
#[derive(Debug, derive_more::Display)]
pub enum QueueNotificationError {
/// No established connection with the target.
///
/// Returned before trying to queue the notification, when `has_established_connection`
/// reports `false` for the target. A TODO at the call site notes that this is not yet the
/// correct check: what actually matters is whether a substream is open.
NoConnection,
/// Error during the queuing. Wraps the `peers::QueueNotificationError` returned by the
/// underlying `send_block_announce` call.
Queue(peers::QueueNotificationError),
}

async fn background_task<TPlat: Platform>(
shared: Arc<Shared<TPlat>>,
mut event_senders: Vec<mpsc::Sender<Event>>,
Expand Down
27 changes: 20 additions & 7 deletions bin/light-base/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,11 @@ impl<TPlat: Platform> SyncService<TPlat> {
match result {
Ok(value) if !value.is_empty() => return Ok(value),
// TODO: this check of emptiness is a bit of a hack; it is necessary because Substrate responds to requests about blocks it doesn't know with an empty proof
Ok(_) => outcome_errors.push(service::CallProofRequestError::Request(
smoldot::libp2p::peers::RequestError::Substream(
smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
Ok(_) => outcome_errors.push(network_service::CallProofRequestError::Request(
service::CallProofRequestError::Request(
smoldot::libp2p::peers::RequestError::Substream(
smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
),
),
)),
Err(err) => {
Expand Down Expand Up @@ -576,8 +578,19 @@ impl StorageQueryError {
/// issue.
pub fn is_network_problem(&self) -> bool {
self.errors.iter().all(|err| match err {
StorageQueryErrorDetail::Network(service::StorageProofRequestError::Request(_)) => true,
StorageQueryErrorDetail::Network(service::StorageProofRequestError::Decode(_)) => false,
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Request(_),
),
)
| StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::NoConnection,
) => true,
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Decode(_),
),
) => false,
// TODO: as a temporary hack, we consider `TrieRootNotFound` as the remote not knowing about the requested block; see https://github.com/paritytech/substrate/pull/8046
StorageQueryErrorDetail::ProofVerification(proof_verify::Error::TrieRootNotFound) => {
true
Expand Down Expand Up @@ -606,7 +619,7 @@ impl fmt::Display for StorageQueryError {
pub enum StorageQueryErrorDetail {
/// Error during the network request.
#[display(fmt = "{}", _0)]
Network(service::StorageProofRequestError),
Network(network_service::StorageProofRequestError),
/// Error verifying the proof.
#[display(fmt = "{}", _0)]
ProofVerification(proof_verify::Error),
Expand All @@ -617,7 +630,7 @@ pub enum StorageQueryErrorDetail {
pub struct CallProofQueryError {
/// Contains one error per peer that has been contacted. If this list is empty, then we
/// aren't connected to any node.
pub errors: Vec<service::CallProofRequestError>,
pub errors: Vec<network_service::CallProofRequestError>,
}

impl CallProofQueryError {
Expand Down
4 changes: 2 additions & 2 deletions bin/light-base/src/sync_service/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ struct Task<TPlat: Platform> {
(
all::RequestId,
Result<
Result<Vec<protocol::BlockData>, network::service::BlocksRequestError>,
Result<Vec<protocol::BlockData>, network_service::BlocksRequestError>,
future::Aborted,
>,
),
Expand All @@ -410,7 +410,7 @@ struct Task<TPlat: Platform> {
Result<
Result<
protocol::GrandpaWarpSyncResponse,
network::service::GrandpaWarpSyncRequestError,
network_service::GrandpaWarpSyncRequestError,
>,
future::Aborted,
>,
Expand Down
14 changes: 14 additions & 0 deletions src/libp2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,20 @@ where
self.inner.respond_in_request(id.0, response)
}

/// Returns `true` if there exists an established connection with the given peer.
pub fn has_established_connection(&self, peer_id: &PeerId) -> bool {
let peer_index = match self.peer_indices.get(peer_id) {
Some(idx) => *idx,
None => return false,
};

self.connections_by_peer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we want to have multiple connections to the same peer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen accidentally if two peers connect to each other at the same time.

Another situation is if you connect to an IP address and expect to find peer A but actually connect to peer B, even though you're already connected to B.
In that case, from the point of view of peer B, it is impossible to tell whether you connected to it by accident (the example that I just gave) or whether the first connection you had with B is actually dead and you are opening a new one to replace it.

The best way to handle these situations is to allow multiple connections between peers, and automatically close unused ones.

.range(
(peer_index, ConnectionId::min_value())..=(peer_index, ConnectionId::max_value()),
)
.any(|(_, established)| *established)
}

/// Returns an iterator to the list of [`PeerId`]s that we have an established connection
/// with.
pub fn peers_list(&self) -> impl Iterator<Item = &PeerId> {
Expand Down
Loading