From 052e45635e594ee27d3debf2ae3355c50280de3e Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 22 Oct 2024 15:50:56 +0800 Subject: [PATCH] feat: small wasm support --- Cargo.lock | 102 ++++++++--- Makefile | 5 + db/Cargo.toml | 6 +- devtools/ci/ci_main.sh | 1 + network/Cargo.toml | 17 +- network/src/lib.rs | 6 +- network/src/network.rs | 128 ++++++++------ network/src/peer.rs | 2 +- network/src/peer_registry.rs | 3 +- network/src/peer_store/addr_manager.rs | 6 + network/src/peer_store/peer_store_db.rs | 41 +++++ network/src/peer_store/peer_store_impl.rs | 65 ++++--- network/src/protocols/discovery/mod.rs | 7 +- network/src/protocols/discovery/state.rs | 2 +- network/src/protocols/identify/mod.rs | 2 +- network/src/protocols/mod.rs | 22 ++- network/src/protocols/ping.rs | 3 +- .../src/services/dns_seeding/seed_record.rs | 2 +- network/src/services/dns_seeding/tests.rs | 2 +- network/src/services/dump_peer_store.rs | 47 ++++- network/src/services/outbound_peer.rs | 14 +- network/src/services/protocol_type_checker.rs | 16 +- resource/src/lib.rs | 18 +- script/Cargo.toml | 3 + script/src/verify.rs | 8 +- spec/Cargo.toml | 2 + spec/src/versionbits/mod.rs | 18 ++ tx-pool/src/block_assembler/mod.rs | 2 +- util/app-config/src/configs/network.rs | 50 ++++++ util/crypto/Cargo.toml | 2 +- util/crypto/src/secp/signature.rs | 4 +- util/fixed-hash/core/Cargo.toml | 1 - util/hash/Cargo.toml | 8 +- util/hash/src/lib.rs | 4 +- util/jsonrpc-types/src/blockchain.rs | 2 +- .../async_indexer_handle/get_cells.rs | 2 +- .../get_cells_capacity.rs | 2 +- .../async_indexer_handle/get_transactions.rs | 8 +- util/runtime/Cargo.toml | 11 +- util/runtime/src/brower.rs | 27 +++ util/runtime/src/lib.rs | 160 +----------------- util/runtime/src/native.rs | 153 +++++++++++++++++ util/spawn/src/lib.rs | 9 + util/stop-handler/Cargo.toml | 2 +- util/systemtime/Cargo.toml | 3 + util/systemtime/src/lib.rs | 9 +- util/types/src/core/cell.rs | 2 +- verification/Cargo.toml | 3 +- verification/src/transaction_verifier.rs | 5 +- 49 files changed, 693 insertions(+), 324 deletions(-) create mode 100644 util/runtime/src/brower.rs create mode 100644 util/runtime/src/native.rs diff --git a/Cargo.lock b/Cargo.lock index becc815f67..b11c47ada4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-stream" version = "0.3.6" @@ -205,14 +211,13 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "attohttpc" -version = "0.16.3" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdb8867f378f33f78a811a8eb9bf108ad99430d7aad43315dd9319c827ef6247" +checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ "http 0.2.12", "log", "url", - "wildmatch", ] [[package]] @@ -350,6 +355,22 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitcoin-io" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "340e09e8399c7bd8912f495af6aa58bea0c9214773417ffaa8f6460f93aaee56" + +[[package]] +name = "bitcoin_hashes" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb18c03d0db0247e147a21a6faafd5a7eb851c743db062de72018b6b7e8e4d16" +dependencies = [ + "bitcoin-io", + "hex-conservative", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -641,6 +662,7 @@ dependencies = [ "ckb-logger", "ckb-spawn", "tokio", + "wasm-bindgen-futures", ] [[package]] @@ -1675,6 +1697,9 @@ dependencies = [ [[package]] name = "ckb-systemtime" version = "0.120.0-pre" +dependencies = [ + "web-time", +] [[package]] name = "ckb-test-chain-utils" @@ -2749,6 +2774,10 @@ name = "futures-timer" version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" +dependencies = [ + "gloo-timers", + "send_wrapper", +] [[package]] name = "futures-util" @@ -2847,6 +2876,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "goblin" version = "0.2.3" @@ -3020,6 +3061,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-conservative" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5313b072ce3c597065a808dbf612c4c8e8590bdbf8b579508bf7a762c5eae6cd" +dependencies = [ + "arrayvec", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -3277,10 +3327,10 @@ dependencies = [ ] [[package]] -name = "igd" -version = "0.12.1" +name = "igd-next" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "556b5a75cd4adb7c4ea21c64af1c48cefb2ce7d43dc4352c720a1fe47c21f355" +checksum = "76b0d7d4541def58a37bf8efc559683f21edce7c82f0d866c93ac21f7e098f93" dependencies = [ "attohttpc", "log", @@ -3578,7 +3628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -5066,10 +5116,12 @@ dependencies = [ [[package]] name = "secp256k1" -version = "0.29.1" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" +checksum = "b50c5943d326858130af85e049f2661ba3c78b26589b8ab98e65e80ae44a1252" dependencies = [ + "bitcoin_hashes", + "rand 0.8.5", "secp256k1-sys", ] @@ -5111,6 +5163,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "sentry" version = "0.34.0" @@ -5809,20 +5867,20 @@ dependencies = [ [[package]] name = "tentacle" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d335523ec132a2bbefbaf403b52eba047fb50bc83bed2d0b1d22c119bae2fec1" +checksum = "457c5771b5b59538fb07d70d5db3d6b9f94f45e5c1ea1197165aa050f2de4f4d" dependencies = [ "async-trait", "bytes", "futures", - "igd", + "futures-timer", + "igd-next", "js-sys", "libc", "log", "molecule", "nohash-hasher", - "once_cell", "parking_lot 0.12.3", "rand 0.8.5", "socket2", @@ -5830,6 +5888,7 @@ dependencies = [ "tentacle-secio", "thiserror", "tokio", + "tokio-tungstenite", "tokio-util", "tokio-yamux", "wasm-bindgen", @@ -5852,9 +5911,9 @@ dependencies = [ [[package]] name = "tentacle-secio" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac8b23a7879426a4961acea6ae66287f7fe9a934d131a722cbb88f145e97fea" +checksum = "99df015b8649588f2958d4853eee221860f95d2721995857e9dde1462ceb3dc4" dependencies = [ "bs58", "bytes", @@ -6070,7 +6129,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -6149,16 +6207,18 @@ dependencies = [ [[package]] name = "tokio-yamux" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2ed88a04bfbf9e70343a5748a423200ee0591c55e7e487d784a55ee8af17db9" +checksum = "208cecd45a38868bfc0a45aac52cb1aea4583c6b801bf57f351e9d531b23cb86" dependencies = [ "bytes", "futures", + "futures-timer", "log", "nohash-hasher", "tokio", "tokio-util", + "web-time", ] [[package]] @@ -6776,12 +6836,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" -[[package]] -name = "wildmatch" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f44b95f62d34113cf558c93511ac93027e03e9c29a60dd0fd70e6e025c7270a" - [[package]] name = "winapi" version = "0.3.9" diff --git a/Makefile b/Makefile index 7882a0524b..24f3ee3ca5 100644 --- a/Makefile +++ b/Makefile @@ -222,6 +222,11 @@ ci: ## Run recipes for CI. ci: fmt clippy test bench-test check-cargo-metadata check-cargotoml check-whitespaces check-dirty-rpc-doc security-audit check-crates check-licenses git diff --exit-code Cargo.lock +.PHONY: wasm +wasm: + rustup target add wasm32-unknown-unknown + cd network && cargo c --target wasm32-unknown-unknown + .PHONY: check-cargotoml check-cargotoml: ./devtools/ci/check-cargotoml.sh diff --git a/db/Cargo.toml b/db/Cargo.toml index c9fbbdc8c7..1168678a82 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -13,9 +13,13 @@ ckb-app-config = { path = "../util/app-config", version = "= 0.120.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.120.0-pre" } ckb-error = { path = "../error", version = "= 0.120.0-pre" } libc = "0.2" -rocksdb = { package = "ckb-rocksdb", version ="=0.21.1", features = ["snappy", "lz4"], default-features = false } +rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [ + "snappy", + "lz4", +], default-features = false } ckb-db-schema = { path = "../db-schema", version = "= 0.120.0-pre" } + [dev-dependencies] tempfile.workspace = true diff --git a/devtools/ci/ci_main.sh b/devtools/ci/ci_main.sh index 33242b3035..7a1a798690 100755 --- a/devtools/ci/ci_main.sh +++ b/devtools/ci/ci_main.sh @@ -59,6 +59,7 @@ case $GITHUB_WORKFLOW in make check-whitespaces make check-dirty-rpc-doc make check-dirty-hashes-toml + make wasm devtools/ci/check-cyclic-dependencies.py devtools/ci/check-relaxed.sh ;; diff --git a/network/Cargo.toml b/network/Cargo.toml index 3138157ad8..04846b76ad 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -25,7 +25,7 @@ bs58 = { version = "0.5.0", optional = true } sentry = { version = "0.34.0", optional = true } faster-hex = { version = "0.6", optional = true } ckb-hash = { path = "../util/hash", version = "= 0.120.0-pre" } -secp256k1 = { version = "0.29", features = ["recovery"], optional = true } +secp256k1 = { version = "0.30", features = ["recovery"], optional = true } trust-dns-resolver = { version = "0.23", optional = true } snap = "1" ckb-types = { path = "../util/types", version = "= 0.120.0-pre" } @@ -33,14 +33,25 @@ ipnetwork = "0.20" serde_json = "1.0" bloom-filters = "0.1" ckb-spawn = { path = "../util/spawn", version = "= 0.120.0-pre" } -socket2 = "0.5" bitflags = "1.0" +p2p = { version = "0.6.2", package = "tentacle", default-features = false } -p2p = { version = "0.6.1", package = "tentacle", features = [ +[target.'cfg(not(target_family = "wasm"))'.dependencies] +p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [ "upnp", "parking_lot", "openssl-vendored", + "tokio-runtime", + "tokio-timer", + "ws", ] } +socket2 = "0.5" + +[target.'cfg(target_family = "wasm")'.dependencies] +p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [ + "wasm-timer", +] } + [features] with_sentry = ["sentry"] diff --git a/network/src/lib.rs b/network/src/lib.rs index 5a524198e6..3bb66e24bb 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -32,14 +32,14 @@ pub use crate::{ peer_registry::PeerRegistry, peer_store::Score, protocols::{ - identify::Flags, support_protocols::SupportProtocols, CKBProtocol, CKBProtocolContext, - CKBProtocolHandler, PeerIndex, + identify::Flags, support_protocols::SupportProtocols, BoxedCKBProtocolContext, CKBProtocol, + CKBProtocolContext, CKBProtocolHandler, PeerIndex, }, }; pub use p2p::{ async_trait, builder::ServiceBuilder, - bytes, multiaddr, + bytes, multiaddr, runtime, secio::{PeerId, PublicKey}, service::{ServiceControl, SessionType, TargetProtocol, TargetSession}, traits::ServiceProtocol, diff --git a/network/src/network.rs b/network/src/network.rs index 734c8815ae..f84b4721dd 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -1,5 +1,7 @@ //! Global state struct and start function -use crate::errors::{Error, P2PError}; +use crate::errors::Error; +#[cfg(not(target_family = "wasm"))] +use crate::errors::P2PError; use crate::peer_registry::{ConnectionStatus, PeerRegistry}; use crate::peer_store::{ types::{AddrInfo, BannedAddr}, @@ -22,6 +24,7 @@ use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtoc use ckb_logger::{debug, error, info, trace, warn}; use ckb_spawn::Spawn; use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken}; +use ckb_systemtime::{Duration, Instant}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{channel::mpsc::Sender, Future}; use ipnetwork::IpNetwork; @@ -45,6 +48,7 @@ use p2p::{ use rand::prelude::IteratorRandom; #[cfg(feature = "with_sentry")] use sentry::{capture_message, with_scope, Level}; +#[cfg(not(target_family = "wasm"))] use std::sync::mpsc; use std::{ borrow::Cow, @@ -56,7 +60,6 @@ use std::{ Arc, }, thread, - time::{Duration, Instant}, }; use tokio::{self, sync::oneshot}; @@ -92,6 +95,7 @@ pub struct NetworkState { impl NetworkState { /// Init from config pub fn from_config(config: NetworkConfig) -> Result { + #[cfg(not(target_family = "wasm"))] config.create_dir_if_not_exists()?; let local_private_key = config.fetch_private_key()?; let local_peer_id = local_private_key.peer_id(); @@ -113,9 +117,12 @@ impl NetworkState { }) .collect(); info!("Loading the peer store. This process may take a few seconds to complete."); + #[cfg(not(target_family = "wasm"))] let peer_store = Mutex::new(PeerStore::load_from_dir_or_default( config.peer_store_path(), )); + #[cfg(target_family = "wasm")] + let peer_store = Mutex::new(PeerStore::load_from_config(&config)); let bootnodes = config.bootnodes(); let peer_registry = PeerRegistry::new( @@ -890,7 +897,6 @@ impl NetworkService { }; service_builder = service_builder .handshake_type(network_state.local_private_key.clone().into()) - .upnp(config.upnp) .yamux_config(yamux_config) .forever(true) .max_connection_number(1024) @@ -898,30 +904,16 @@ impl NetworkService { .set_channel_size(config.channel_size()) .timeout(Duration::from_secs(5)); + #[cfg(not(target_family = "wasm"))] + { + service_builder = service_builder.upnp(config.upnp); + } + #[cfg(target_os = "linux")] let p2p_service = { if config.reuse_port_on_linux { let iter = config.listen_addresses.iter(); - #[derive(Clone, Copy, Debug, Eq, PartialEq)] - enum TransportType { - Ws, - Tcp, - } - - fn find_type(addr: &Multiaddr) -> TransportType { - let mut iter = addr.iter(); - - iter.find_map(|proto| { - if let p2p::multiaddr::Protocol::Ws = proto { - Some(TransportType::Ws) - } else { - None - } - }) - .unwrap_or(TransportType::Tcp) - } - #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum BindType { None, @@ -942,43 +934,65 @@ impl NetworkService { fn is_ready(&self) -> bool { // should change to Both if ckb enable ws - matches!(self, BindType::Tcp) + matches!(self, BindType::Both) } } let mut init = BindType::None; - for addr in iter { + + for multi_addr in iter { if init.is_ready() { break; } - match find_type(addr) { - // wait ckb enable ws support - TransportType::Ws => (), + match find_type(multi_addr) { TransportType::Tcp => { // only bind once if matches!(init, BindType::Tcp) { continue; } - if let Some(addr) = multiaddr_to_socketaddr(addr) { - use p2p::service::TcpSocket; + if let Some(addr) = multiaddr_to_socketaddr(multi_addr) { + let domain = socket2::Domain::for_address(addr); + let bind_fn = move |socket: p2p::service::TcpSocket| { + let socket_ref = socket2::SockRef::from(&socket); + #[cfg(all( + unix, + not(target_os = "solaris"), + not(target_os = "illumos") + ))] + socket_ref.set_reuse_port(true)?; + socket_ref.set_reuse_address(true)?; + if socket_ref.domain()? == domain { + socket_ref.bind(&addr.into())?; + } + Ok(socket) + }; + init.transform(TransportType::Tcp); + service_builder = service_builder.tcp_config(bind_fn); + } + } + TransportType::Ws => { + // only bind once + if matches!(init, BindType::Ws) { + continue; + } + if let Some(addr) = multiaddr_to_socketaddr(multi_addr) { let domain = socket2::Domain::for_address(addr); - service_builder = - service_builder.tcp_config(move |socket: TcpSocket| { - let socket_ref = socket2::SockRef::from(&socket); - #[cfg(all( - unix, - not(target_os = "solaris"), - not(target_os = "illumos") - ))] - socket_ref.set_reuse_port(true)?; - - socket_ref.set_reuse_address(true)?; - if socket_ref.domain()? == domain { - socket_ref.bind(&addr.into())?; - } - Ok(socket) - }); - init.transform(TransportType::Tcp) + let bind_fn = move |socket: p2p::service::TcpSocket| { + let socket_ref = socket2::SockRef::from(&socket); + #[cfg(all( + unix, + not(target_os = "solaris"), + not(target_os = "illumos") + ))] + socket_ref.set_reuse_port(true)?; + socket_ref.set_reuse_address(true)?; + if socket_ref.domain()? == domain { + socket_ref.bind(&addr.into())?; + } + Ok(socket) + }; + init.transform(TransportType::Ws); + service_builder = service_builder.tcp_config_on_ws(bind_fn); } } } @@ -1094,11 +1108,14 @@ impl NetworkService { .unzip(); let receiver: CancellationToken = new_tokio_exit_rx(); + #[cfg(not(target_family = "wasm"))] let (start_sender, start_receiver) = mpsc::channel(); { + #[cfg(not(target_family = "wasm"))] let network_state = Arc::clone(&network_state); let p2p_control: ServiceAsyncControl = p2p_control.clone().into(); handle.spawn_task(async move { + #[cfg(not(target_family = "wasm"))] for addr in &config.listen_addresses { match p2p_service.listen(addr.to_owned()).await { Ok(listen_address) => { @@ -1121,8 +1138,9 @@ impl NetworkService { } }; } + #[cfg(not(target_family = "wasm"))] start_sender.send(Ok(())).unwrap(); - tokio::spawn(async move { p2p_service.run().await }); + p2p::runtime::spawn(async move { p2p_service.run().await }); tokio::select! { _ = receiver.cancelled() => { info!("NetworkService receive exit signal, start shutdown..."); @@ -1150,7 +1168,7 @@ impl NetworkService { } }); } - + #[cfg(not(target_family = "wasm"))] if let Ok(Err(e)) = start_receiver.recv() { return Err(e); } @@ -1419,3 +1437,17 @@ pub(crate) async fn async_disconnect_with_message( } control.disconnect(peer_index).await } + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum TransportType { + Ws, + Tcp, +} + +pub(crate) fn find_type(addr: &Multiaddr) -> TransportType { + if addr.iter().any(|proto| matches!(proto, Protocol::Ws)) { + TransportType::Ws + } else { + TransportType::Tcp + } +} diff --git a/network/src/peer.rs b/network/src/peer.rs index 48ae7196eb..fe7ee3dfb3 100644 --- a/network/src/peer.rs +++ b/network/src/peer.rs @@ -2,9 +2,9 @@ use crate::network_group::Group; use crate::{ multiaddr::Multiaddr, protocols::identify::Flags, ProtocolId, ProtocolVersion, SessionType, }; +use ckb_systemtime::{Duration, Instant}; use p2p::SessionId; use std::collections::HashMap; -use std::time::{Duration, Instant}; /// Peer info from identify protocol message #[derive(Clone, Debug)] diff --git a/network/src/peer_registry.rs b/network/src/peer_registry.rs index 9f3e55536b..dcc97d160f 100644 --- a/network/src/peer_registry.rs +++ b/network/src/peer_registry.rs @@ -6,6 +6,7 @@ use crate::{ extract_peer_id, Peer, PeerId, SessionType, }; use ckb_logger::debug; +use ckb_systemtime::Instant; use p2p::{multiaddr::Multiaddr, SessionId}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -146,7 +147,7 @@ impl PeerRegistry { &mut candidate_peers, EVICTION_PROTECT_PEERS, |peer1, peer2| { - let now = std::time::Instant::now(); + let now = Instant::now(); let peer1_last_message = peer1 .last_ping_protocol_message_received_at .map(|t| now.saturating_duration_since(t).as_secs()) diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index b4b5179ba3..127b5e75a1 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -1,4 +1,6 @@ //! Address manager +#[cfg(target_family = "wasm")] +use crate::network::{find_type, TransportType}; use crate::peer_store::types::AddrInfo; use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr}; use rand::Rng; @@ -49,6 +51,10 @@ impl AddrManager { let mut addr_infos = Vec::with_capacity(count); let mut rng = rand::thread_rng(); let now_ms = ckb_systemtime::unix_time_as_millis(); + #[cfg(target_family = "wasm")] + let filter = |peer_addr: &AddrInfo| { + filter(peer_addr) && matches!(find_type(&peer_addr.addr), TransportType::Ws) + }; for i in 0..self.random_ids.len() { // reuse the for loop to shuffle random ids // https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle diff --git a/network/src/peer_store/peer_store_db.rs b/network/src/peer_store/peer_store_db.rs index efbd195db2..daeb604414 100644 --- a/network/src/peer_store/peer_store_db.rs +++ b/network/src/peer_store/peer_store_db.rs @@ -37,6 +37,14 @@ impl AddrManager { .and_then(|_| file.sync_all()) .map_err(Into::into) } + + #[cfg(target_family = "wasm")] + pub fn dump_with_fn(&self, f: F) { + let addrs: Vec<_> = self.addrs_iter().collect(); + debug!("Dump {} addrs", addrs.len()); + let bytes = serde_json::to_string(&addrs).unwrap(); + f(bytes.as_bytes()) + } } impl BanList { @@ -62,6 +70,14 @@ impl BanList { .and_then(|_| file.sync_all()) .map_err(Into::into) } + + #[cfg(target_family = "wasm")] + pub fn dump_with_fn(&self, f: F) { + let addrs: Vec<_> = self.get_banned_addrs(); + debug!("Dump {} banned addrs", addrs.len()); + let bytes = serde_json::to_string(&addrs).unwrap(); + f(bytes.as_bytes()) + } } impl PeerStore { @@ -107,6 +123,23 @@ impl PeerStore { PeerStore::new(addr_manager, ban_list) } + #[cfg(target_family = "wasm")] + pub fn load_from_config(config: &ckb_app_config::NetworkConfig) -> Self { + let addr_manager = Ok((config.peer_store_fns.load_fn)()) + .and_then(|file| { + AddrManager::load(std::io::BufReader::new(std::io::Cursor::new(file))) + .map_err(|err| error!("Failed to load AddrManager db, error: {:?}", err)) + }) + .unwrap_or_default(); + let ban_list = Ok((config.peer_store_ban_list_fns.load_fn)()) + .and_then(|file| { + BanList::load(std::io::BufReader::new(std::io::Cursor::new(file))) + .map_err(|err| error!("Failed to load BanList db, error: {:?}", err)) + }) + .unwrap_or_default(); + PeerStore::new(addr_manager, ban_list) + } + /// Dump all info to disk pub fn dump_to_dir>(&self, path: P) -> Result<(), Error> { // create dir @@ -139,6 +172,14 @@ impl PeerStore { move_file(tmp_ban_list, path.as_ref().join(DEFAULT_BAN_LIST_DB))?; Ok(()) } + + #[cfg(target_family = "wasm")] + pub fn dump_with_config(&self, config: &ckb_app_config::NetworkConfig) { + self.addr_manager() + .dump_with_fn(&config.peer_store_fns.dump_fn); + self.ban_list() + .dump_with_fn(&config.peer_store_ban_list_fns.dump_fn); + } } /// This function use `copy` then `remove_file` as a fallback when `rename` failed, diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 4c076066a2..0dd50d6e1e 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,3 +1,4 @@ +use crate::network::{find_type, TransportType}; use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, @@ -64,6 +65,10 @@ impl PeerStore { if self.ban_list.is_addr_banned(&addr) { return Ok(()); } + #[cfg(target_family = "wasm")] + if !matches!(find_type(&addr), TransportType::Ws) { + return Ok(()); + } self.check_purge()?; let score = self.score_config.default_score; self.addr_manager @@ -165,20 +170,24 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let peers = &self.connected_peers; let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); + + let filter = |peer_addr: &AddrInfo| { + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() + && peer_addr + .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL)) + && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) + }; + + // Any protocol expect websocket + #[cfg(not(target_family = "wasm"))] + let filter = |peer_addr: &AddrInfo| { + filter(peer_addr) && !matches!(find_type(&peer_addr.addr), TransportType::Ws) + }; + // get addrs that can attempt. - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() - && peer_addr.connected(|t| { - t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL) - }) - && required_flags_filter( - required_flags, - Flags::from_bits_truncate(peer_addr.flags), - ) - }) + self.addr_manager.fetch_random(count, filter) } /// Get peers for feeler connection, this method randomly return peer addrs that we never @@ -192,14 +201,16 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); let peers = &self.connected_peers; - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() - && !peer_addr.tried_in_last_minute(now_ms) - && !peer_addr.connected(|t| t > addr_expired_ms) - }) + + let filter = |peer_addr: &AddrInfo| { + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() + && !peer_addr.tried_in_last_minute(now_ms) + && !peer_addr.connected(|t| t > addr_expired_ms) + }; + + self.addr_manager.fetch_random(count, filter) } /// Return valid addrs that success connected, used for discovery. @@ -209,12 +220,14 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); + + let filter = |peer_addr: &AddrInfo| { + required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) + && peer_addr.connected(|t| t > addr_expired_ms) + }; + // get success connected addrs. - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) - && peer_addr.connected(|t| t > addr_expired_ms) - }) + self.addr_manager.fetch_random(count, filter) } /// Ban an addr diff --git a/network/src/protocols/discovery/mod.rs b/network/src/protocols/discovery/mod.rs index c08012ba1b..573fad175a 100644 --- a/network/src/protocols/discovery/mod.rs +++ b/network/src/protocols/discovery/mod.rs @@ -1,10 +1,7 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, sync::Arc}; use ckb_logger::{debug, error, trace, warn}; +use ckb_systemtime::{Duration, Instant}; use p2p::{ async_trait, bytes, context::{ProtocolContext, ProtocolContextMutRef, SessionContext}, diff --git a/network/src/protocols/discovery/state.rs b/network/src/protocols/discovery/state.rs index cba4d41d14..a7c7dca091 100644 --- a/network/src/protocols/discovery/state.rs +++ b/network/src/protocols/discovery/state.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use ckb_systemtime::{Duration, Instant}; use ckb_logger::debug; use p2p::{ diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index d2fefde131..e534dda78f 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::{atomic::Ordering, Arc}; -use std::time::{Duration, Instant}; use ckb_logger::{debug, error, trace, warn}; +use ckb_systemtime::{Duration, Instant}; use p2p::{ async_trait, bytes::Bytes, diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index f2da17e4a0..80a117e73b 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -128,38 +128,36 @@ pub trait CKBProtocolContext: Send { } } +/// type alias of dyn ckb protocol context +pub type BoxedCKBProtocolContext = Arc; + /// Abstract protocol handle base on tentacle service handle #[async_trait] pub trait CKBProtocolHandler: Sync + Send { /// Init action on service run - async fn init(&mut self, nc: Arc); + async fn init(&mut self, nc: BoxedCKBProtocolContext); /// Called when opening protocol async fn connected( &mut self, - _nc: Arc, + _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex, _version: &str, ) { } /// Called when closing protocol - async fn disconnected( - &mut self, - _nc: Arc, - _peer_index: PeerIndex, - ) { - } + async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {} /// Called when the corresponding protocol message is received async fn received( &mut self, - _nc: Arc, + _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex, _data: Bytes, ) { } /// Called when the Service receives the notify task - async fn notify(&mut self, _nc: Arc, _token: u64) {} + async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {} /// Behave like `Stream::poll` - async fn poll(&mut self, _nc: Arc) -> Option<()> { + async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> { None } } @@ -648,6 +646,6 @@ impl Future for BlockingFutureTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - tokio::task::block_in_place(|| self.task.poll_unpin(cx)) + p2p::runtime::block_in_place(|| self.task.poll_unpin(cx)) } } diff --git a/network/src/protocols/ping.rs b/network/src/protocols/ping.rs index 984dbf2a15..5f692b46b1 100644 --- a/network/src/protocols/ping.rs +++ b/network/src/protocols/ping.rs @@ -18,9 +18,10 @@ use std::{ collections::{HashMap, HashSet}, str, sync::Arc, - time::{Duration, Instant}, }; +use ckb_systemtime::{Duration, Instant}; + const SEND_PING_TOKEN: u64 = 0; const CHECK_TIMEOUT_TOKEN: u64 = 1; const CONTROL_CHANNEL_BUFFER_SIZE: usize = 2; diff --git a/network/src/services/dns_seeding/seed_record.rs b/network/src/services/dns_seeding/seed_record.rs index 20ec1affcc..92bce2ca74 100644 --- a/network/src/services/dns_seeding/seed_record.rs +++ b/network/src/services/dns_seeding/seed_record.rs @@ -99,7 +99,7 @@ impl SeedRecord { return Err(SeedRecordError::InvalidRecord); } - let recid = RecoveryId::from_i32(i32::from(sig[64])) + let recid = RecoveryId::try_from(i32::from(sig[64])) .map_err(|_| SeedRecordError::InvalidSignature)?; let signature = RecoverableSignature::from_compact(&sig[0..64], recid) .map_err(|_| SeedRecordError::InvalidSignature)?; diff --git a/network/src/services/dns_seeding/tests.rs b/network/src/services/dns_seeding/tests.rs index 81ddea0521..79576c74dc 100644 --- a/network/src/services/dns_seeding/tests.rs +++ b/network/src/services/dns_seeding/tests.rs @@ -63,7 +63,7 @@ impl SeedRecord { let (recid, signed_data) = signature.serialize_compact(); let mut sig = [0u8; 65]; sig[0..64].copy_from_slice(&signed_data[0..64]); - sig[64] = recid.to_i32() as u8; + sig[64] = Into::::into(recid) as u8; let signature_string = bs58::encode(&sig[..]).into_string(); Ok([data, signature_string].join(&SEP.to_string())) } diff --git a/network/src/services/dump_peer_store.rs b/network/src/services/dump_peer_store.rs index 735ace3a71..775ea7a2a5 100644 --- a/network/src/services/dump_peer_store.rs +++ b/network/src/services/dump_peer_store.rs @@ -1,5 +1,5 @@ use crate::NetworkState; -use ckb_logger::{debug, warn}; +use ckb_logger::debug; use futures::Future; use std::{ pin::Pin, @@ -7,14 +7,16 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::{Instant, Interval, MissedTickBehavior}; const DEFAULT_DUMP_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour /// Save current peer store data regularly pub struct DumpPeerStoreService { network_state: Arc, - interval: Option, + #[cfg(not(target_family = "wasm"))] + interval: Option, + #[cfg(target_family = "wasm")] + interval: Option, } impl DumpPeerStoreService { @@ -25,16 +27,24 @@ impl DumpPeerStoreService { } } + #[cfg(not(target_family = "wasm"))] fn dump_peer_store(&self) { let path = self.network_state.config.peer_store_path(); self.network_state.with_peer_store_mut(|peer_store| { if let Err(err) = peer_store.dump_to_dir(&path) { - warn!("Dump peer store error, path: {:?} error: {}", path, err); + ckb_logger::warn!("Dump peer store error, path: {:?} error: {}", path, err); } else { debug!("Dump peer store to {:?}", path); } }); } + + #[cfg(target_family = "wasm")] + fn dump_peer_store(&self) { + let config = &self.network_state.config; + self.network_state + .with_peer_store_mut(|peer_store| peer_store.dump_with_config(config)); + } } impl Drop for DumpPeerStoreService { @@ -47,16 +57,17 @@ impl Drop for DumpPeerStoreService { impl Future for DumpPeerStoreService { type Output = (); + #[cfg(not(target_family = "wasm"))] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { let mut interval = tokio::time::interval_at( - Instant::now() + DEFAULT_DUMP_INTERVAL, + tokio::time::Instant::now() + DEFAULT_DUMP_INTERVAL, DEFAULT_DUMP_INTERVAL, ); // The dump peer store service does not need to urgently compensate for the missed wake, // just delay behavior is enough - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); Some(interval) } } @@ -65,4 +76,28 @@ impl Future for DumpPeerStoreService { } Poll::Pending } + + #[cfg(target_family = "wasm")] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::StreamExt; + if self.interval.is_none() { + self.interval = { + let mut interval = p2p::runtime::Interval::new(DEFAULT_DUMP_INTERVAL); + // The outbound service does not need to urgently compensate for the missed wake, + // just skip behavior is enough + interval.set_missed_tick_behavior(p2p::runtime::MissedTickBehavior::Skip); + Some(interval) + } + } + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { + self.dump_peer_store() + } + Poll::Pending + } } diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index 6498ec6894..10dca1c33c 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -4,7 +4,8 @@ use crate::{ }; use ckb_logger::trace; use ckb_systemtime::unix_time_as_millis; -use futures::Future; +use futures::{Future, StreamExt}; +use p2p::runtime::{Interval, MissedTickBehavior}; use p2p::{multiaddr::MultiAddr, service::ServiceControl}; use rand::prelude::IteratorRandom; use std::{ @@ -13,7 +14,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::{Interval, MissedTickBehavior}; const FEELER_CONNECTION_COUNT: usize = 10; @@ -155,14 +155,20 @@ impl Future for OutboundPeerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { - let mut interval = tokio::time::interval(self.try_connect_interval); + let mut interval = Interval::new(self.try_connect_interval); // The outbound service does not need to urgently compensate for the missed wake, // just skip behavior is enough interval.set_missed_tick_behavior(MissedTickBehavior::Skip); Some(interval) } } - while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { // keep whitelist peer on connected self.try_dial_whitelist(); // ensure feeler work at any time diff --git a/network/src/services/protocol_type_checker.rs b/network/src/services/protocol_type_checker.rs index 9a469a7d92..c41ba14147 100644 --- a/network/src/services/protocol_type_checker.rs +++ b/network/src/services/protocol_type_checker.rs @@ -10,15 +10,15 @@ /// Other protocols will be closed after a timeout. use crate::{network::disconnect_with_message, NetworkState, Peer, ProtocolId, SupportProtocols}; use ckb_logger::debug; -use futures::Future; +use ckb_systemtime::{Duration, Instant}; +use futures::{Future, StreamExt}; +use p2p::runtime::{Interval, MissedTickBehavior}; use p2p::service::ServiceControl; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, }; -use tokio::time::{Interval, MissedTickBehavior}; const TIMEOUT: Duration = Duration::from_secs(10); const CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -129,14 +129,20 @@ impl Future for ProtocolTypeCheckerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { - let mut interval = tokio::time::interval(CHECK_INTERVAL); + let mut interval = Interval::new(CHECK_INTERVAL); // The protocol type checker service does not need to urgently compensate for the missed wake, // just skip behavior is enough interval.set_missed_tick_behavior(MissedTickBehavior::Skip); Some(interval) } } - while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { self.check_protocol_type(); } Poll::Pending diff --git a/resource/src/lib.rs b/resource/src/lib.rs index 188b060381..b69b644b0f 100644 --- a/resource/src/lib.rs +++ b/resource/src/lib.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::fmt; use std::fs; -use std::io::{self, BufReader, Read}; +use std::io::{self, BufReader, Cursor, Read}; use std::path::{Path, PathBuf}; use ckb_system_scripts::BUNDLED_CELL; @@ -71,6 +71,11 @@ pub enum Resource { /// The file path to the resource. file: PathBuf, }, + /// A resource that init by user custom + Raw { + /// raw data + raw: String, + }, } impl fmt::Display for Resource { @@ -78,6 +83,7 @@ impl fmt::Display for Resource { match self { Resource::Bundled { bundled } => write!(f, "Bundled({bundled})"), Resource::FileSystem { file } => write!(f, "FileSystem({})", file.display()), + Resource::Raw { raw } => write!(f, "Raw({})", raw), } } } @@ -93,6 +99,11 @@ impl Resource { Resource::FileSystem { file } } + /// Creates a reference to the resource resident in the memory. + pub fn raw(raw: String) -> Resource { + Resource::Raw { raw } + } + /// Creates the CKB config file resource from the file system. /// /// It searches the file name `CKB_CONFIG_FILE_NAME` in the directory `root_dir`. @@ -156,6 +167,7 @@ impl Resource { SourceFiles::new(&BUNDLED_CELL, &BUNDLED).is_available(bundled) } Resource::FileSystem { file } => file.exists(), + Resource::Raw { .. } => true, } } @@ -185,6 +197,7 @@ impl Resource { match self { Resource::Bundled { bundled } => SourceFiles::new(&BUNDLED_CELL, &BUNDLED).get(bundled), Resource::FileSystem { file } => Ok(Cow::Owned(fs::read(file)?)), + Resource::Raw { raw } => Ok(Cow::Owned(raw.to_owned().into_bytes())), } } @@ -195,6 +208,9 @@ impl Resource { SourceFiles::new(&BUNDLED_CELL, &BUNDLED).read(bundled) } Resource::FileSystem { file } => Ok(Box::new(BufReader::new(fs::File::open(file)?))), + Resource::Raw { raw } => Ok(Box::new(BufReader::new(Cursor::new( + raw.to_owned().into_bytes(), + )))), } } diff --git a/script/Cargo.toml b/script/Cargo.toml index d76f360864..ee2101c990 100644 --- a/script/Cargo.toml +++ b/script/Cargo.toml @@ -28,6 +28,9 @@ ckb-logger = { path = "../util/logger", version = "= 0.120.0-pre", optional = tr serde = { version = "1.0", features = ["derive"] } ckb-error = { path = "../error", version = "= 0.120.0-pre" } ckb-chain-spec = { path = "../spec", version = "= 0.120.0-pre" } +tokio = { version = "1.35.0", features = [] } + +[target.'cfg(not(target_family = "wasm"))'.dependencies] tokio = { version = "1.35.0", features = ["rt-multi-thread"] } [dev-dependencies] diff --git a/script/src/verify.rs b/script/src/verify.rs index d20afd5ceb..6551ffa7f3 100644 --- a/script/src/verify.rs +++ b/script/src/verify.rs @@ -3,6 +3,8 @@ use crate::scheduler::Scheduler; use crate::syscalls::Pause; use crate::syscalls::{InheritedFd, ProcessID, EXEC_LOAD_ELF_V2_CYCLES_BASE}; use crate::types::{DataPieceId, FullSuspendedState, Message, RunMode, TxData, VmId, FIRST_VM_ID}; +#[cfg(not(target_family = "wasm"))] +use crate::ChunkCommand; use crate::{ error::{ScriptError, TransactionScriptError}, syscalls::{ @@ -16,7 +18,6 @@ use crate::{ TransactionSnapshot, TransactionState, VerifyResult, }, verify_env::TxVerifyEnv, - ChunkCommand, }; use ckb_chain_spec::consensus::{Consensus, TYPE_ID_CODE_HASH}; use ckb_error::Error; @@ -32,6 +33,7 @@ use ckb_types::{ packed::{Byte32, CellOutput, OutPoint, Script}, prelude::*, }; +#[cfg(not(target_family = "wasm"))] use ckb_vm::machine::Pause as VMPause; use ckb_vm::{snapshot2::Snapshot2Context, Error as VMInternalError, Syscalls}; use std::sync::{Arc, Mutex}; @@ -39,6 +41,7 @@ use std::{ collections::{BTreeMap, HashMap}, sync::RwLock, }; +#[cfg(not(target_family = "wasm"))] use tokio::sync::{ oneshot, watch::{self, Receiver}, @@ -689,6 +692,7 @@ where /// Performing a resumable verification on the transaction scripts with signal channel, /// if `Suspend` comes from `command_rx`, the process will be hang up until `Resume` comes, /// otherwise, it will return until the verification is completed. + #[cfg(not(target_family = "wasm"))] pub async fn resumable_verify_with_signal( &self, limit_cycles: Cycle, @@ -1072,6 +1076,7 @@ where } } + #[cfg(not(target_family = "wasm"))] async fn verify_group_with_signal( &self, group: &ScriptGroup, @@ -1163,6 +1168,7 @@ where } } + #[cfg(not(target_family = "wasm"))] async fn chunk_run_with_signal( &self, script_group: &ScriptGroup, diff --git a/spec/Cargo.toml b/spec/Cargo.toml index aa570cea62..26ded3450b 100644 --- a/spec/Cargo.toml +++ b/spec/Cargo.toml @@ -25,6 +25,8 @@ ckb-error = { path = "../error", version = "= 0.120.0-pre" } ckb-traits = { path = "../traits", version = "= 0.120.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.120.0-pre" } +[target.'cfg(not(target_family = "wasm"))'.dependencies] +cacache = { version = "13.0.0", default-features = false, features = ["mmap"] } [dev-dependencies] tempfile.workspace = true diff --git a/spec/src/versionbits/mod.rs b/spec/src/versionbits/mod.rs index 5c62a9834a..12960928c4 100644 --- a/spec/src/versionbits/mod.rs +++ b/spec/src/versionbits/mod.rs @@ -1,9 +1,11 @@ //! Versionbits defines a finite-state-machine to deploy a softfork in multiple stages. //! +#![allow(dead_code)] mod convert; use crate::consensus::Consensus; +#[cfg(not(target_family = "wasm"))] use ckb_logger::error; use ckb_types::global::DATA_DIR; use ckb_types::{ @@ -149,6 +151,7 @@ pub struct Cache { impl Cache { /// Reads the entire contents of a cache file synchronously into a bytes vector, /// looking the data up by key. + #[cfg(not(target_family = "wasm"))] pub fn get(&self, key: &Byte32) -> Option { match cacache::read_sync(&self.path, Self::encode_key(key)) { Ok(bytes) => Some(Self::decode_value(bytes)), @@ -160,7 +163,15 @@ impl Cache { } } + /// Soft Versionbit only work on tx-pool/block_assembler, it will not work on wasm, + /// so it can unimplemented + #[cfg(target_family = "wasm")] + pub fn get(&self, _key: &Byte32) -> Option { + unimplemented!() + } + /// Writes data to the cache synchronously + #[cfg(not(target_family = "wasm"))] pub fn insert(&self, key: &Byte32, value: ThresholdState) { if let Err(e) = cacache::write_sync(&self.path, Self::encode_key(key), Self::encode_value(value)) @@ -169,6 +180,13 @@ impl Cache { } } + /// Soft Versionbit only work on tx-pool/block_assembler, it will not work on wasm, + /// so it can unimplemented + #[cfg(target_family = "wasm")] + pub fn insert(&self, _key: &Byte32, _value: ThresholdState) { + unimplemented!() + } + fn decode_value(value: Vec) -> ThresholdState { ThresholdState::from_u8(value[0]) } diff --git a/tx-pool/src/block_assembler/mod.rs b/tx-pool/src/block_assembler/mod.rs index dc6e0a1e93..78949e6ba5 100644 --- a/tx-pool/src/block_assembler/mod.rs +++ b/tx-pool/src/block_assembler/mod.rs @@ -463,7 +463,7 @@ impl BlockAssembler { config: &BlockAssemblerConfig, snapshot: &Snapshot, ) -> CellbaseWitness { - let hash_type: ScriptHashType = config.hash_type.clone().into(); + let hash_type: ScriptHashType = config.hash_type.into(); let cellbase_lock = Script::new_builder() .args(config.args.as_bytes().pack()) .code_hash(config.code_hash.pack()) diff --git a/util/app-config/src/configs/network.rs b/util/app-config/src/configs/network.rs index b30ff1689f..e047fe9635 100644 --- a/util/app-config/src/configs/network.rs +++ b/util/app-config/src/configs/network.rs @@ -88,6 +88,40 @@ pub struct Config { pub sync: SyncConfig, /// Tentacle inner channel_size. pub channel_size: Option, + + #[cfg(target_family = "wasm")] + #[serde(skip)] + pub secret_key: Option<[u8; 32]>, + + #[cfg(target_family = "wasm")] + #[serde(skip)] + pub peer_store_ban_list_fns: std::sync::Arc, + #[cfg(target_family = "wasm")] + #[serde(skip)] + pub peer_store_fns: std::sync::Arc, +} + +#[cfg(target_family = "wasm")] +pub struct PeerStoreWasm { + pub dump_fn: Box, + pub load_fn: Box Vec + Send + Sync>, +} + +#[cfg(target_family = "wasm")] +impl std::fmt::Debug for PeerStoreWasm { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "wasm peer store functions") + } +} + +#[cfg(target_family = "wasm")] +impl Default for PeerStoreWasm { + fn default() -> Self { + PeerStoreWasm { + dump_fn: Box::new(|_| {}), + load_fn: Box::new(|| Vec::new()), + } + } } /// Chain synchronization config options. @@ -297,6 +331,7 @@ impl Config { } /// Reads the private key from file or generates one if the file does not exist. + #[cfg(not(target_family = "wasm"))] pub fn fetch_private_key(&self) -> Result { match self.read_secret_key()? { Some(key) => Ok(key), @@ -307,6 +342,21 @@ impl Config { } } + #[cfg(target_family = "wasm")] + pub fn fetch_private_key(&self) -> Result { + self.secret_key + .clone() + .ok_or(Error::new( + ErrorKind::InvalidData, + "invalid secret key data", + )) + .or_else(|_| Ok(generate_random_key())) + .map(|secret| { + secio::SecioKeyPair::secp256k1_raw_key(&secret) + .expect("network secret key is invalid") + }) + } + /// Gets the list of whitelist peers. pub fn whitelist_peers(&self) -> Vec { self.whitelist_peers.clone() diff --git a/util/crypto/Cargo.toml b/util/crypto/Cargo.toml index 886b2ea74a..6ace2f4c63 100644 --- a/util/crypto/Cargo.toml +++ b/util/crypto/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] ckb-fixed-hash = { path = "../fixed-hash", version = "= 0.120.0-pre" } -secp256k1 = { version = "0.29", features = ["recovery"], optional = true } +secp256k1 = { version = "0.30", features = ["recovery"], optional = true } thiserror = "1.0.22" rand = { version = "0.8", features = ["small_rng"] } faster-hex = "0.6" diff --git a/util/crypto/src/secp/signature.rs b/util/crypto/src/secp/signature.rs index a5a0e45dd9..1e7e64b265 100644 --- a/util/crypto/src/secp/signature.rs +++ b/util/crypto/src/secp/signature.rs @@ -36,7 +36,7 @@ impl Signature { pub fn from_compact(rec_id: RecoveryId, ret: [u8; 64]) -> Self { let mut data = [0; 65]; data[0..64].copy_from_slice(&ret[0..64]); - data[64] = rec_id.to_i32() as u8; + data[64] = Into::::into(rec_id) as u8; Signature(data) } @@ -79,7 +79,7 @@ impl Signature { /// Converts compact signature to a recoverable signature pub fn to_recoverable(&self) -> Result { - let recovery_id = RecoveryId::from_i32(i32::from(self.0[64]))?; + let recovery_id = RecoveryId::try_from(i32::from(self.0[64]))?; Ok(RecoverableSignature::from_compact( &self.0[0..64], recovery_id, diff --git a/util/fixed-hash/core/Cargo.toml b/util/fixed-hash/core/Cargo.toml index de6cbe7a40..f1c7ad705d 100644 --- a/util/fixed-hash/core/Cargo.toml +++ b/util/fixed-hash/core/Cargo.toml @@ -15,6 +15,5 @@ faster-hex = "0.6" schemars = { version = "0.8.19", package = "ckb_schemars" } - [dev-dependencies] serde_json = "1.0" diff --git a/util/hash/Cargo.toml b/util/hash/Cargo.toml index ee552213b5..c4fac103b7 100644 --- a/util/hash/Cargo.toml +++ b/util/hash/Cargo.toml @@ -10,12 +10,14 @@ repository = "https://github.com/nervosnetwork/ckb" [features] default = ["blake2b-ref", "blake2b-rs"] -ckb-contract = ["blake2b-ref"] # This feature is used for CKB contract development +ckb-contract = [ + "blake2b-ref", +] # This feature is used for CKB contract development -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +[target.'cfg(not(target_family = "wasm"))'.dependencies] blake2b-rs = { version = "0.2", optional = true } -[target.'cfg(target_arch = "wasm32")'.dependencies] +[target.'cfg(target_family = "wasm")'.dependencies] blake2b-ref = { version = "0.3", optional = true } [dependencies] diff --git a/util/hash/src/lib.rs b/util/hash/src/lib.rs index b988f041f5..530794e0c5 100644 --- a/util/hash/src/lib.rs +++ b/util/hash/src/lib.rs @@ -12,10 +12,10 @@ #[cfg(feature = "ckb-contract")] pub use blake2b_ref::{Blake2b, Blake2bBuilder}; -#[cfg(all(not(feature = "ckb-contract"), target_arch = "wasm32"))] +#[cfg(all(not(feature = "ckb-contract"), target_family = "wasm"))] pub use blake2b_ref::{Blake2b, Blake2bBuilder}; -#[cfg(all(not(feature = "ckb-contract"), not(target_arch = "wasm32")))] +#[cfg(all(not(feature = "ckb-contract"), not(target_family = "wasm")))] pub use blake2b_rs::{Blake2b, Blake2bBuilder}; #[doc(hidden)] diff --git a/util/jsonrpc-types/src/blockchain.rs b/util/jsonrpc-types/src/blockchain.rs index e5812d1d9e..abdf5ce939 100644 --- a/util/jsonrpc-types/src/blockchain.rs +++ b/util/jsonrpc-types/src/blockchain.rs @@ -25,7 +25,7 @@ use std::fmt; /// when the low 1 bit is 0, it indicates the data, /// and then it relies on the high 7 bits to indicate /// that the data actually corresponds to the version. -#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, JsonSchema)] +#[derive(Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum ScriptHashType { /// Type "data" matches script code via cell data hash, and run the script code in v0 CKB VM. diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs index ae17b25c56..524abbebd1 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs @@ -190,7 +190,7 @@ impl AsyncRichIndexerHandle { if let Some(script) = filter.script.as_ref() { query = query .bind(script.code_hash.as_bytes()) - .bind(script.hash_type.clone() as i16); + .bind(script.hash_type as i16); // Default prefix search query = query .bind(script.args.as_bytes()) diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs index 9b72ffff7c..808336c074 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs @@ -144,7 +144,7 @@ impl AsyncRichIndexerHandle { if let Some(script) = filter.script.as_ref() { query = query .bind(script.code_hash.as_bytes()) - .bind(script.hash_type.clone() as i16); + .bind(script.hash_type as i16); // Default prefix search query = query .bind(script.args.as_bytes()) diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs index 7b9b425455..342c1c1ef8 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs @@ -193,7 +193,7 @@ pub async fn get_tx_with_cell( for _ in 0..2 { query = query .bind(search_key.script.code_hash.as_bytes()) - .bind(search_key.script.hash_type.clone() as i16); + .bind(search_key.script.hash_type as i16); match &search_key.script_search_mode { Some(IndexerSearchMode::Prefix) | None => { query = query @@ -217,7 +217,7 @@ pub async fn get_tx_with_cell( if let Some(script) = filter.script.as_ref() { query = query .bind(script.code_hash.as_bytes()) - .bind(script.hash_type.clone() as i16); + .bind(script.hash_type as i16); // Default prefix search query = query .bind(script.args.as_bytes()) @@ -343,7 +343,7 @@ pub async fn get_tx_with_cells( for _ in 0..2 { query = query .bind(search_key.script.code_hash.as_bytes()) - .bind(search_key.script.hash_type.clone() as i16); + .bind(search_key.script.hash_type as i16); match &search_key.script_search_mode { Some(IndexerSearchMode::Prefix) | None => { query = query @@ -367,7 +367,7 @@ pub async fn get_tx_with_cells( if let Some(script) = filter.script.as_ref() { query = query .bind(script.code_hash.as_bytes()) - .bind(script.hash_type.clone() as i16); + .bind(script.hash_type as i16); // Default prefix search query = query .bind(script.args.as_bytes()) diff --git a/util/runtime/Cargo.toml b/util/runtime/Cargo.toml index 530a7fb29f..ab5e24b90c 100644 --- a/util/runtime/Cargo.toml +++ b/util/runtime/Cargo.toml @@ -9,6 +9,13 @@ homepage = "https://github.com/nervosnetwork/ckb" repository = "https://github.com/nervosnetwork/ckb" [dependencies] -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "sync"] } ckb-logger = { path = "../logger", version = "= 0.120.0-pre" } -ckb-spawn = { path = "../spawn", version = "= 0.120.0-pre" } +ckb-spawn = { path = "../spawn", version = "= 0.120.0-pre" } + +[target.'cfg(not(target_family = "wasm"))'.dependencies] +tokio = { version = "1", features = ["rt-multi-thread"] } + + +[target.'cfg(target_family = "wasm")'.dependencies] +wasm-bindgen-futures = "0.4" diff --git a/util/runtime/src/brower.rs b/util/runtime/src/brower.rs new file mode 100644 index 0000000000..18c88678b9 --- /dev/null +++ b/util/runtime/src/brower.rs @@ -0,0 +1,27 @@ +use ckb_spawn::Spawn; +use std::future::Future; +use wasm_bindgen_futures::spawn_local; + +#[derive(Debug, Clone)] +pub struct Handle {} + +impl Handle { + /// Spawns a future onto the runtime. + /// + /// This spawns the given future onto the runtime's executor + pub fn spawn(&self, future: F) + where + F: Future + 'static, + { + spawn_local(async move { future.await }) + } +} + +impl Spawn for Handle { + fn spawn_task(&self, future: F) + where + F: Future + 'static, + { + self.spawn(future); + } +} diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index 5b9809bea0..89ca13d117 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -1,159 +1,15 @@ //! Utilities for tokio runtime. -use ckb_spawn::Spawn; -use core::future::Future; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::thread::available_parallelism; -use tokio::runtime::Builder; -use tokio::runtime::Handle as TokioHandle; - -use tokio::task::JoinHandle; - pub use tokio; pub use tokio::runtime::Runtime; -use tokio::sync::mpsc::{Receiver, Sender}; - -// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules. -// We need `Handle` impl ckb spawn trait decouple tokio dependence - -/// Handle to the runtime. -#[derive(Debug, Clone)] -pub struct Handle { - pub(crate) inner: TokioHandle, - guard: Option>, -} - -impl Handle { - /// Create a new Handle - pub fn new(inner: TokioHandle, guard: Option>) -> Self { - Self { inner, guard } - } - - /// Drop the guard - pub fn drop_guard(&mut self) { - let _ = self.guard.take(); - } -} - -impl Handle { - /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`]. - pub fn enter(&self, f: F) -> R - where - F: FnOnce() -> R, - { - let _enter = self.inner.enter(); - f() - } - - /// Spawns a future onto the runtime. - /// - /// This spawns the given future onto the runtime's executor - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let tokio_task_guard = self.guard.clone(); - - self.inner.spawn(async move { - // move tokio_task_guard into the spawned future - // so that it will be dropped when the future is finished - let _guard = tokio_task_guard; - future.await - }) - } - - /// Run a future to completion on the Tokio runtime from a synchronous context. - pub fn block_on(&self, future: F) -> F::Output { - self.inner.block_on(future) - } - - /// Spawns a future onto the runtime blocking pool. - /// - /// This spawns the given future onto the runtime's blocking executor - pub fn spawn_blocking(&self, f: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.inner.spawn_blocking(f) - } - - /// Transform to inner tokio handler - pub fn into_inner(self) -> TokioHandle { - self.inner - } -} - -/// Create a new runtime with unique name. -fn new_runtime(worker_num: Option) -> Runtime { - Builder::new_multi_thread() - .enable_all() - .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into())) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicU32 = AtomicU32::new(0); - let id = ATOMIC_ID - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| { - // A long thread name will cut to 15 characters in debug tools. - // Such as "top", "htop", "gdb" and so on. - // It's a kernel limit. - // - // So if we want to see the whole name in debug tools, - // this number should have 6 digits at most, - // since the prefix uses 9 characters in below code. - // - // There still has a issue: - // When id wraps around, we couldn't know whether the old id - // is released or not. - // But we can ignore this, because it's almost impossible. - if n >= 999_999 { - Some(0) - } else { - Some(n + 1) - } - }) - .expect("impossible since the above closure must return Some(number)"); - format!("GlobalRt-{id}") - }) - .build() - .expect("ckb runtime initialized") -} - -/// Create new threaded_scheduler tokio Runtime, return `Runtime` -pub fn new_global_runtime(worker_num: Option) -> (Handle, Receiver<()>, Runtime) { - let runtime = new_runtime(worker_num); - let handle = runtime.handle().clone(); - let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); - - (Handle::new(handle, Some(guard)), handle_stop_rx, runtime) -} - -/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, -/// NOTICE: This is only used in testing -pub fn new_background_runtime() -> Handle { - let runtime = new_runtime(None); - let handle = runtime.handle().clone(); - let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = - tokio::sync::mpsc::channel::<()>(1); - let _thread = std::thread::Builder::new() - .name("GlobalRtBuilder".to_string()) - .spawn(move || { - let ret = runtime.block_on(async move { handle_stop_rx.recv().await }); - ckb_logger::debug!("Global runtime finished {:?}", ret); - }) - .expect("tokio runtime started"); +#[cfg(not(target_family = "wasm"))] +pub use native::*; - Handle::new(handle, Some(guard)) -} +#[cfg(target_family = "wasm")] +pub use brower::*; -impl Spawn for Handle { - fn spawn_task(&self, future: F) - where - F: Future + Send + 'static, - { - self.spawn(future); - } -} +#[cfg(target_family = "wasm")] +mod brower; +#[cfg(not(target_family = "wasm"))] +mod native; diff --git a/util/runtime/src/native.rs b/util/runtime/src/native.rs new file mode 100644 index 0000000000..7c7a728f78 --- /dev/null +++ b/util/runtime/src/native.rs @@ -0,0 +1,153 @@ +use ckb_spawn::Spawn; +use core::future::Future; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::thread::available_parallelism; +use tokio::runtime::{Builder, Handle as TokioHandle, Runtime}; + +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task::JoinHandle; + +// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules. +// We need `Handle` impl ckb spawn trait decouple tokio dependence + +/// Handle to the runtime. +#[derive(Debug, Clone)] +pub struct Handle { + pub(crate) inner: TokioHandle, + guard: Option>, +} + +impl Handle { + /// Create a new Handle + pub fn new(inner: TokioHandle, guard: Option>) -> Self { + Self { inner, guard } + } + + /// Drop the guard + pub fn drop_guard(&mut self) { + let _ = self.guard.take(); + } +} + +impl Handle { + /// Enter the runtime context. This allows you to construct types that must + /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`]. + /// It will also allow you to call methods such as [`tokio::spawn`]. + pub fn enter(&self, f: F) -> R + where + F: FnOnce() -> R, + { + let _enter = self.inner.enter(); + f() + } + + /// Spawns a future onto the runtime. + /// + /// This spawns the given future onto the runtime's executor + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let tokio_task_guard = self.guard.clone(); + + self.inner.spawn(async move { + // move tokio_task_guard into the spawned future + // so that it will be dropped when the future is finished + let _guard = tokio_task_guard; + future.await + }) + } + + /// Run a future to completion on the Tokio runtime from a synchronous context. + pub fn block_on(&self, future: F) -> F::Output { + self.inner.block_on(future) + } + + /// Spawns a future onto the runtime blocking pool. + /// + /// This spawns the given future onto the runtime's blocking executor + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.inner.spawn_blocking(f) + } + + /// Transform to inner tokio handler + pub fn into_inner(self) -> TokioHandle { + self.inner + } +} + +/// Create a new runtime with unique name. +fn new_runtime(worker_num: Option) -> Runtime { + Builder::new_multi_thread() + .enable_all() + .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into())) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicU32 = AtomicU32::new(0); + let id = ATOMIC_ID + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| { + // A long thread name will cut to 15 characters in debug tools. + // Such as "top", "htop", "gdb" and so on. + // It's a kernel limit. + // + // So if we want to see the whole name in debug tools, + // this number should have 6 digits at most, + // since the prefix uses 9 characters in below code. + // + // There still has a issue: + // When id wraps around, we couldn't know whether the old id + // is released or not. + // But we can ignore this, because it's almost impossible. + if n >= 999_999 { + Some(0) + } else { + Some(n + 1) + } + }) + .expect("impossible since the above closure must return Some(number)"); + format!("GlobalRt-{id}") + }) + .build() + .expect("ckb runtime initialized") +} + +/// Create new threaded_scheduler tokio Runtime, return `Runtime` +pub fn new_global_runtime(worker_num: Option) -> (Handle, Receiver<()>, Runtime) { + let runtime = new_runtime(worker_num); + let handle = runtime.handle().clone(); + let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); + + (Handle::new(handle, Some(guard)), handle_stop_rx, runtime) +} + +/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, +/// NOTICE: This is only used in testing +pub fn new_background_runtime() -> Handle { + let runtime = new_runtime(None); + let handle = runtime.handle().clone(); + + let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = + tokio::sync::mpsc::channel::<()>(1); + let _thread = std::thread::Builder::new() + .name("GlobalRtBuilder".to_string()) + .spawn(move || { + let ret = runtime.block_on(async move { handle_stop_rx.recv().await }); + ckb_logger::debug!("Global runtime finished {:?}", ret); + }) + .expect("tokio runtime started"); + + Handle::new(handle, Some(guard)) +} + +impl Spawn for Handle { + fn spawn_task(&self, future: F) + where + F: Future + Send + 'static, + { + self.spawn(future); + } +} diff --git a/util/spawn/src/lib.rs b/util/spawn/src/lib.rs index 83864084bb..8d5bcabf23 100644 --- a/util/spawn/src/lib.rs +++ b/util/spawn/src/lib.rs @@ -5,9 +5,18 @@ use core::future::Future; /// `Spawn` abstract async runtime, spawns a future onto the runtime +#[cfg(not(target_family = "wasm"))] pub trait Spawn { /// This spawns the given future onto the runtime's executor fn spawn_task(&self, task: F) where F: Future + Send + 'static; } + +#[cfg(target_family = "wasm")] +pub trait Spawn { + /// This spawns the given future onto the runtime's executor + fn spawn_task(&self, task: F) + where + F: Future + 'static; +} diff --git a/util/stop-handler/Cargo.toml b/util/stop-handler/Cargo.toml index cf3e069b01..a7653c1104 100644 --- a/util/stop-handler/Cargo.toml +++ b/util/stop-handler/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] ckb-logger = { path = "../logger", version = "= 0.120.0-pre" } -tokio = { version = "1", features = ["sync", "rt-multi-thread"] } +tokio = { version = "1", features = ["sync"] } ckb-channel = { path = "../channel", version = "= 0.120.0-pre" } ckb-util = { path = "..", version = "= 0.120.0-pre" } ckb-async-runtime = { path = "../runtime", version = "= 0.120.0-pre" } diff --git a/util/systemtime/Cargo.toml b/util/systemtime/Cargo.toml index 019c38cea4..b6980c5313 100644 --- a/util/systemtime/Cargo.toml +++ b/util/systemtime/Cargo.toml @@ -10,6 +10,9 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +web-time = "1.1.0" + [dev-dependencies] [features] diff --git a/util/systemtime/src/lib.rs b/util/systemtime/src/lib.rs index aed840fa23..f33082aa39 100644 --- a/util/systemtime/src/lib.rs +++ b/util/systemtime/src/lib.rs @@ -4,7 +4,10 @@ mod test_realtime; #[cfg(feature = "enable_faketime")] use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::Duration; +#[cfg(not(target_family = "wasm"))] +pub use std::time::{Duration, Instant, SystemTime}; +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +pub use web_time::{Duration, Instant, SystemTime}; // Store faketime timestamp here #[cfg(feature = "enable_faketime")] @@ -16,8 +19,8 @@ static FAKETIME_ENABLED: AtomicBool = AtomicBool::new(false); // Get real system's timestamp in millis fn system_time_as_millis() -> u64 { - let duration = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) + let duration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) .expect("SystemTime before UNIX EPOCH!"); duration.as_secs() * 1000 + u64::from(duration.subsec_millis()) } diff --git a/util/types/src/core/cell.rs b/util/types/src/core/cell.rs index c7be1c4d64..f1d8a02717 100644 --- a/util/types/src/core/cell.rs +++ b/util/types/src/core/cell.rs @@ -165,7 +165,7 @@ impl CellMeta { } /// TODO(doc): @quake -#[derive(PartialEq, Debug, Eq)] +#[derive(PartialEq, Debug, Eq, Clone)] pub enum CellStatus { /// Cell exists and has not been spent. Live(CellMeta), diff --git a/verification/Cargo.toml b/verification/Cargo.toml index e0ac8e293c..904b27e1f8 100644 --- a/verification/Cargo.toml +++ b/verification/Cargo.toml @@ -23,7 +23,8 @@ derive_more = { version = "1", default-features = false, features = [ "display", ] } ckb-verification-traits = { path = "./traits", version = "= 0.120.0-pre" } -tokio = { version = "1", features = ["sync", "process"] } +tokio = { version = "1", features = ["sync", "macros"] } + [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils", version = "= 0.120.0-pre" } diff --git a/verification/src/transaction_verifier.rs b/verification/src/transaction_verifier.rs index b6317cebaa..62be30fcfb 100644 --- a/verification/src/transaction_verifier.rs +++ b/verification/src/transaction_verifier.rs @@ -5,7 +5,9 @@ use ckb_chain_spec::consensus::Consensus; use ckb_dao::DaoCalculator; use ckb_dao_utils::DaoError; use ckb_error::Error; -use ckb_script::{ChunkCommand, TransactionScriptsVerifier, TransactionSnapshot}; +#[cfg(not(target_family = "wasm"))] +use ckb_script::ChunkCommand; +use ckb_script::{TransactionScriptsVerifier, TransactionSnapshot}; use ckb_traits::{ CellDataProvider, EpochProvider, ExtensionProvider, HeaderFieldsProvider, HeaderProvider, }; @@ -175,6 +177,7 @@ where /// Perform context-dependent verification with command /// The verification will be interrupted when receiving a Suspend command + #[cfg(not(target_family = "wasm"))] pub async fn verify_with_pause( &self, max_cycles: Cycle,