Skip to content

Commit

Permalink
Merge pull request #95 from nervosnetwork/jjyr/network-group-and-evic…
Browse files Browse the repository at this point in the history
…tion

feat: implement network group and inbound peer eviction
  • Loading branch information
jjyr authored Dec 20, 2018
2 parents 8c0dc2c + 8eebc0a commit 1e7187b
Show file tree
Hide file tree
Showing 18 changed files with 893 additions and 409 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Nervos Core Dev <[email protected]>"]
edition = "2018"

[dependencies]
rand = "0.5"
rand = "0.6"
fnv = "1.0"
serde = "1.0"
serde_derive = "1.0"
Expand All @@ -18,3 +18,4 @@ tokio = "0.1.8"
futures = { version = "0.1.19", features = ["use_std"] }
snap = "0.2"
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ckb-time = { path = "../util/time" }
36 changes: 22 additions & 14 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::protocol_service::ProtocolService;
use crate::CKBProtocolHandler;
use crate::Network;
use crate::PeerId;
use ckb_time::now_ms;
use futures::future::{self, Future};
use futures::Stream;
use libp2p::core::{Multiaddr, UniqueConnecState};
use libp2p::core::{Endpoint, Multiaddr, UniqueConnecState};
use libp2p::kad;
use log::{error, info};
use std::boxed::Box;
Expand All @@ -29,24 +30,30 @@ impl CKBService {
peer_id: PeerId,
protocol_output: CKBProtocolOutput<Arc<CKBProtocolHandler>>,
kad_system: Arc<kad::KadSystem>,
addr: Option<Multiaddr>,
addr: Multiaddr,
) -> Box<Future<Item = (), Error = IoError> + Send> {
let protocol_id = protocol_output.protocol_id;
let protocol_handler = protocol_output.protocol_handler;
let protocol_version = protocol_output.protocol_version;
let endpoint = protocol_output.endpoint;
let addresses = addr.map(|addr| vec![addr]);
// get peer protocol_connection
let protocol_connec =
match network.ckb_protocol_connec(&peer_id, protocol_id, endpoint, addresses.clone()) {
Ok(protocol_connec) => protocol_connec,
Err(err) => {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("handle ckb_protocol connection error: {}", err),
))) as Box<Future<Item = (), Error = IoError> + Send>
let protocol_connec = {
let result = match endpoint {
Endpoint::Dialer => {
network.try_outbound_ckb_protocol_connec(&peer_id, protocol_id, addr)
}
Endpoint::Listener => {
network.try_inbound_ckb_protocol_connec(&peer_id, protocol_id, addr)
}
};
if let Err(err) = result {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("handle ckb_protocol connection error: {}", err),
))) as Box<Future<Item = (), Error = IoError> + Send>;
}
result.unwrap()
};
if protocol_connec.state() == UniqueConnecState::Full {
error!(
target: "network",
Expand Down Expand Up @@ -75,6 +82,7 @@ impl CKBService {
move |data| {
// update kad_system when we received data
kad_system.update_kbuckets(peer_id.clone());
network.modify_peer(&peer_id, |peer| peer.last_message_time = Some(now_ms()));
let protocol_handler = Arc::clone(&protocol_handler);
let network = Arc::clone(&network);
let handle_received = future::lazy(move || {
Expand Down Expand Up @@ -108,7 +116,7 @@ impl CKBService {
{
let mut peer_store = network.peer_store().write();
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
peer_store.report_status(&peer_id, Status::Disconnected);
peer_store.update_status(&peer_id, Status::Disconnected);
}
protocol_handler.disconnected(
Box::new(DefaultCKBProtocolContext::new(
Expand All @@ -131,7 +139,7 @@ impl CKBService {
{
let mut peer_store = network.peer_store().write();
peer_store.report(&peer_id, Behaviour::Connect);
peer_store.report_status(&peer_id, Status::Connected);
peer_store.update_status(&peer_id, Status::Connected);
}
{
let handle_connected = future::lazy(move || {
Expand All @@ -157,7 +165,7 @@ impl<T: Send> ProtocolService<T> for CKBService {
addr: &Multiaddr,
output: Self::Output,
) -> Protocol<T> {
Protocol::CKBProtocol(output, PeerId::clone(&peer_id), Some(addr.to_owned()))
Protocol::CKBProtocol(output, PeerId::clone(&peer_id), addr.to_owned())
}
fn handle(
&self,
Expand Down
2 changes: 1 addition & 1 deletion network/src/identify_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ where
}
}
// TODO should we try all addresses?
if let Some(addr) = network.get_peer_remote_addresses(&peer_id).get(0) {
if let Some(addr) = network.get_peer_addresses(&peer_id).get(0) {
trace!(
target: "network",
"request identify to peer {:?} {:?}",
Expand Down
23 changes: 14 additions & 9 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ mod identify_service;
mod memory_peer_store;
mod network;
mod network_config;
mod network_group;
mod network_service;
mod outgoing_service;
mod outbound_peer_service;
mod peer_store;
mod peers_registry;
mod ping_service;
mod protocol;
mod protocol_service;
#[cfg(test)]
mod tests;
mod timer_service;
mod transport;

Expand All @@ -25,7 +28,9 @@ pub use crate::errors::{Error, ErrorKind};
pub use crate::network::{Network, PeerInfo, SessionInfo};
pub use crate::network_config::NetworkConfig;
pub use crate::network_service::NetworkService;
pub use libp2p::{core::Endpoint, multiaddr::AddrComponent, Multiaddr, PeerId};
pub use libp2p::{
core::Endpoint, multiaddr::AddrComponent, multiaddr::ToMultiaddr, Multiaddr, PeerId,
};

pub type TimerToken = usize;
pub type ProtocolId = [u8; 3];
Expand Down Expand Up @@ -55,27 +60,27 @@ pub struct Config {
pub non_reserved_mode: Option<String>,
/// Minimum number of connected peers to maintain
pub max_peers: u32,
pub outgoing_peers_ratio: Option<u32>,
pub outbound_peers_ratio: Option<u32>,
pub config_dir_path: Option<String>,
}

impl Config {
fn max_outgoing_peers(&self) -> u32 {
fn max_outbound_peers(&self) -> u32 {
self.max_peers
/ self
.outgoing_peers_ratio
.outbound_peers_ratio
.unwrap_or_else(|| DEFAULT_OUTGOING_PEERS_RATIO)
}
fn max_incoming_peers(&self) -> u32 {
self.max_peers - self.max_outgoing_peers()
fn max_inbound_peers(&self) -> u32 {
self.max_peers - self.max_outbound_peers()
}
}

impl From<Config> for NetworkConfig {
fn from(config: Config) -> Self {
let mut cfg = NetworkConfig::default();
cfg.max_outgoing_peers = config.max_outgoing_peers();
cfg.max_incoming_peers = config.max_incoming_peers();
cfg.max_outbound_peers = config.max_outbound_peers();
cfg.max_inbound_peers = config.max_inbound_peers();
cfg.listen_addresses = config.listen_addresses;
cfg.bootnodes = config.boot_nodes;
cfg.reserved_peers = config.reserved_nodes;
Expand Down
Loading

0 comments on commit 1e7187b

Please sign in to comment.