Skip to content

Commit

Permalink
Kademlia: Somewhat complete the records implementation. (#1189)
Browse files Browse the repository at this point in the history
* Somewhat complete the implementation of Kademlia records.

This commit relates to [libp2p-146] and [libp2p-1089].

  * All records expire (by default, configurable).
  * Provider records are also stored in the RecordStore, and the RecordStore
    API extended.
  * Background jobs for periodic (re-)replication and (re-)publication
    of records. Regular (value-)records are subject to re-replication and
    re-publication as per standard Kademlia. Provider records are only
    subject to re-publication.
  * For standard Kademlia value lookups (quorum = 1), the record is cached
    at the closest peer to the key that did not return the value, as per
    standard Kademlia.
  * Expiration times of regular (value-)records is computed exponentially
    inversely proportional to the number of nodes between the local node
    and the closest node known to the key (beyond the k closest), as per
    standard Kademlia.

The protobuf messages are extended with two fields: `ttl` and `publisher`
in order to implement the different semantics of re-replication (by any
of the k closest peers to the key, not affecting expiry) and re-publication
(by the original publisher, resetting the expiry). This is not done yet in
other libp2p Kademlia implementations, see e.g. [libp2p-go-323]. The new protobuf fields
have been given somewhat unique identifiers to prevent future collision.

Similarly, periodic re-publication of provider records does not seem to
be done yet in other implementations, see e.g. [libp2p-js-98].

[libp2p-146]: #146
[libp2p-1089]: #1089
[libp2p-go-323]: libp2p/go-libp2p-kad-dht#323
[libp2p-js-98]: libp2p/js-libp2p-kad-dht#98

* Tweak kad-ipfs example.

* Add missing files.

* Ensure new delays are polled immediately.

To ensure task notification, since `NotReady` is returned right after.

* Fix ipfs-kad example and use wasm_timer.

* Small cleanup.

* Incorporate some feedback.

* Adjustments after rebase.

* Distinguish events further.

In order for a user to easily distinguish the result of e.g.
a `put_record` operation from the result of a later republication,
different event constructors are used. Furthermore, for now,
re-replication and "caching" of records (at the closest peer to
the key that did not return a value during a successful lookup)
do not yield events for now as they are less interesting.

* Speed up tests for CI.

* Small refinements and more documentation.

  * Guard a node against overriding records for which it considers
    itself to be the publisher.

  * Document the jobs module more extensively.

* More inline docs around removal of "unreachable" addresses.

* Remove wildcard re-exports.

* Use NonZeroUsize for the constants.

* Re-add method lost on merge.

* Add missing 'pub'.

* Further increase the timeout in the ipfs-kad example.

* Readd log dependency to libp2p-kad.

* Simplify RecordStore API slightly.

* Some more commentary.

* Change Addresses::remove to return Result<(),()>.

Change the semantics of `Addresses::remove` so that the error case
is unambiguous, instead of the success case. Use the `Result` for
clearer semantics to that effect.

* Add some documentation to .
  • Loading branch information
romanb authored Jul 17, 2019
1 parent 01bce16 commit cde93f5
Show file tree
Hide file tree
Showing 21 changed files with 2,700 additions and 932 deletions.
6 changes: 4 additions & 2 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use libp2p::{
build_development_transport
};
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError};
use libp2p::kad::record::store::MemoryStore;
use std::env;
use std::time::Duration;

Expand All @@ -51,9 +52,10 @@ fn main() {
// to insert our local node in the DHT. However here we use `without_init` because this
// example is very ephemeral and we don't want to pollute the DHT. In a real world
// application, you want to use `new` instead.
let mut cfg = KademliaConfig::new(local_peer_id.clone());
let mut cfg = KademliaConfig::default();
cfg.set_query_timeout(Duration::from_secs(5 * 60));
let mut behaviour: Kademlia<_> = Kademlia::new(cfg);
let store = MemoryStore::new(local_peer_id.clone());
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);

// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967)
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
Expand Down
2 changes: 1 addition & 1 deletion misc/core-derive/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn three_fields() {
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
kad: libp2p::kad::Kademlia<TSubstream>,
kad: libp2p::kad::Kademlia<TSubstream, libp2p::kad::record::store::MemoryStore>,
#[behaviour(ignore)]
foo: String,
}
Expand Down
1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = "0.4"
either = "1.5"
fnv = "1.0"
futures = "0.1"
log = "0.4"
libp2p-core = { version = "0.10.0", path = "../../core" }
libp2p-swarm = { version = "0.1.0", path = "../../swarm" }
multiaddr = { package = "parity-multiaddr", version = "0.5.0", path = "../../misc/multiaddr" }
Expand Down
8 changes: 8 additions & 0 deletions protocols/kad/dht.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ message Record {

// Time the record was received, set by receiver
string timeReceived = 5;

// The original publisher of the record.
// Currently specific to rust-libp2p.
bytes publisher = 666;

// The remaining TTL of the record, in seconds.
// Currently specific to rust-libp2p.
uint32 ttl = 777;
};

message Message {
Expand Down
70 changes: 40 additions & 30 deletions protocols/kad/src/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,67 +22,77 @@ use libp2p_core::Multiaddr;
use smallvec::SmallVec;
use std::fmt;

/// List of addresses of a peer.
/// A non-empty list of (unique) addresses of a peer in the routing table.
#[derive(Clone)]
pub struct Addresses {
addrs: SmallVec<[Multiaddr; 6]>,
}

impl Addresses {
/// Creates a new list of addresses.
pub fn new() -> Addresses {
Addresses {
addrs: SmallVec::new(),
}
pub fn new(addr: Multiaddr) -> Addresses {
let mut addrs = SmallVec::new();
addrs.push(addr);
Addresses { addrs }
}

/// Gets a reference to the first address in the list.
pub fn first(&self) -> &Multiaddr {
&self.addrs[0]
}

/// Returns an iterator over the list of addresses.
/// Returns an iterator over the addresses.
pub fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
self.addrs.iter()
}

/// Returns the number of addresses in the list.
pub fn len(&self) -> usize {
self.addrs.len()
}

/// Converts the addresses into a `Vec`.
pub fn into_vec(self) -> Vec<Multiaddr> {
self.addrs.into_vec()
}

/// Returns true if the list of addresses is empty.
pub fn is_empty(&self) -> bool {
self.addrs.is_empty()
}
/// Removes the given address from the list.
///
/// Returns `Ok(())` if the address is either not in the list or was found and
/// removed. Returns `Err(())` if the address is the last remaining address,
/// which cannot be removed.
///
/// An address should only be removed if is determined to be invalid or
/// otherwise unreachable.
pub fn remove(&mut self, addr: &Multiaddr) -> Result<(),()> {
if self.addrs.len() == 1 {
return Err(())
}

/// Removes the given address from the list. Typically called if an address is determined to
/// be invalid or unreachable.
pub fn remove(&mut self, addr: &Multiaddr) {
if let Some(pos) = self.addrs.iter().position(|a| a == addr) {
self.addrs.remove(pos);
if self.addrs.len() <= self.addrs.inline_size() {
self.addrs.shrink_to_fit();
}
}

if self.addrs.len() <= self.addrs.inline_size() {
self.addrs.shrink_to_fit();
}
}

/// Clears the list. It is empty afterwards.
pub fn clear(&mut self) {
self.addrs.clear();
self.addrs.shrink_to_fit();
Ok(())
}

/// Inserts an address in the list. No effect if the address was already in the list.
pub fn insert(&mut self, addr: Multiaddr) {
/// Adds a new address to the end of the list.
///
/// Returns true if the address was added, false otherwise (i.e. if the
/// address is already in the list).
pub fn insert(&mut self, addr: Multiaddr) -> bool {
if self.addrs.iter().all(|a| *a != addr) {
self.addrs.push(addr);
true
} else {
false
}
}
}

impl Default for Addresses {
fn default() -> Self {
Addresses::new()
}
}

impl fmt::Debug for Addresses {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list()
Expand Down
Loading

0 comments on commit cde93f5

Please sign in to comment.