Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ban peers when a request fails #1482

Merged
merged 16 commits into from
Dec 20, 2023
104 changes: 96 additions & 8 deletions full-node/src/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ impl SyncBackground {
all::BlockAnnounceOutcome::NotFinalizedChain => {}
all::BlockAnnounceOutcome::Discarded => {}
all::BlockAnnounceOutcome::StoredForLater {} => {}
all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(),
all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(), // TODO: ?!?! why unreachable? also, ban the peer
}
}
WakeUpReason::NetworkEvent(network_service::Event::GrandpaNeighborPacket {
Expand Down Expand Up @@ -1102,6 +1102,17 @@ impl SyncBackground {
source_id,
result: Err(_),
}) => {
// Note that we perform the ban even if the source is now disconnected.
let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
self.network_service
.ban_and_disconnect(
peer_id,
self.network_chain_id,
network_service::BanSeverity::Low,
"blocks-request-error",
)
.await;

let _ = self
.sync
.blocks_request_response(request_id, Err::<iter::Empty<_>, _>(()));
Expand Down Expand Up @@ -1167,6 +1178,17 @@ impl SyncBackground {
source_id,
result: Err(_),
}) => {
// Note that we perform the ban even if the source is now disconnected.
let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
self.network_service
.ban_and_disconnect(
peer_id,
self.network_chain_id,
network_service::BanSeverity::Low,
"warp-sync-request-error",
)
.await;

let _ = self.sync.grandpa_warp_sync_response_err(request_id);

// If the source was actually disconnected and has no other request in
Expand All @@ -1192,7 +1214,19 @@ impl SyncBackground {
source_id,
result,
}) => {
// Storage proof request.
if result.is_err() {
// Note that we perform the ban even if the source is now disconnected.
let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
self.network_service
.ban_and_disconnect(
peer_id,
self.network_chain_id,
network_service::BanSeverity::Low,
"storage-proof-request-error",
)
.await;
}

let _ = self
.sync
.storage_get_response(request_id, result.map(|r| r.decode().to_owned()));
Expand Down Expand Up @@ -1220,7 +1254,19 @@ impl SyncBackground {
source_id,
result,
}) => {
// Successful call proof request.
if result.is_err() {
// Note that we perform the ban even if the source is now disconnected.
let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
self.network_service
.ban_and_disconnect(
peer_id,
self.network_chain_id,
network_service::BanSeverity::Low,
"call-proof-request-error",
)
.await;
}

self.sync
.call_proof_response(request_id, result.map(|r| r.decode().to_owned()));
// TODO: need help from networking service to avoid this to_owned
Expand Down Expand Up @@ -1855,6 +1901,17 @@ impl SyncBackground {
);
}
Err(err) => {
if let Some(sender) = &sender {
self.network_service
.ban_and_disconnect(
sender.clone(),
self.network_chain_id,
network_service::BanSeverity::High,
"bad-warp-sync-fragment",
)
.await;
}

self.log_callback.log(
LogLevel::Warn,
format!(
Expand Down Expand Up @@ -1896,6 +1953,7 @@ impl SyncBackground {
unimplemented!()
}
all::ProcessOne::VerifyBlock(verify) => {
// TODO: ban peer in case of verification failure
let when_verification_started = Instant::now();
let mut database_accesses_duration = Duration::new(0, 0);
let mut runtime_build_duration = Duration::new(0, 0);
Expand Down Expand Up @@ -2333,6 +2391,13 @@ impl SyncBackground {
}

all::ProcessOne::VerifyFinalityProof(verify) => {
let sender = verify
.sender()
.1
.as_ref()
.map(|s| s.peer_id.clone())
.unwrap();

match verify.perform(rand::random()) {
(
sync_out,
Expand All @@ -2352,7 +2417,7 @@ impl SyncBackground {
self.log_callback.log(
LogLevel::Debug,
format!(
"finality-proof-verification; outcome=success; new-finalized={}",
"finality-proof-verification; outcome=success, sender={sender}, new-finalized={}",
HashDisplay(&new_finalized_hash)
),
);
Expand Down Expand Up @@ -2407,31 +2472,54 @@ impl SyncBackground {
(sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitPending) => {
self.log_callback.log(
LogLevel::Debug,
"finality-proof-verification; outcome=pending".to_string(),
"finality-proof-verification; outcome=pending, sender={sender}"
.to_string(),
);
self.sync = sync_out;
(self, true)
}
(sync_out, all::FinalityProofVerifyOutcome::AlreadyFinalized) => {
self.log_callback.log(
LogLevel::Debug,
"finality-proof-verification; outcome=already-finalized".to_string(),
"finality-proof-verification; outcome=already-finalized, sender={sender}".to_string(),
);
self.sync = sync_out;
(self, true)
}
(sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitError(error)) => {
self.network_service
.ban_and_disconnect(
sender.clone(),
self.network_chain_id,
network_service::BanSeverity::High,
"bad-warp-sync-fragment",
)
.await;
self.log_callback.log(
LogLevel::Warn,
format!("finality-proof-verification-failure; error={}", error),
format!(
"finality-proof-verification-failure; sender={sender}, error={}",
error
),
);
self.sync = sync_out;
(self, true)
}
(sync_out, all::FinalityProofVerifyOutcome::JustificationError(error)) => {
self.network_service
.ban_and_disconnect(
sender.clone(),
self.network_chain_id,
network_service::BanSeverity::High,
"bad-warp-sync-fragment",
)
.await;
self.log_callback.log(
LogLevel::Warn,
format!("finality-proof-verification-failure; error={}", error),
format!(
"finality-proof-verification-failure; sender={sender}, error={}",
error
),
);
self.sync = sync_out;
(self, true)
Expand Down
99 changes: 99 additions & 0 deletions full-node/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ pub struct NetworkService {
}

enum ToBackground {
ForegroundDisconnectAndBan {
peer_id: PeerId,
chain_id: ChainId,
severity: BanSeverity,
reason: &'static str,
},
ForegroundAnnounceBlock {
target: PeerId,
chain_id: ChainId,
Expand Down Expand Up @@ -334,6 +340,13 @@ struct Chain {
max_in_peers: usize,
}

/// Severity of a ban. See [`NetworkService::ban_and_disconnect`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BanSeverity {
Low,
High,
}

impl NetworkService {
/// Initializes the network service with the given configuration.
pub async fn new(
Expand Down Expand Up @@ -605,6 +618,37 @@ impl NetworkService {
.await;
}

/// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
/// generated. Prevents a new gossip link with the same peer from being reopened for a
/// little while.
///
/// `reason` is a human-readable string printed in the logs.
///
/// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
/// reconnection was already happening as the call to this function is still being processed.
/// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
/// words, this function guarantees that we will be disconnected in the future rather than
/// guarantees that we will disconnect.
pub async fn ban_and_disconnect(
&self,
peer_id: PeerId,
chain_id: ChainId,
severity: BanSeverity,
reason: &'static str,
) {
let _ = self
.to_background_tx
.lock()
.await
.send(ToBackground::ForegroundDisconnectAndBan {
peer_id,
chain_id,
severity,
reason,
})
.await;
}

pub async fn send_block_announce(
self: Arc<Self>,
target: PeerId,
Expand Down Expand Up @@ -1049,6 +1093,61 @@ async fn background_task(mut inner: Inner) {
return;
}

WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan {
peer_id,
chain_id,
severity,
reason,
}) => {
// Note that peer doesn't necessarily have an out slot.
inner.peering_strategy.unassign_slot_and_ban(
&chain_id,
&peer_id,
Instant::now()
+ Duration::from_secs(match severity {
BanSeverity::Low => 10,
BanSeverity::High => 40,
}),
);
if inner.network.gossip_remove_desired(
chain_id,
&peer_id,
service::GossipKind::ConsensusTransactions,
) {
inner.log_callback.log(
LogLevel::Debug,
format!(
"slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}",
peer_id, inner.network[chain_id].log_name, reason
),
);
}

if inner.network.gossip_is_connected(
chain_id,
&peer_id,
service::GossipKind::ConsensusTransactions,
) {
let _close_result = inner.network.gossip_close(
chain_id,
&peer_id,
service::GossipKind::ConsensusTransactions,
);
debug_assert!(_close_result.is_ok());

inner.log_callback.log(
LogLevel::Debug,
format!(
"chain-disconnected; peer_id={}; chain={}",
peer_id, inner.network[chain_id].log_name
),
);

debug_assert!(inner.event_pending_send.is_none());
inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
}
}

WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock {
target,
chain_id,
Expand Down
47 changes: 47 additions & 0 deletions lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3357,6 +3357,53 @@ where
.map(|(_, peer_index, _, _, _)| &self.peers[peer_index.0])
}

/// Returns the list of all peers for a [`Event::GossipConnected`] event of the given kind has
/// been emitted.
/// It is possible to send gossip notifications to these peers.
///
/// # Panic
///
/// Panics if the [`ChainId`] is invalid.
///
pub fn gossip_is_connected(
&'_ self,
chain_id: ChainId,
target: &PeerId,
kind: GossipKind,
) -> bool {
assert!(self.chains.contains(chain_id.0));
let GossipKind::ConsensusTransactions = kind;

let Some(&peer_index) = self.peers_by_peer_id.get(target) else {
// If the `PeerId` is unknown, then we also don't have any gossip link to it.
return false;
};

self.notification_substreams_by_peer_id
.range(
(
NotificationsProtocol::BlockAnnounces {
chain_index: chain_id.0,
},
peer_index,
SubstreamDirection::Out,
NotificationsSubstreamState::open_min_value(),
SubstreamId::min_value(),
)
..=(
NotificationsProtocol::BlockAnnounces {
chain_index: chain_id.0,
},
peer_index,
SubstreamDirection::Out,
NotificationsSubstreamState::open_max_value(),
SubstreamId::max_value(),
),
)
.next()
.is_some()
}

/// Open a gossiping substream with the given peer on the given chain.
///
/// Either a [`Event::GossipConnected`] or [`Event::GossipOpenFailed`] is guaranteed to later
Expand Down
Loading
Loading