Skip to content

Commit

Permalink
feat: impl peer store load/dump on wasm
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Nov 21, 2024
1 parent 762d3e1 commit e8aab4c
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 72 deletions.
29 changes: 28 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ socket2 = "0.5"
p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [
"wasm-timer",
] }
idb = "0.6"
serde-wasm-bindgen = "0.6.5"


[features]
Expand Down
2 changes: 1 addition & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use p2p::{
async_trait,
builder::ServiceBuilder,
bytes, multiaddr, runtime,
secio::{PeerId, PublicKey},
secio::{self, PeerId, PublicKey},
service::{ServiceControl, SessionType, TargetProtocol, TargetSession},
traits::ServiceProtocol,
utils::{extract_peer_id, multiaddr_to_socketaddr},
Expand Down
55 changes: 51 additions & 4 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub struct NetworkState {

impl NetworkState {
/// Init from config
#[cfg(not(target_family = "wasm"))]
pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
#[cfg(not(target_family = "wasm"))]
config.create_dir_if_not_exists()?;
let local_private_key = config.fetch_private_key()?;
let local_peer_id = local_private_key.peer_id();
Expand All @@ -117,12 +117,10 @@ impl NetworkState {
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
#[cfg(not(target_family = "wasm"))]

let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
config.peer_store_path(),
));
#[cfg(target_family = "wasm")]
let peer_store = Mutex::new(PeerStore::load_from_config(&config));
let bootnodes = config.bootnodes();

let peer_registry = PeerRegistry::new(
Expand Down Expand Up @@ -150,6 +148,55 @@ impl NetworkState {
})
}

#[cfg(target_family = "wasm")]
pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
let local_private_key = config.fetch_private_key()?;
let local_peer_id = local_private_key.peer_id();
// set max score to public addresses
let public_addrs: HashSet<Multiaddr> = config
.listen_addresses
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
let bootnodes = config.bootnodes();

let peer_registry = PeerRegistry::new(
config.max_inbound_peers(),
config.max_outbound_peers(),
config.whitelist_only,
config.whitelist_peers(),
);
Ok(NetworkState {
peer_store,
config,
bootnodes,
peer_registry: RwLock::new(peer_registry),
dialing_addrs: RwLock::new(HashMap::default()),
public_addrs: RwLock::new(public_addrs),
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
local_private_key,
local_peer_id,
active: AtomicBool::new(true),
protocols: RwLock::new(Vec::new()),
required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
ckb2023: AtomicBool::new(false),
})
}

/// fork flag
pub fn ckb2023(self, init: bool) -> Self {
self.ckb2023.store(init, Ordering::SeqCst);
Expand Down
172 changes: 172 additions & 0 deletions network/src/peer_store/browser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use idb::{
DatabaseEvent, Factory, IndexParams, KeyPath, ObjectStoreParams, TransactionMode,
TransactionResult,
};
use p2p::runtime;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc::channel, OnceCell};

use std::path::Path;

use crate::errors::PeerStoreError;

static DB: OnceCell<Storage> = OnceCell::const_new();

#[derive(Deserialize, Serialize, Debug)]
pub struct KV {
pub key: Vec<u8>,
pub value: Vec<u8>,
}

struct Request {
cmd: CommandRequest,
resp: tokio::sync::oneshot::Sender<CommandResponse>,
}

enum CommandResponse {
Read { value: Option<Vec<u8>> },
Put,
Shutdown,
}

enum CommandRequest {
Read { key: Vec<u8> },
Put { kv: KV },
Shutdown,
}

impl std::fmt::Debug for CommandResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CommandResponse::Read { .. } => write!(f, "Read"),
CommandResponse::Put { .. } => write!(f, "Put"),
CommandResponse::Shutdown => write!(f, "Shutdown"),
}
}
}

impl std::fmt::Debug for CommandRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CommandRequest::Read { .. } => write!(f, "Read"),
CommandRequest::Put { .. } => write!(f, "Put"),
CommandRequest::Shutdown => write!(f, "Shutdown"),
}
}
}

pub async fn get_db<P: AsRef<Path>>(path: P) -> &'static Storage {
DB.get_or_init(|| Storage::new(path)).await
}

#[derive(Clone)]
pub struct Storage {
chan: tokio::sync::mpsc::Sender<Request>,
}

impl Storage {
pub async fn new<P: AsRef<Path>>(path: P) -> Self {
let factory = Factory::new().unwrap();
let mut open_request = factory.open("network", Some(1)).unwrap();
let store_name = path.as_ref().to_str().unwrap().to_owned();
let store_name_clone = store_name.clone();
open_request.on_upgrade_needed(move |event| {
let database = event.database().unwrap();
let store_params = ObjectStoreParams::new();

let store = database
.create_object_store(&store_name_clone, store_params)
.unwrap();
let mut index_params = IndexParams::new();
index_params.unique(true);
store
.create_index("key", KeyPath::new_single("key"), Some(index_params))
.unwrap();
});
let db = open_request.await.unwrap();
let (tx, mut rx) = channel(128);

runtime::spawn(async move {
loop {
let request: Request = rx.recv().await.unwrap();
match request.cmd {
CommandRequest::Read { key } => {
let tran = db
.transaction(&[&store_name], TransactionMode::ReadOnly)
.unwrap();
let store = tran.object_store(&store_name).unwrap();
let key = serde_wasm_bindgen::to_value(&key).unwrap();
let value = store
.get(key)
.unwrap()
.await
.unwrap()
.map(|v| serde_wasm_bindgen::from_value::<KV>(v).unwrap().value);
assert_eq!(TransactionResult::Committed, tran.await.unwrap());
request.resp.send(CommandResponse::Read { value }).unwrap()
}
CommandRequest::Put { kv } => {
let tran = db
.transaction(&[&store_name], TransactionMode::ReadWrite)
.unwrap();
let store = tran.object_store(&store_name).unwrap();

let key = serde_wasm_bindgen::to_value(&kv.key).unwrap();
let value = serde_wasm_bindgen::to_value(&kv).unwrap();
store.put(&value, Some(&key)).unwrap().await.unwrap();
assert_eq!(
TransactionResult::Committed,
tran.commit().unwrap().await.unwrap()
);
request.resp.send(CommandResponse::Put).unwrap();
}
CommandRequest::Shutdown => {
request.resp.send(CommandResponse::Shutdown).unwrap();
break;
}
}
}
});

Self { chan: tx }
}

pub async fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, PeerStoreError> {
let value = send_command(
&self.chan,
CommandRequest::Read {
key: key.as_ref().to_vec(),
},
)
.await;
if let CommandResponse::Read { value } = value {
return Ok(value);
} else {
unreachable!()
}
}

pub async fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), PeerStoreError> {
let kv = KV { key, value };

send_command(&self.chan, CommandRequest::Put { kv }).await;
Ok(())
}

pub async fn shutdown(&self) {
if let CommandResponse::Shutdown = send_command(&self.chan, CommandRequest::Shutdown).await
{
} else {
unreachable!()
}
}
}

async fn send_command(
chan: &tokio::sync::mpsc::Sender<Request>,
cmd: CommandRequest,
) -> CommandResponse {
let (tx, rx) = tokio::sync::oneshot::channel();
chan.send(Request { cmd, resp: tx }).await.unwrap();
rx.await.unwrap()
}
2 changes: 2 additions & 0 deletions network/src/peer_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
pub mod addr_manager;
pub mod ban_list;
#[cfg(target_family = "wasm")]
pub(crate) mod browser;
mod peer_store_db;
mod peer_store_impl;
pub mod types;
Expand Down
Loading

0 comments on commit e8aab4c

Please sign in to comment.