From 95451e7c432d421c0eed9f04f23d0b3ca89b9546 Mon Sep 17 00:00:00 2001 From: jjy Date: Wed, 19 Dec 2018 22:24:06 +0800 Subject: [PATCH] fix: use new strategy to evict inbound peer --- Cargo.lock | 1 + network/Cargo.toml | 1 + network/src/ckb_service.rs | 2 + network/src/network.rs | 17 +++- network/src/network_group.rs | 24 +++-- network/src/peers_registry.rs | 142 +++++++++++++++++++--------- network/src/ping_service.rs | 12 ++- network/src/tests/peers_registry.rs | 69 ++++++++++++-- 8 files changed, 204 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 196a47525a..db16161475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -465,6 +465,7 @@ name = "ckb-network" version = "0.3.0-pre" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "ckb-time 0.3.0-pre", "ckb-util 0.3.0-pre", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/network/Cargo.toml b/network/Cargo.toml index 8c7da3cca1..e4a0dfcc20 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -18,3 +18,4 @@ tokio = "0.1.8" futures = { version = "0.1.19", features = ["use_std"] } snap = "0.2" libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } +ckb-time = { path = "../util/time" } diff --git a/network/src/ckb_service.rs b/network/src/ckb_service.rs index eac2502b03..19a1909160 100644 --- a/network/src/ckb_service.rs +++ b/network/src/ckb_service.rs @@ -8,6 +8,7 @@ use crate::protocol_service::ProtocolService; use crate::CKBProtocolHandler; use crate::Network; use crate::PeerId; +use ckb_time::now_ms; use futures::future::{self, Future}; use futures::Stream; use libp2p::core::{Endpoint, Multiaddr, UniqueConnecState}; @@ -81,6 +82,7 @@ impl CKBService { move |data| { // update kad_system when we received data kad_system.update_kbuckets(peer_id.clone()); + network.modify_peer(&peer_id, |peer| peer.last_message_time = Some(now_ms())); let protocol_handler = Arc::clone(&protocol_handler); let network = Arc::clone(&network); let handle_received = future::lazy(move || { diff --git a/network/src/network.rs b/network/src/network.rs index 35d4cb0cbd..1b814a40d5 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -32,7 +32,6 @@ use std::boxed::Box; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use std::usize; use tokio::io::{AsyncRead, AsyncWrite}; @@ -51,7 +50,7 @@ pub struct SessionInfo { pub struct PeerInfo { pub peer_id: PeerId, pub endpoint_role: Endpoint, - pub last_ping_time: Option, + pub last_ping_time: Option, pub connected_addr: Multiaddr, pub identify_info: Option, } @@ -106,6 +105,20 @@ impl Network { peers_registry.connection_status() } + pub(crate) fn modify_peer(&self, peer_id: &PeerId, mut f: F) -> bool + where + F: FnMut(&mut PeerConnection) -> (), + { + let mut peers_registry = self.peers_registry.write(); + match peers_registry.get_mut(peer_id) { + Some(peer) => { + f(peer); + true + } + None => false, + } + } + pub(crate) fn get_peer_identify_info(&self, peer_id: &PeerId) -> Option { let peers_registry = self.peers_registry.read(); peers_registry diff --git a/network/src/network_group.rs b/network/src/network_group.rs index 8405f11171..9b9d1cdf06 100644 --- a/network/src/network_group.rs +++ b/network/src/network_group.rs @@ -1,7 +1,13 @@ use libp2p::core::{AddrComponent, Multiaddr}; use std::net::IpAddr; -pub type Group = Vec; +#[derive(Hash, Eq, PartialEq, Debug)] +pub enum Group { + NoGroup, + LocalNetwork, + IP4([u8; 2]), + IP6([u8; 4]), +} pub trait NetworkGroup { fn network_group(&self) -> Group; @@ -22,28 +28,30 @@ impl NetworkGroup for Multiaddr { fn network_group(&self) -> Group { if let Some(ip_addr) = extract_ip_addr(self) { if ip_addr.is_loopback() { - // Local NetworkGroup - return vec![1]; + return Group::LocalNetwork; } // TODO uncomment after ip feature stable // if !ip_addr.is_global() { // // Global NetworkGroup - // return vec![2] + // return Group::GlobalNetwork // } // IPv4 NetworkGroup if let IpAddr::V4(ipv4) = ip_addr { - return ipv4.octets()[0..2].to_vec(); + let bits = ipv4.octets(); + return Group::IP4([bits[0], bits[1]]); } // IPv6 NetworkGroup if let IpAddr::V6(ipv6) = ip_addr { if let Some(ipv4) = ipv6.to_ipv4() { - return ipv4.octets()[0..2].to_vec(); + let bits = ipv4.octets(); + return Group::IP4([bits[0], bits[1]]); } - return ipv6.octets()[0..4].to_vec(); + let bits = ipv6.octets(); + return Group::IP6([bits[0], bits[1], bits[2], bits[3]]); } } // Can't group addr - vec![0] + Group::NoGroup } } diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index 465db4112c..e10adbd9b2 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -1,7 +1,8 @@ use crate::network_group::{Group, NetworkGroup}; -use crate::peer_store::{PeerStore, Score}; +use crate::peer_store::PeerStore; use crate::{Error, ErrorKind, PeerId, PeerIndex, ProtocolId}; use bytes::Bytes; +use ckb_time::now_ms; use ckb_util::RwLock; use fnv::{FnvHashMap, FnvHashSet}; use futures::sync::mpsc::UnboundedSender; @@ -12,7 +13,8 @@ use rand::seq::SliceRandom; use rand::thread_rng; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Instant; + +pub(crate) const EVICTION_PROTECT_PEERS: usize = 8; struct PeerConnections { id_allocator: AtomicUsize, @@ -97,7 +99,10 @@ pub struct PeerConnection { pub(crate) pinger_loader: UniqueConnec, pub identify_info: Option, pub(crate) ckb_protocols: Vec, - pub last_ping_time: Option, + pub last_ping_time: Option, + pub last_message_time: Option, + pub ping: Option, + pub connected_time: Option, } impl PeerConnection { @@ -108,7 +113,10 @@ impl PeerConnection { pinger_loader: UniqueConnec::empty(), identify_info: None, ckb_protocols: Vec::with_capacity(1), + ping: None, last_ping_time: None, + last_message_time: None, + connected_time: None, peer_index: None, } } @@ -172,6 +180,14 @@ fn find_most_peers_in_same_network_group<'a>( .unwrap_or_else(Vec::new) } +fn sort_then_drop_last_n_elements(list: &mut Vec, n: usize, compare: F) +where + F: FnMut(&T, &T) -> std::cmp::Ordering, +{ + list.sort_by(compare); + list.truncate(list.len().saturating_sub(n)); +} + impl PeersRegistry { pub fn new( peer_store: Arc>>, @@ -217,19 +233,16 @@ impl PeersRegistry { )) .into()); } - let candidate_score = { - let peer_store = self.peer_store.read(); - if peer_store.is_banned(&peer_id) { - return Err( - ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(), - ); - } - peer_store.peer_score_or_default(&peer_id) - }; + if self.peer_store.read().is_banned(&peer_id) { + return Err( + ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(), + ); + } + let connection_status = self.connection_status(); // check peers connection limitation if connection_status.unreserved_inbound >= self.max_inbound - && !self.try_evict_inbound_peer(candidate_score) + && !self.try_evict_inbound_peer() { return Err(ErrorKind::InvalidNewPeer(format!( "reach max inbound peers limitation, reject peer {:?}", @@ -242,42 +255,80 @@ impl PeersRegistry { Ok(()) } - fn try_evict_inbound_peer(&mut self, candidate_score: Score) -> bool { - let peer_id = { - let inbound_peers = self + fn try_evict_inbound_peer(&mut self) -> bool { + let peer_id: PeerId = { + let mut candidate_peers = self .peers .iter() - .filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id)); - let candidate_peers = find_most_peers_in_same_network_group(inbound_peers); + .filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id)) + .collect::>(); let peer_store = self.peer_store.read(); - - // must have less score than candidate_score - let mut lowest_score = candidate_score - 1; - let mut low_score_peers = Vec::new(); - for peer_id in candidate_peers { - if let Some(score) = peer_store.peer_score(peer_id) { - if score > lowest_score { - continue; - } - if score < lowest_score { - lowest_score = score; - low_score_peers.clear(); - } - low_score_peers.push(peer_id); - } - } - // failed to evict - if low_score_peers.is_empty() { - return false; - } + // Protect peers based on characteristics that an attacker hard to simulate or manipulate + // Protect peers which has the highest score + sort_then_drop_last_n_elements( + &mut candidate_peers, + EVICTION_PROTECT_PEERS, + |(peer_id1, _), (peer_id2, _)| { + let peer1_score = peer_store.peer_score(peer_id1).unwrap_or_default(); + let peer2_score = peer_store.peer_score(peer_id2).unwrap_or_default(); + peer1_score.cmp(&peer2_score) + }, + ); + + // Protect peers which has the lowest ping + sort_then_drop_last_n_elements( + &mut candidate_peers, + EVICTION_PROTECT_PEERS, + |(_, peer1), (_, peer2)| { + let peer1_ping = peer1.ping.unwrap_or_else(|| std::u64::MAX); + let peer2_ping = peer2.ping.unwrap_or_else(|| std::u64::MAX); + peer2_ping.cmp(&peer1_ping) + }, + ); + + // Protect peers which most recently sent messages + sort_then_drop_last_n_elements( + &mut candidate_peers, + EVICTION_PROTECT_PEERS, + |(_, peer1), (_, peer2)| { + let peer1_last_message_time = peer1.last_message_time.unwrap_or_default(); + let peer2_last_message_time = peer2.last_message_time.unwrap_or_default(); + peer1_last_message_time.cmp(&peer2_last_message_time) + }, + ); + candidate_peers.sort_by(|(_, peer1), (_, peer2)| { + let peer1_last_connected_at = peer1.connected_time.unwrap_or_else(|| std::u64::MAX); + let peer2_last_connected_at = peer2.connected_time.unwrap_or_else(|| std::u64::MAX); + peer2_last_connected_at.cmp(&peer1_last_connected_at) + }); + // Protect half peers which have the longest connection time + let protect_peers = candidate_peers.len() / 2; + sort_then_drop_last_n_elements( + &mut candidate_peers, + protect_peers, + |(_, peer1), (_, peer2)| { + let peer1_last_connected_at = + peer1.connected_time.unwrap_or_else(|| std::u64::MAX); + let peer2_last_connected_at = + peer2.connected_time.unwrap_or_else(|| std::u64::MAX); + peer2_last_connected_at.cmp(&peer1_last_connected_at) + }, + ); + + let mut evict_group = + find_most_peers_in_same_network_group(candidate_peers.into_iter()); let mut rng = thread_rng(); - low_score_peers[..] - .choose(&mut rng) - .unwrap() - .to_owned() - .to_owned() + evict_group.shuffle(&mut rng); + // randomly evict a lowest scored peer + match evict_group + .iter() + .min_by_key(|peer_id| peer_store.peer_score(peer_id).unwrap_or_default()) + { + Some(peer_id) => peer_id.to_owned().to_owned(), + None => return false, + } }; - debug!("evict inbound peer {:?}", peer_id); + debug!(target: "network", "evict inbound peer {:?}", peer_id); self.drop_peer(&peer_id); true } @@ -325,7 +376,8 @@ impl PeersRegistry { self.peer_store .write() .new_connected_peer(&peer_id, connected_addr.clone()); - let peer = PeerConnection::new(connected_addr, endpoint); + let mut peer = PeerConnection::new(connected_addr, endpoint); + peer.connected_time = Some(now_ms()); let peer_index = self.peers.or_insert(peer_id.clone(), peer); debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index, peer_id); peer_index diff --git a/network/src/ping_service.rs b/network/src/ping_service.rs index 0d08827f37..5d9ab0873a 100644 --- a/network/src/ping_service.rs +++ b/network/src/ping_service.rs @@ -4,6 +4,7 @@ use crate::protocol_service::ProtocolService; use crate::transport::TransportOutput; use crate::Network; use crate::PeerId; +use ckb_time::now_ms; use futures::future::{self, Future}; use futures::stream::FuturesUnordered; use futures::Stream; @@ -140,7 +141,7 @@ impl ProtocolService for PingService { }) } }); - let ping_start_time = Instant::now(); + let ping_start_time = now_ms(); let ping_future = Future::then(Timeout::new(ping_future, ping_timeout), { let network = Arc::clone(&network); @@ -148,13 +149,18 @@ impl ProtocolService for PingService { let mut peer_store = network.peer_store().write(); match result { Ok(peer_id) => { - let received_during = ping_start_time.elapsed(); + let now = now_ms(); + let ping = now - ping_start_time; + network.modify_peer(&peer_id, |peer| { + peer.ping = Some(ping); + peer.last_ping_time = Some(now); + }); peer_store.report(&peer_id, Behaviour::Ping); trace!( target: "network", "received pong from {:?} in {:?}", peer_id, - received_during + ping ); Ok(()) } diff --git a/network/src/tests/peers_registry.rs b/network/src/tests/peers_registry.rs index 613984823d..6f0bfcf022 100644 --- a/network/src/tests/peers_registry.rs +++ b/network/src/tests/peers_registry.rs @@ -1,9 +1,10 @@ use crate::{ memory_peer_store::MemoryPeerStore, peer_store::{Behaviour, PeerStore}, - peers_registry::PeersRegistry, - random_peer_id, PeerId, ToMultiaddr, + peers_registry::{PeersRegistry, EVICTION_PROTECT_PEERS}, + random_peer_id, ToMultiaddr, }; +use ckb_time::now_ms; use ckb_util::RwLock; use std::default::Default; use std::sync::Arc; @@ -83,13 +84,58 @@ fn test_accept_inbound_peer_eviction() { let lowest_score_peer = random_peer_id().unwrap(); let addr1 = "/ip4/127.0.0.1".to_multiaddr().unwrap(); let addr2 = "/ip4/192.168.0.1".to_multiaddr().unwrap(); + // prepare protected peers + let longest_connection_time_peers_count = 5; + let protected_peers_count = 3 * EVICTION_PROTECT_PEERS + longest_connection_time_peers_count; let mut peers_registry = PeersRegistry::new( Arc::clone(&peer_store), - 5, + (protected_peers_count + longest_connection_time_peers_count) as u32, 3, false, vec![reserved_peer.clone()], ); + for _ in 0..protected_peers_count { + assert!(peers_registry + .accept_inbound_peer(random_peer_id().unwrap(), addr2.clone()) + .is_ok()); + } + let mut peers_iter = peers_registry + .peers_iter() + .map(|(peer_id, _)| peer_id.to_owned()) + .collect::>() + .into_iter(); + // higest scored peers + { + let mut peer_store = peer_store.write(); + for _ in 0..EVICTION_PROTECT_PEERS { + let peer_id = peers_iter.next().unwrap(); + peer_store.report(&peer_id, Behaviour::Ping); + peer_store.report(&peer_id, Behaviour::Ping); + } + } + // lowest ping peers + for _ in 0..EVICTION_PROTECT_PEERS { + let peer_id = peers_iter.next().unwrap(); + let mut peer = peers_registry.get_mut(&peer_id).unwrap(); + peer.ping = Some(0); + } + // peers which most recently sent messages + let now = now_ms(); + for _ in 0..EVICTION_PROTECT_PEERS { + let peer_id = peers_iter.next().unwrap(); + let mut peer = peers_registry.get_mut(&peer_id).unwrap(); + peer.last_message_time = Some(now + 10000); + } + // protect 5 peers which have the longest connection time + for _ in 0..longest_connection_time_peers_count { + let peer_id = peers_iter.next().unwrap(); + let mut peer = peers_registry.get_mut(&peer_id).unwrap(); + peer.connected_time = Some(now.saturating_sub(10000)); + } + let mut new_peer_ids = (0..3) + .into_iter() + .map(|_| random_peer_id().unwrap()) + .collect::>(); // setup 3 node and 1 reserved node from addr1 peers_registry .accept_inbound_peer(reserved_peer.clone(), addr1.clone()) @@ -98,17 +144,17 @@ fn test_accept_inbound_peer_eviction() { .accept_inbound_peer(evict_target.clone(), addr1.clone()) .expect("accept"); peers_registry - .accept_inbound_peer(random_peer_id().unwrap(), addr1.clone()) + .accept_inbound_peer(new_peer_ids[0].clone(), addr1.clone()) .expect("accept"); peers_registry - .accept_inbound_peer(random_peer_id().unwrap(), addr1.clone()) + .accept_inbound_peer(new_peer_ids[1].clone(), addr1.clone()) .expect("accept"); // setup 2 node from addr2 peers_registry .accept_inbound_peer(lowest_score_peer.clone(), addr2.clone()) .expect("accept"); peers_registry - .accept_inbound_peer(random_peer_id().unwrap(), addr2.clone()) + .accept_inbound_peer(new_peer_ids[2].clone(), addr2.clone()) .expect("accept"); // setup score { @@ -120,6 +166,17 @@ fn test_accept_inbound_peer_eviction() { peer_store.report(&reserved_peer, Behaviour::FailedToPing); peer_store.report(&evict_target, Behaviour::FailedToPing); } + // make sure other peers should not protected by longest connection time rule + new_peer_ids.extend_from_slice(&[ + reserved_peer.clone(), + evict_target.clone(), + lowest_score_peer.clone(), + ]); + for peer_id in new_peer_ids { + let mut peer = peers_registry.get_mut(&peer_id).unwrap(); + // push the connected_time to make sure peer is unprotect + peer.connected_time = Some(now + 10000); + } // should evict evict target assert!(peers_registry.get(&evict_target).is_some()); peers_registry