Skip to content

Commit

Permalink
Make sure we're connected before starting requests (#2297)
Browse files Browse the repository at this point in the history
* Make sure we're connected before starting requests

* Rustfmt

* peer -> peer_id

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored May 18, 2022
1 parent 2594c0f commit 9a8a749
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 35 deletions.
7 changes: 5 additions & 2 deletions bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use smoldot::{
identity::keystore,
informant::HashDisplay,
libp2p,
network::{self, protocol::BlockData, service::BlocksRequestError},
network::{self, protocol::BlockData},
sync::all,
};
use std::{
Expand Down Expand Up @@ -323,7 +323,10 @@ struct SyncBackground {
'static,
(
all::RequestId,
Result<Result<Vec<BlockData>, BlocksRequestError>, future::Aborted>,
Result<
Result<Vec<BlockData>, network_service::BlocksRequestError>,
future::Aborted,
>,
),
>,
>,
Expand Down
48 changes: 39 additions & 9 deletions bin/full-node/src/run/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,20 @@ impl NetworkService {
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
let result = self.inner.guarded.lock().await.network.send_block_announce(
&target,
chain_index,
scale_encoded_header,
is_best,
);
) -> Result<(), QueueNotificationError> {
let mut guarded = self.inner.guarded.lock().await;

// The call to `send_block_announce` below panics if we have no active connection.
// TODO: not the correct check; must make sure that we have a substream open
if !guarded.network.has_established_connection(&target) {
return Err(QueueNotificationError::NoConnection);
}

let result = guarded
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.map_err(QueueNotificationError::Queue);

self.inner.wake_up_main_background_task.notify(1);
result
}
Expand All @@ -547,7 +554,7 @@ impl NetworkService {
target: PeerId, // TODO: by value?
chain_index: usize,
config: protocol::BlocksRequestConfig,
) -> Result<Vec<protocol::BlockData>, service::BlocksRequestError> {
) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
let _jaeger_span = self.inner.jaeger_service.outgoing_block_request_span(
&self.inner.local_peer_id,
&target,
Expand All @@ -564,6 +571,11 @@ impl NetworkService {
let rx = {
let mut guarded = self.inner.guarded.lock().await;

// The call to `start_blocks_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(BlocksRequestError::NoConnection);
}

let (tx, rx) = oneshot::channel();

let request_id = guarded.network.start_blocks_request(
Expand All @@ -580,7 +592,7 @@ impl NetworkService {
rx
};

rx.await.unwrap()
rx.await.unwrap().map_err(BlocksRequestError::Request)
}
}

Expand All @@ -602,6 +614,24 @@ pub enum InitError {
BadListenMultiaddr(Multiaddr),
}

/// Error returned by [`NetworkService::blocks_request`].
#[derive(Debug, derive_more::Display)]
pub enum BlocksRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
Request(service::BlocksRequestError),
}

/// Error returned by [`NetworkService::send_block_announce`].
#[derive(Debug, derive_more::Display)]
pub enum QueueNotificationError {
/// No established connection with the target.
NoConnection,
/// Error during the queuing.
Queue(peers::QueueNotificationError),
}

async fn background_task(inner: Arc<Inner>, mut event_senders: Vec<mpsc::Sender<Event>>) {
loop {
// In order to guarantee that waking up `wake_up_background` will run an entirely
Expand Down
113 changes: 98 additions & 15 deletions bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,15 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
config: protocol::BlocksRequestConfig,
timeout: Duration,
) -> Result<Vec<protocol::BlockData>, service::BlocksRequestError> {
) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_blocks_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(BlocksRequestError::NoConnection);
}

match &config.start {
protocol::BlocksRequestConfigStart::Hash(hash) => {
log::debug!(
Expand Down Expand Up @@ -449,7 +454,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(BlocksRequestError::Request)
}

/// Sends a grandpa warp sync request to the given peer.
Expand All @@ -460,10 +465,16 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
begin_hash: [u8; 32],
timeout: Duration,
) -> Result<protocol::GrandpaWarpSyncResponse, service::GrandpaWarpSyncRequestError> {
) -> Result<protocol::GrandpaWarpSyncResponse, GrandpaWarpSyncRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_grandpa_warp_sync_request` below panics if we have no
// active connection.
if !guarded.network.has_established_connection(&target) {
return Err(GrandpaWarpSyncRequestError::NoConnection);
}

log::debug!(
target: "network", "Connection({}) <= GrandpaWarpSyncRequest(chain={}, start={})",
target, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
Expand Down Expand Up @@ -509,7 +520,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(GrandpaWarpSyncRequestError::Request)
}

pub async fn set_local_best_block(
Expand Down Expand Up @@ -557,10 +568,16 @@ impl<TPlat: Platform> NetworkService<TPlat> {
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]>>>,
timeout: Duration,
) -> Result<Vec<Vec<u8>>, service::StorageProofRequestError> {
) -> Result<Vec<Vec<u8>>, StorageProofRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_storage_proof_request` below panics if we have no active
// connection.
if !guarded.network.has_established_connection(&target) {
return Err(StorageProofRequestError::NoConnection);
}

log::debug!(
target: "network",
"Connection({}) <= StorageProofRequest(chain={}, block={})",
Expand Down Expand Up @@ -608,7 +625,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(StorageProofRequestError::Request)
}

/// Sends a call proof request to the given peer.
Expand All @@ -621,10 +638,15 @@ impl<TPlat: Platform> NetworkService<TPlat> {
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::CallProofRequestConfig<'a, impl Iterator<Item = impl AsRef<[u8]>>>,
timeout: Duration,
) -> Result<Vec<Vec<u8>>, service::CallProofRequestError> {
) -> Result<Vec<Vec<u8>>, CallProofRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_call_proof_request` below panics if we have no active connection.
if !guarded.network.has_established_connection(&target) {
return Err(CallProofRequestError::NoConnection);
}

log::debug!(
target: "network",
"Connection({}) <= CallProofRequest({}, {}, {})",
Expand Down Expand Up @@ -673,7 +695,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
}
}

result
result.map_err(CallProofRequestError::Request)
}

/// Announces transaction to the peers we are connected to.
Expand Down Expand Up @@ -719,14 +741,19 @@ impl<TPlat: Platform> NetworkService<TPlat> {
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
let result = self
.shared
.guarded
.lock()
.await
) -> Result<(), QueueNotificationError> {
let mut guarded = self.shared.guarded.lock().await;

// The call to `send_block_announce` below panics if we have no active connection.
// TODO: not the correct check; must make sure that we have a substream open
if !guarded.network.has_established_connection(&target) {
return Err(QueueNotificationError::NoConnection);
}

let result = guarded
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best);
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.map_err(QueueNotificationError::Queue);

self.shared.wake_up_main_background_task.notify(1);

Expand Down Expand Up @@ -807,6 +834,62 @@ pub enum Event {
},
}

/// Error returned by [`NetworkService::blocks_request`].
#[derive(Debug, derive_more::Display)]
pub enum BlocksRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
Request(service::BlocksRequestError),
}

/// Error returned by [`NetworkService::grandpa_warp_sync_request`].
#[derive(Debug, derive_more::Display)]
pub enum GrandpaWarpSyncRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
Request(service::GrandpaWarpSyncRequestError),
}

/// Error returned by [`NetworkService::storage_proof_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum StorageProofRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
Request(service::StorageProofRequestError),
}

/// Error returned by [`NetworkService::call_proof_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum CallProofRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
Request(service::CallProofRequestError),
}

impl CallProofRequestError {
/// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
/// issue.
pub fn is_network_problem(&self) -> bool {
match self {
CallProofRequestError::Request(err) => err.is_network_problem(),
CallProofRequestError::NoConnection => true,
}
}
}

/// Error returned by [`NetworkService::send_block_announce`].
#[derive(Debug, derive_more::Display)]
pub enum QueueNotificationError {
/// No established connection with the target.
NoConnection,
/// Error during the queuing.
Queue(peers::QueueNotificationError),
}

async fn background_task<TPlat: Platform>(
shared: Arc<Shared<TPlat>>,
mut event_senders: Vec<mpsc::Sender<Event>>,
Expand Down
27 changes: 20 additions & 7 deletions bin/light-base/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,11 @@ impl<TPlat: Platform> SyncService<TPlat> {
match result {
Ok(value) if !value.is_empty() => return Ok(value),
// TODO: this check of emptiness is a bit of a hack; it is necessary because Substrate responds to requests about blocks it doesn't know with an empty proof
Ok(_) => outcome_errors.push(service::CallProofRequestError::Request(
smoldot::libp2p::peers::RequestError::Substream(
smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
Ok(_) => outcome_errors.push(network_service::CallProofRequestError::Request(
service::CallProofRequestError::Request(
smoldot::libp2p::peers::RequestError::Substream(
smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
),
),
)),
Err(err) => {
Expand Down Expand Up @@ -576,8 +578,19 @@ impl StorageQueryError {
/// issue.
pub fn is_network_problem(&self) -> bool {
self.errors.iter().all(|err| match err {
StorageQueryErrorDetail::Network(service::StorageProofRequestError::Request(_)) => true,
StorageQueryErrorDetail::Network(service::StorageProofRequestError::Decode(_)) => false,
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Request(_),
),
)
| StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::NoConnection,
) => true,
StorageQueryErrorDetail::Network(
network_service::StorageProofRequestError::Request(
service::StorageProofRequestError::Decode(_),
),
) => false,
// TODO: as a temporary hack, we consider `TrieRootNotFound` as the remote not knowing about the requested block; see https://github.com/paritytech/substrate/pull/8046
StorageQueryErrorDetail::ProofVerification(proof_verify::Error::TrieRootNotFound) => {
true
Expand Down Expand Up @@ -606,7 +619,7 @@ impl fmt::Display for StorageQueryError {
pub enum StorageQueryErrorDetail {
/// Error during the network request.
#[display(fmt = "{}", _0)]
Network(service::StorageProofRequestError),
Network(network_service::StorageProofRequestError),
/// Error verifying the proof.
#[display(fmt = "{}", _0)]
ProofVerification(proof_verify::Error),
Expand All @@ -617,7 +630,7 @@ pub enum StorageQueryErrorDetail {
pub struct CallProofQueryError {
/// Contains one error per peer that has been contacted. If this list is empty, then we
/// aren't connected to any node.
pub errors: Vec<service::CallProofRequestError>,
pub errors: Vec<network_service::CallProofRequestError>,
}

impl CallProofQueryError {
Expand Down
4 changes: 2 additions & 2 deletions bin/light-base/src/sync_service/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ struct Task<TPlat: Platform> {
(
all::RequestId,
Result<
Result<Vec<protocol::BlockData>, network::service::BlocksRequestError>,
Result<Vec<protocol::BlockData>, network_service::BlocksRequestError>,
future::Aborted,
>,
),
Expand All @@ -410,7 +410,7 @@ struct Task<TPlat: Platform> {
Result<
Result<
protocol::GrandpaWarpSyncResponse,
network::service::GrandpaWarpSyncRequestError,
network_service::GrandpaWarpSyncRequestError,
>,
future::Aborted,
>,
Expand Down
14 changes: 14 additions & 0 deletions src/libp2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,20 @@ where
self.inner.respond_in_request(id.0, response)
}

/// Returns `true` if there exists an established connection with the given peer.
pub fn has_established_connection(&self, peer_id: &PeerId) -> bool {
let peer_index = match self.peer_indices.get(peer_id) {
Some(idx) => *idx,
None => return false,
};

self.connections_by_peer
.range(
(peer_index, ConnectionId::min_value())..=(peer_index, ConnectionId::max_value()),
)
.any(|(_, established)| *established)
}

/// Returns an iterator to the list of [`PeerId`]s that we have an established connection
/// with.
pub fn peers_list(&self) -> impl Iterator<Item = &PeerId> {
Expand Down
Loading

0 comments on commit 9a8a749

Please sign in to comment.