Skip to content

Commit

Permalink
Store peers in the database (#2439)
Browse files Browse the repository at this point in the history
* Add function to network_service to serialize peers

* Put the peers in the database

* CHANGELOG

* Rustdoc

* Fix comment
  • Loading branch information
tomaka authored Jun 30, 2022
1 parent a3125a6 commit 3a257ca
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/light-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ itertools = "0.10.3"
log = { version = "0.4.17", default-features = false }
lru = { version = "0.7.7", default-features = false } # TODO: there's no way to use a custom hasher; remove this dependency
rand = "0.8.5"
serde = { version = "1.0.137", default-features = false, features = ["alloc", "derive"] }
serde_json = "1.0.81"
slab = { version = "0.4.6", default-features = false }
smoldot = { version = "0.2.0", path = "../..", default-features = false }
164 changes: 129 additions & 35 deletions bin/light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use smoldot::{
libp2p::{connection, multiaddr, peer_id},
};
use std::{
cmp,
collections::{hash_map::Entry, HashMap},
num::NonZeroU32,
ops,
Expand Down Expand Up @@ -398,7 +399,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// known as a checkpoint) is present in the chain spec, it is possible to start syncing at
// the finalized block it describes.
// TODO: clean up that block
let (chain_information, genesis_block_header) = {
let (chain_information, genesis_block_header, checkpoint_nodes) = {
match (
chain_spec
.as_chain_information()
Expand All @@ -408,10 +409,10 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
s.as_chain_information(),
)
}),
finalized_serialize::decode_chain(config.database_content),
decode_database(config.database_content),
) {
// Use the database if it contains a more recent block than the chain spec checkpoint.
(Ok(Ok(genesis_ci)), checkpoint, Ok((database, _)))
(Ok(Ok(genesis_ci)), checkpoint, Ok((database, checkpoint_nodes)))
if checkpoint
.as_ref()
.map(|r| r.as_ref().ok())
Expand All @@ -422,14 +423,14 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
}) =>
{
let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
(database, genesis_header.into())
(database, genesis_header.into(), checkpoint_nodes)
}

// Use the database if it contains a more recent block than the chain spec checkpoint.
(
Err(chain_spec::FromGenesisStorageError::UnknownStorageItems),
checkpoint,
Ok((database, _)),
Ok((database, checkpoint_nodes)),
) if checkpoint
.as_ref()
.map(|r| r.as_ref().ok())
Expand All @@ -447,7 +448,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
digest: header::DigestRef::empty().into(),
};

(database, genesis_header)
(database, genesis_header, checkpoint_nodes)
}

(Err(chain_spec::FromGenesisStorageError::UnknownStorageItems), None, _) => {
Expand All @@ -473,7 +474,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
digest: header::DigestRef::empty().into(),
};

(checkpoint, genesis_header)
(checkpoint, genesis_header, Default::default())
}

(Err(err), _, _) => {
Expand All @@ -499,13 +500,13 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {

(Ok(Ok(genesis_ci)), Some(Ok(checkpoint)), _) => {
let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
(checkpoint, genesis_header.into())
(checkpoint, genesis_header.into(), Default::default())
}

(Ok(Ok(genesis_ci)), None, _) => {
let genesis_header =
header::Header::from(genesis_ci.as_ref().finalized_block_header.clone());
(genesis_ci, genesis_header)
(genesis_ci, genesis_header, Default::default())
}
}
};
Expand Down Expand Up @@ -818,11 +819,12 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
let new_chain_id = ChainId(public_api_chains_entry.key());

// Multiple chains can share the same network service, but each specify different
// bootstrap nodes. In order to resolve this, each chain adds their own bootnodes to
// the network service after it has been initialized. This is done by adding a short-lived
// task that waits for the chain initialization to finish then adds the nodes.
// bootstrap nodes and database nodes. In order to resolve this, each chain adds their own
// bootnodes and database nodes to the network service after it has been initialized. This
// is done by adding a short-lived task that waits for the chain initialization to finish
// then adds the nodes.
self.new_task_tx
.unbounded_send(("network-service-add-bootnodes".to_owned(), {
.unbounded_send(("network-service-add-initial-topology".to_owned(), {
// Clone `running_chain_init`.
let mut running_chain_init = match services_init {
future::MaybeDone::Done(d) => future::MaybeDone::Done(d.clone()),
Expand All @@ -834,6 +836,10 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// Wait for the chain to finish initializing to proceed.
(&mut running_chain_init).await;
let running_chain = Pin::new(&mut running_chain_init).take_output().unwrap();
running_chain
.network_service
.discover(&TPlat::now(), 0, checkpoint_nodes, false)
.await;
running_chain
.network_service
.discover(&TPlat::now(), 0, bootstrap_nodes, true)
Expand Down Expand Up @@ -1054,28 +1060,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// Wait for the chain to finish initializing before we can obtain the database.
(&mut services).await;
let services = Pin::new(&mut services).take_output().unwrap();

// Finally getting the database.
// If the database can't be obtained, we just return a dummy value that will intentionally
// fail to decode if passed back.
let database_content = services
.sync_service
.serialize_chain_information()
.await
.map(|ci| finalized_serialize::encode_chain(&ci))
.unwrap_or_else(|| "<unknown>".into());

// Cap the database length to the requested max length.
if database_content.len() > max_size {
let dummy_message = "<too-large>";
if dummy_message.len() >= max_size {
String::new()
} else {
dummy_message.to_owned()
}
} else {
database_content
}
encode_database(&services, max_size).await
}
}
}
Expand Down Expand Up @@ -1236,3 +1221,112 @@ async fn start_services<TPlat: Platform>(
transactions_service,
}
}

/// Serializes the state of the chain — the finalized chain information plus the list of
/// network nodes discovered so far — into a JSON string no longer than `max_size` bytes.
///
/// If the chain information cannot be obtained, or if the output cannot be shrunk below
/// `max_size`, a short placeholder string is returned instead. The placeholder will
/// intentionally fail to decode if later passed back to [`decode_database`].
async fn encode_database<TPlat: Platform>(
    services: &ChainServices<TPlat>,
    max_size: usize,
) -> String {
    // Returns `message` if it fits within `max_size` bytes, and an empty string otherwise.
    // Either way the result is an invalid database that will fail to decode, but the message
    // indicates to a human reader what went wrong.
    //
    // Note: the original code used `>` for the `<unknown>` path but `>=` for the
    // `<too-large>` path; a message of exactly `max_size` bytes fits the cap, so `>` is
    // used consistently here.
    fn placeholder(message: &str, max_size: usize) -> String {
        if message.len() > max_size {
            String::new()
        } else {
            message.to_owned()
        }
    }

    // Craft the structure containing all the data that we would like to include.
    let mut database_draft = SerdeDatabase {
        chain: match services.sync_service.serialize_chain_information().await {
            Some(ci) => {
                let encoded = finalized_serialize::encode_chain(&ci);
                serde_json::from_str(&encoded).unwrap()
            }
            // If the chain information can't be obtained, return a dummy value that will
            // intentionally fail to decode if passed back.
            None => return placeholder("<unknown>", max_size),
        },
        nodes: services
            .network_service
            .discovered_nodes(0)
            .await
            .map(|(peer_id, addrs)| {
                (
                    peer_id.to_base58(),
                    addrs.map(|a| a.to_string()).collect::<Vec<_>>(),
                )
            })
            .collect(),
    };

    // Cap the database length to the maximum size by repeatedly dropping nodes until the
    // serialized form fits.
    loop {
        let serialized = serde_json::to_string(&database_draft).unwrap();
        if serialized.len() <= max_size {
            // Success!
            return serialized;
        }

        if database_draft.nodes.is_empty() {
            // Can't shrink the database anymore. Return the string `"<too-large>"` which
            // will fail to decode but will indicate what is wrong.
            return placeholder("<too-large>", max_size);
        }

        // Try to reduce the size of the database.

        // Remove half of the nodes (at least one).
        // Which nodes are removed doesn't really matter.
        let mut nodes_to_remove = cmp::max(1, database_draft.nodes.len() / 2);
        database_draft.nodes.retain(|_, _| {
            if nodes_to_remove == 0 {
                return true;
            }
            nodes_to_remove -= 1;
            false
        });
    }
}

/// Decodes a string previously produced by [`encode_database`].
///
/// On success, returns the finalized chain information together with the list of known
/// network nodes (peer identity plus its multiaddresses).
///
/// # Errors
///
/// Returns `Err(())` if the string isn't valid JSON in the expected schema, or if the
/// embedded chain information fails to decode.
fn decode_database(
    encoded: &str,
) -> Result<
    (
        chain::chain_information::ValidChainInformation,
        Vec<(PeerId, Vec<multiaddr::Multiaddr>)>,
    ),
    (),
> {
    let decoded: SerdeDatabase = serde_json::from_str(encoded).map_err(|_| ())?;

    // The chain information sub-object is kept as raw JSON; re-serialize it verbatim and
    // hand it to the dedicated decoder.
    let (chain, _) =
        finalized_serialize::decode_chain(&serde_json::to_string(&decoded.chain).unwrap())
            .map_err(|_| ())?;

    // Nodes that fail to decode are simply ignored. This is especially important for
    // multiaddresses, as the definition of a valid or invalid multiaddress might change across
    // versions.
    let nodes = decoded
        .nodes
        .iter()
        .filter_map(|(peer_id, addrs)| {
            // Parse the peer id first so that addresses aren't parsed for entries that are
            // going to be discarded anyway.
            let peer_id = peer_id.parse::<PeerId>().ok()?;
            let addrs = addrs
                .iter()
                .filter_map(|a| a.parse::<multiaddr::Multiaddr>().ok())
                .collect();
            Some((peer_id, addrs))
        })
        .collect::<Vec<_>>();

    Ok((chain, nodes))
}

/// On-disk/serialized representation of the database handed back to the API user.
#[derive(serde::Serialize, serde::Deserialize)]
struct SerdeDatabase {
    /// Chain information in the JSON format produced by `finalized_serialize::encode_chain`,
    /// kept as raw JSON so that it is re-encoded verbatim when decoding.
    chain: Box<serde_json::value::RawValue>,
    /// Known network nodes: base58-encoded peer id mapped to its multiaddresses as strings.
    nodes: hashbrown::HashMap<String, Vec<String>, fnv::FnvBuildHasher>,
}
24 changes: 24 additions & 0 deletions bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,30 @@ impl<TPlat: Platform> NetworkService<TPlat> {
self.shared.wake_up_main_background_task.notify(1);
}

/// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
/// the network.
///
/// Nodes that are discovered might disappear over time. In other words, there is no guarantee
/// that a node that has been added through [`NetworkService::discover`] will later be
/// returned by [`NetworkService::discovered_nodes`].
pub async fn discovered_nodes(
    &self,
    chain_index: usize,
) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
    let guarded = self.shared.guarded.lock().await;
    guarded
        .network
        .discovered_nodes(chain_index)
        .map(|(peer_id, addresses)| {
            (
                peer_id.clone(),
                // Clone everything into owned collections so that the lock doesn't need to
                // be held while the caller consumes the returned iterator.
                addresses.cloned().collect::<Vec<_>>().into_iter(),
            )
        })
        .collect::<Vec<_>>()
        .into_iter()
}

/// Returns an iterator to the list of [`PeerId`]s that we have an established connection
/// with.
pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

- The `chain_subscribeAllHeads`, `chain_subscribeNewHeads`, and `chain_subscribeFinalizedHeads` JSON-RPC functions no longer panic if connected to a chain whose headers are in a format that can't be decoded. Instead, no notification is sent and a warning is printed. ([#2442](https://github.com/paritytech/smoldot/pull/2442))

### Changed

- The format of the database returned by `Client.databaseContent` has been changed to include the list of nodes that are known to be present on the peer-to-peer network. When the database is restored, these nodes are immediately discovered. This change aims at reducing the importance of bootnodes. This change is a breaking change, meaning that providing a database that has been obtained from a previous version of smoldot will have no effect. ([#2439](https://github.com/paritytech/smoldot/pull/2439))

## 0.6.20 - 2022-06-23

### Changed
Expand Down
2 changes: 1 addition & 1 deletion src/database/finalized_serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use hashbrown::HashMap;

mod defs;

/// Serializes the given chain information as a string.
/// Serializes the given chain information as a JSON string.
///
/// This is a shortcut for [`encode_chain_storage`] with no `finalized_storage`.
pub fn encode_chain<'a>(
Expand Down
14 changes: 10 additions & 4 deletions src/network/kademlia/kbuckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ where
pub fn closest_entries(&self, target: &K) -> impl Iterator<Item = (&K, &V)> {
// TODO: this is extremely unoptimized
let target_hashed = Key::new(target.as_ref());
let mut list = self.iter().collect::<Vec<_>>();
let mut list = self.iter_ordered().collect::<Vec<_>>();
list.sort_by_key(|(key, _)| {
let key_hashed = Key::new(key.as_ref());
distance_log2(&key_hashed, &target_hashed).map_or(0, |d| u16::from(d) + 1)
Expand All @@ -171,14 +171,20 @@ where

impl<K, V, TNow, const ENTRIES_PER_BUCKET: usize> KBuckets<K, V, TNow, ENTRIES_PER_BUCKET> {
/// Iterates over all the peers in the k-buckets.
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
///
/// The buckets are iterated one by one from closest to furthest away, and within each bucket
/// elements are ordered by descending time since connectivity.
pub fn iter_ordered(&self) -> impl Iterator<Item = (&K, &V)> {
self.buckets
.iter()
.flat_map(|b| b.entries.iter().map(|(k, v)| (k, v)))
}

/// Iterates over all the peers in the k-buckets.
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
///
/// The buckets are iterated one by one from closest to furthest away, and within each bucket
/// elements are ordered by descending time since connectivity.
pub fn iter_mut_ordered(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
self.buckets
.iter_mut()
.flat_map(|b| b.entries.iter_mut().map(|(k, v)| (&*k, v)))
Expand All @@ -192,7 +198,7 @@ where
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
f.debug_list().entries(self.iter_ordered()).finish()
}
}

Expand Down
Loading

0 comments on commit 3a257ca

Please sign in to comment.