Store peers in the database #2439

Merged · 6 commits · Jun 30, 2022
1 change: 1 addition & 0 deletions Cargo.lock (generated file; diff not rendered)

1 change: 1 addition & 0 deletions bin/light-base/Cargo.toml
@@ -21,6 +21,7 @@ itertools = "0.10.3"
log = { version = "0.4.17", default-features = false }
lru = { version = "0.7.7", default-features = false } # TODO: there's no way to use a custom hasher; remove this dependency
rand = "0.8.5"
serde = { version = "1.0.137", default-features = false, features = ["alloc", "derive"] }
serde_json = "1.0.81"
slab = { version = "0.4.6", default-features = false }
smoldot = { version = "0.2.0", path = "../..", default-features = false }
164 changes: 129 additions & 35 deletions bin/light-base/src/lib.rs
@@ -29,6 +29,7 @@ use smoldot::{
libp2p::{connection, multiaddr, peer_id},
};
use std::{
cmp,
collections::{hash_map::Entry, HashMap},
num::NonZeroU32,
ops,
@@ -398,7 +399,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// known as a checkpoint) is present in the chain spec, it is possible to start syncing at
// the finalized block it describes.
// TODO: clean up that block
let (chain_information, genesis_block_header) = {
let (chain_information, genesis_block_header, checkpoint_nodes) = {
match (
chain_spec
.as_chain_information()
@@ -408,10 +409,10 @@
s.as_chain_information(),
)
}),
finalized_serialize::decode_chain(config.database_content),
decode_database(config.database_content),
) {
// Use the database if it contains a more recent block than the chain spec checkpoint.
(Ok(Ok(genesis_ci)), checkpoint, Ok((database, _)))
(Ok(Ok(genesis_ci)), checkpoint, Ok((database, checkpoint_nodes)))
if checkpoint
.as_ref()
.map(|r| r.as_ref().ok())
@@ -422,14 +423,14 @@
}) =>
{
let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
(database, genesis_header.into())
(database, genesis_header.into(), checkpoint_nodes)
}

// Use the database if it contains a more recent block than the chain spec checkpoint.
(
Err(chain_spec::FromGenesisStorageError::UnknownStorageItems),
checkpoint,
Ok((database, _)),
Ok((database, checkpoint_nodes)),
) if checkpoint
.as_ref()
.map(|r| r.as_ref().ok())
@@ -447,7 +448,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
digest: header::DigestRef::empty().into(),
};

(database, genesis_header)
(database, genesis_header, checkpoint_nodes)
}

(Err(chain_spec::FromGenesisStorageError::UnknownStorageItems), None, _) => {
@@ -473,7 +474,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
digest: header::DigestRef::empty().into(),
};

(checkpoint, genesis_header)
(checkpoint, genesis_header, Default::default())
}

(Err(err), _, _) => {
@@ -499,13 +500,13 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {

(Ok(Ok(genesis_ci)), Some(Ok(checkpoint)), _) => {
let genesis_header = genesis_ci.as_ref().finalized_block_header.clone();
(checkpoint, genesis_header.into())
(checkpoint, genesis_header.into(), Default::default())
}

(Ok(Ok(genesis_ci)), None, _) => {
let genesis_header =
header::Header::from(genesis_ci.as_ref().finalized_block_header.clone());
(genesis_ci, genesis_header)
(genesis_ci, genesis_header, Default::default())
}
}
};
@@ -818,11 +819,12 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
let new_chain_id = ChainId(public_api_chains_entry.key());

// Multiple chains can share the same network service, but each specifies different
// bootstrap nodes. In order to resolve this, each chain adds its own bootnodes to
// the network service after it has been initialized. This is done by adding a short-lived
// task that waits for the chain initialization to finish then adds the nodes.
// bootstrap nodes and database nodes. In order to resolve this, each chain adds its own
// bootnodes and database nodes to the network service after it has been initialized. This
// is done by adding a short-lived task that waits for the chain initialization to finish
// then adds the nodes.
self.new_task_tx
.unbounded_send(("network-service-add-bootnodes".to_owned(), {
.unbounded_send(("network-service-add-initial-topology".to_owned(), {
// Clone `running_chain_init`.
let mut running_chain_init = match services_init {
future::MaybeDone::Done(d) => future::MaybeDone::Done(d.clone()),
@@ -834,6 +836,10 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// Wait for the chain to finish initializing to proceed.
(&mut running_chain_init).await;
let running_chain = Pin::new(&mut running_chain_init).take_output().unwrap();
running_chain
.network_service
.discover(&TPlat::now(), 0, checkpoint_nodes, false)
.await;
running_chain
.network_service
.discover(&TPlat::now(), 0, bootstrap_nodes, true)
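The two `discover` calls above differ only in their final `important` flag: nodes restored from the database are injected as non-important, while bootnodes are marked important, presumably so they are retained while restored peers may be evicted. A minimal sketch of that call pattern, with stand-in types since the real `NetworkService` and platform clock are out of scope here; the `discover` signature is inferred from these call sites:

```rust
use std::time::Instant;

// Stand-in for the real network service; only the call pattern is mirrored.
struct NetworkService;

impl NetworkService {
    // Inferred shape: (now, chain_index, list_of_nodes, important).
    async fn discover(&self, _now: &Instant, _chain_index: usize, nodes: Vec<String>, important: bool) {
        println!("injecting {} node(s), important = {}", nodes.len(), important);
    }
}

// Database ("checkpoint") nodes first, marked non-important; bootnodes
// afterwards, marked important.
async fn add_initial_topology(
    net: &NetworkService,
    checkpoint_nodes: Vec<String>,
    bootstrap_nodes: Vec<String>,
) {
    net.discover(&Instant::now(), 0, checkpoint_nodes, false).await;
    net.discover(&Instant::now(), 0, bootstrap_nodes, true).await;
}
```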
@@ -1054,28 +1060,7 @@ impl<TChain, TPlat: Platform> Client<TChain, TPlat> {
// Wait for the chain to finish initializing before we can obtain the database.
(&mut services).await;
let services = Pin::new(&mut services).take_output().unwrap();

// Finally getting the database.
// If the database can't be obtained, we just return a dummy value that will intentionally
// fail to decode if passed back.
let database_content = services
.sync_service
.serialize_chain_information()
.await
.map(|ci| finalized_serialize::encode_chain(&ci))
.unwrap_or_else(|| "<unknown>".into());

// Cap the database length to the requested max length.
if database_content.len() > max_size {
let dummy_message = "<too-large>";
if dummy_message.len() >= max_size {
String::new()
} else {
dummy_message.to_owned()
}
} else {
database_content
}
encode_database(&services, max_size).await
}
}
}
@@ -1236,3 +1221,112 @@ async fn start_services<TPlat: Platform>(
transactions_service,
}
}

async fn encode_database<TPlat: Platform>(
services: &ChainServices<TPlat>,
max_size: usize,
) -> String {
// Craft the structure containing all the data that we would like to include.
let mut database_draft = SerdeDatabase {
chain: match services.sync_service.serialize_chain_information().await {
Some(ci) => {
let encoded = finalized_serialize::encode_chain(&ci);
serde_json::from_str(&encoded).unwrap()
}
None => {
// If the chain information can't be obtained, we just return a dummy value that
// will intentionally fail to decode if passed back.
let dummy_message = "<unknown>";
return if dummy_message.len() > max_size {
String::new()
} else {
dummy_message.to_owned()
};
}
},
nodes: services
.network_service
.discovered_nodes(0)
.await
.map(|(peer_id, addrs)| {
(
peer_id.to_base58(),
addrs.map(|a| a.to_string()).collect::<Vec<_>>(),
)
})
.collect(),
};

// Cap the database length to the maximum size.
loop {
let serialized = serde_json::to_string(&database_draft).unwrap();
if serialized.len() <= max_size {
// Success!
return serialized;
}

if database_draft.nodes.is_empty() {
// Can't shrink the database anymore. Return the string `"<too-large>"` which will
// fail to decode but will indicate what is wrong.
let dummy_message = "<too-large>";
return if dummy_message.len() >= max_size {
String::new()
} else {
dummy_message.to_owned()
};
}

// Try to reduce the size of the database.

// Remove half of the nodes.
// Which nodes are removed doesn't really matter.
let mut nodes_to_remove = cmp::max(1, database_draft.nodes.len() / 2);
database_draft.nodes.retain(|_, _| {
if nodes_to_remove >= 1 {
nodes_to_remove -= 1;
false
} else {
true
}
});
}
}
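The loop above caps the serialized size by re-serializing and dropping half of the remaining nodes on each pass, so it needs only O(log n) serializations and always terminates. A standalone sketch of the same strategy, using a plain `Vec<String>` in place of the real draft structure (assumes only the `serde_json` crate):

```rust
/// Shrinks `entries` until its JSON form fits in `max_size` bytes, mirroring
/// the loop in `encode_database`. Returns `None` when even an empty list is
/// too large (the caller would fall back to a "<too-large>" marker).
fn cap_to_size(mut entries: Vec<String>, max_size: usize) -> Option<String> {
    loop {
        let serialized = serde_json::to_string(&entries).unwrap();
        if serialized.len() <= max_size {
            return Some(serialized);
        }
        if entries.is_empty() {
            return None;
        }
        // Drop half of the entries, at least one. Which entries are dropped
        // doesn't matter, as in the code above.
        let to_remove = std::cmp::max(1, entries.len() / 2);
        entries.truncate(entries.len() - to_remove);
    }
}
```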

fn decode_database(
encoded: &str,
) -> Result<
(
chain::chain_information::ValidChainInformation,
Vec<(PeerId, Vec<multiaddr::Multiaddr>)>,
),
(),
> {
let decoded: SerdeDatabase = serde_json::from_str(encoded).map_err(|_| ())?;

let (chain, _) =
finalized_serialize::decode_chain(&serde_json::to_string(&decoded.chain).unwrap())
.map_err(|_| ())?;

// Nodes that fail to decode are simply ignored. This is especially important for
// multiaddresses, as the definition of a valid or invalid multiaddress might change across
// versions.
let nodes = decoded
.nodes
.iter()
.filter_map(|(peer_id, addrs)| {
let addrs = addrs
.iter()
.filter_map(|a| a.parse::<multiaddr::Multiaddr>().ok())
.collect();
Some((peer_id.parse::<PeerId>().ok()?, addrs))
})
.collect::<Vec<_>>();

Ok((chain, nodes))
}

#[derive(serde::Serialize, serde::Deserialize)]
struct SerdeDatabase {
chain: Box<serde_json::value::RawValue>,
nodes: hashbrown::HashMap<String, Vec<String>, fnv::FnvBuildHasher>,
}
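Putting `encode_database` and `decode_database` together: the database is a JSON object with a `chain` field (the pre-existing `finalized_serialize` payload, embedded verbatim via `RawValue`) and a `nodes` map from base58 `PeerId` strings to multiaddress strings. A self-contained round-trip sketch of that shape; `std::collections::HashMap` and `serde_json::Value` stand in for the `hashbrown` map and `RawValue` used above, and the payload values are placeholders, not real chain data:

```rust
use std::collections::HashMap;

#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct Database {
    chain: serde_json::Value,
    nodes: HashMap<String, Vec<String>>,
}

fn main() {
    let encoded = r#"{
        "chain": { "placeholder": "finalized_serialize payload goes here" },
        "nodes": {
            "placeholder-base58-peer-id": ["/ip4/192.0.2.1/tcp/30333/ws"]
        }
    }"#;

    // `decode_database` additionally parses each key as a `PeerId` and each
    // address as a `Multiaddr`, silently skipping entries that fail; this
    // sketch stops at the serde layer.
    let db: Database = serde_json::from_str(encoded).unwrap();
    assert_eq!(db.nodes.len(), 1);
    println!("{}", serde_json::to_string(&db).unwrap());
}
```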
24 changes: 24 additions & 0 deletions bin/light-base/src/network_service.rs
@@ -789,6 +789,30 @@
self.shared.wake_up_main_background_task.notify(1);
}

/// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
/// the network.
///
/// Nodes that are discovered might disappear over time. In other words, there is no guarantee
/// that a node that has been added through [`NetworkService::discover`] will later be
/// returned by [`NetworkService::discovered_nodes`].
pub async fn discovered_nodes(
&self,
chain_index: usize,
) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
let guarded = self.shared.guarded.lock().await;
guarded
.network
.discovered_nodes(chain_index)
.map(|(peer_id, addresses)| {
(
peer_id.clone(),
addresses.map(|a| a.clone()).collect::<Vec<_>>().into_iter(),
)
})
.collect::<Vec<_>>()
.into_iter()
}

/// Returns an iterator to the list of [`PeerId`]s that we have an established connection
/// with.
pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
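One detail worth noting in `discovered_nodes`: it locks `guarded`, copies everything into an owned `Vec`, and returns `vec.into_iter()`, so the mutex guard is released before the caller starts iterating. A minimal sketch of that pattern with stand-in types (`String` instead of `PeerId`/`Multiaddr`):

```rust
use std::sync::Mutex;

struct Service {
    // Stand-in for the real guarded network state.
    guarded: Mutex<Vec<(String, Vec<String>)>>,
}

impl Service {
    // Cloning into an owned Vec lets the lock guard drop at the end of this
    // function, so the returned iterator doesn't borrow the mutex.
    fn discovered_nodes(&self) -> impl Iterator<Item = (String, impl Iterator<Item = String>)> {
        let guarded = self.guarded.lock().unwrap();
        guarded
            .iter()
            .map(|(peer, addrs)| (peer.clone(), addrs.clone().into_iter()))
            .collect::<Vec<_>>()
            .into_iter()
    }
}
```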
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
@@ -11,6 +11,10 @@

- The `chain_subscribeAllHeads`, `chain_subscribeNewHeads`, and `chain_subscribeFinalizedHeads` JSON-RPC functions no longer panic if connected to a chain whose headers are in a format that can't be decoded. Instead, no notification is sent and a warning is printed. ([#2442](https://github.com/paritytech/smoldot/pull/2442))

### Changed

- The format of the database returned by `Client.databaseContent` has changed to include the list of nodes known to be present on the peer-to-peer network. When the database is restored, these nodes are immediately discovered, reducing the reliance on bootnodes. This is a breaking change: a database obtained from a previous version of smoldot will have no effect. ([#2439](https://github.com/paritytech/smoldot/pull/2439))

## 0.6.20 - 2022-06-23

### Changed
2 changes: 1 addition & 1 deletion src/database/finalized_serialize.rs
@@ -37,7 +37,7 @@ use hashbrown::HashMap;

mod defs;

/// Serializes the given chain information as a string.
/// Serializes the given chain information as a JSON string.
///
/// This is a shortcut for [`encode_chain_storage`] with no `finalized_storage`.
pub fn encode_chain<'a>(
14 changes: 10 additions & 4 deletions src/network/kademlia/kbuckets.rs
@@ -160,7 +160,7 @@
pub fn closest_entries(&self, target: &K) -> impl Iterator<Item = (&K, &V)> {
// TODO: this is extremely unoptimized
let target_hashed = Key::new(target.as_ref());
let mut list = self.iter().collect::<Vec<_>>();
let mut list = self.iter_ordered().collect::<Vec<_>>();
list.sort_by_key(|(key, _)| {
let key_hashed = Key::new(key.as_ref());
distance_log2(&key_hashed, &target_hashed).map_or(0, |d| u16::from(d) + 1)
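For context on the sort key here: `distance_log2(a, b)` is essentially the bit length of `a XOR b`, with `None` for identical keys, so sorting ascending by `map_or(0, |d| d + 1)` puts the target itself first and otherwise orders peers nearest-first in XOR space. A toy 8-bit illustration (the real keys are hashes, not single bytes):

```rust
// Toy log2 XOR distance over u8; `None` means the keys are equal.
fn distance_log2(a: u8, b: u8) -> Option<u8> {
    let xor = a ^ b;
    if xor == 0 {
        None
    } else {
        Some(7 - xor.leading_zeros() as u8)
    }
}

fn main() {
    let target = 0b0000_0000u8;
    let mut peers = vec![0b1000_0000u8, 0b0000_0001, 0b0001_0000];
    // Same sort key as above: identical keys map to 0, others to distance + 1.
    peers.sort_by_key(|p| distance_log2(*p, target).map_or(0u16, |d| u16::from(d) + 1));
    // Nearest first: 0b00000001, 0b00010000, 0b10000000.
    for p in &peers {
        println!("{p:08b}");
    }
}
```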
@@ -171,14 +171,20 @@

impl<K, V, TNow, const ENTRIES_PER_BUCKET: usize> KBuckets<K, V, TNow, ENTRIES_PER_BUCKET> {
/// Iterates over all the peers in the k-buckets.
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
///
/// The buckets are iterated one by one from closest to furthest away, and within each bucket
/// elements are ordered by descending time since connectivity.
pub fn iter_ordered(&self) -> impl Iterator<Item = (&K, &V)> {
self.buckets
.iter()
.flat_map(|b| b.entries.iter().map(|(k, v)| (k, v)))
}

/// Iterates over all the peers in the k-buckets.
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
///
/// The buckets are iterated one by one from closest to furthest away, and within each bucket
/// elements are ordered by descending time since connectivity.
pub fn iter_mut_ordered(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
self.buckets
.iter_mut()
.flat_map(|b| b.entries.iter_mut().map(|(k, v)| (&*k, v)))
@@ -192,7 +198,7 @@
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
f.debug_list().entries(self.iter_ordered()).finish()
}
}

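The `iter` to `iter_ordered` rename makes the iteration order part of the contract: buckets are visited from closest to furthest away, and entries inside a bucket keep their stored order. A reduced sketch of the structure behind that guarantee (real buckets are fixed-size and distance-indexed; a `Vec` of `Vec`s stands in):

```rust
struct Bucket<K, V> {
    entries: Vec<(K, V)>,
}

struct KBuckets<K, V> {
    // buckets[0] is the closest distance band, matching the doc comment above.
    buckets: Vec<Bucket<K, V>>,
}

impl<K, V> KBuckets<K, V> {
    // Same shape as `iter_ordered` above: flattening preserves bucket order.
    fn iter_ordered(&self) -> impl Iterator<Item = (&K, &V)> {
        self.buckets
            .iter()
            .flat_map(|b| b.entries.iter().map(|(k, v)| (k, v)))
    }
}

fn main() {
    let kb = KBuckets {
        buckets: vec![
            Bucket { entries: vec![("near", 1)] },
            Bucket { entries: vec![("far", 2), ("farther", 3)] },
        ],
    };
    // Prints near, far, farther: the closest bucket is drained first.
    for (k, v) in kb.iter_ordered() {
        println!("{k} = {v}");
    }
}
```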