From 0f530f2ae63342b136ad65e1c7d3b3231b939a6b Mon Sep 17 00:00:00 2001 From: bernard-wagner Date: Mon, 4 Sep 2023 17:54:51 +0200 Subject: [PATCH] feat(cast): support websockets (#5571) * feat(cast): support websockets * add tests and rework ipc path --- Cargo.lock | 2 + crates/anvil/src/lib.rs | 26 +-- crates/cast/bin/cmd/call.rs | 4 +- crates/cast/tests/cli/main.rs | 12 +- crates/common/Cargo.toml | 4 + crates/common/src/lib.rs | 1 + crates/common/src/provider.rs | 90 ++++---- crates/common/src/runtime_client.rs | 258 +++++++++++++++++++++++ crates/evm/src/executor/fork/multi.rs | 6 +- crates/forge/bin/cmd/script/providers.rs | 6 +- crates/forge/tests/it/fork.rs | 9 + crates/utils/src/rpc.rs | 23 ++ 12 files changed, 373 insertions(+), 68 deletions(-) create mode 100644 crates/common/src/runtime_client.rs diff --git a/Cargo.lock b/Cargo.lock index 1c89d1084b90..bc598c5f6e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2448,6 +2448,7 @@ dependencies = [ name = "foundry-common" version = "0.2.0" dependencies = [ + "async-trait", "auto_impl", "clap", "comfy-table", @@ -2472,6 +2473,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "url", "walkdir", "yansi 0.5.1", ] diff --git a/crates/anvil/src/lib.rs b/crates/anvil/src/lib.rs index 17005bb598e0..134de3db3d0d 100644 --- a/crates/anvil/src/lib.rs +++ b/crates/anvil/src/lib.rs @@ -17,10 +17,10 @@ use eth::backend::fork::ClientFork; use ethers::{ core::k256::ecdsa::SigningKey, prelude::Wallet, - providers::{Http, Provider, Ws}, signers::Signer, types::{Address, U256}, }; +use foundry_common::{ProviderBuilder, RetryProvider}; use foundry_evm::revm; use futures::{FutureExt, TryFutureExt}; use parking_lot::Mutex; @@ -267,27 +267,23 @@ impl NodeHandle { } /// Returns a Provider for the http endpoint - pub fn http_provider(&self) -> Provider { - Provider::::try_from(self.http_endpoint()) - .unwrap() + pub fn http_provider(&self) -> RetryProvider { + ProviderBuilder::new(self.http_endpoint()) + .build() + .expect("Failed to connect using http provider") .interval(Duration::from_millis(500)) } /// Connects to the websocket Provider of the node - pub async fn ws_provider(&self) -> Provider { - Provider::new( - Ws::connect(self.ws_endpoint()).await.expect("Failed to connect to node's websocket"), - ) + pub async fn ws_provider(&self) -> RetryProvider { + ProviderBuilder::new(self.ws_endpoint()) + .build() + .expect("Failed to connect to node's websocket") } /// Connects to the ipc endpoint of the node, if spawned - pub async fn ipc_provider(&self) -> Option> { - let ipc_path = self.config.get_ipc_path()?; - tracing::trace!(target: "ipc", ?ipc_path, "connecting ipc provider"); - let provider = Provider::connect_ipc(&ipc_path).await.unwrap_or_else(|err| { - panic!("Failed to connect to node's ipc endpoint {ipc_path}: {err:?}") - }); - Some(provider) + pub async fn ipc_provider(&self) -> Option { + ProviderBuilder::new(self.config.get_ipc_path()?).build().ok() } /// Signer accounts that can sign messages/transactions from the EVM node diff --git a/crates/cast/bin/cmd/call.rs b/crates/cast/bin/cmd/call.rs index 3659bcf2c098..ec8919659013 100644 --- a/crates/cast/bin/cmd/call.rs +++ b/crates/cast/bin/cmd/call.rs @@ -9,12 +9,12 @@ use foundry_cli::{ opts::{EthereumOpts, TransactionOpts}, utils::{self, handle_traces, parse_ether_value, TraceResult}, }; +use foundry_common::runtime_client::RuntimeClient; use foundry_config::{find_project_root_path, Config}; use foundry_evm::{executor::opts::EvmOpts, trace::TracingExecutor}; use std::str::FromStr; -type Provider = - ethers::providers::Provider>; +type Provider = ethers::providers::Provider; /// CLI arguments for `cast call`. #[derive(Debug, Parser)] diff --git a/crates/cast/tests/cli/main.rs b/crates/cast/tests/cli/main.rs index 9eff595b6969..cbc999079262 100644 --- a/crates/cast/tests/cli/main.rs +++ b/crates/cast/tests/cli/main.rs @@ -4,7 +4,7 @@ use foundry_test_utils::{ casttest, util::{OutputExt, TestCommand, TestProject}, }; -use foundry_utils::rpc::next_http_rpc_endpoint; +use foundry_utils::rpc::{next_http_rpc_endpoint, next_ws_rpc_endpoint}; use std::{io::Write, path::Path}; // tests `--help` is printed to std out @@ -243,6 +243,16 @@ casttest!(cast_rpc_no_args, |_: TestProject, mut cmd: TestCommand| { assert_eq!(output.trim_end(), r#""0x1""#); }); +// test for cast_rpc without arguments using websocket +casttest!(cast_ws_rpc_no_args, |_: TestProject, mut cmd: TestCommand| { + let eth_rpc_url = next_ws_rpc_endpoint(); + + // Call `cast rpc eth_chainId` + cmd.args(["rpc", "--rpc-url", eth_rpc_url.as_str(), "eth_chainId"]); + let output = cmd.stdout_lossy(); + assert_eq!(output.trim_end(), r#""0x1""#); +}); + // test for cast_rpc with arguments casttest!(cast_rpc_with_args, |_: TestProject, mut cmd: TestCommand| { let eth_rpc_url = next_http_rpc_endpoint(); diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index e945b45ce094..daf66d33b3af 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -33,6 +33,7 @@ tempfile = "3" # misc auto_impl = "1.1.0" +async-trait = "0.1" serde = "1" serde_json = "1" thiserror = "1" @@ -43,8 +44,11 @@ once_cell = "1" dunce = "1" regex = "1" globset = "0.4" +tokio = "1" +url = "2" # Using const-hex instead of hex for speed hex.workspace = true + [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index cb045a25bb27..bd7162e7c8dd 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -14,6 +14,7 @@ pub mod fmt; pub mod fs; pub mod glob; pub mod provider; +pub mod runtime_client; pub mod selectors; pub mod shell; pub mod term; diff --git a/crates/common/src/provider.rs b/crates/common/src/provider.rs index 1efd29bee63f..0af44f320ef1 100644 --- a/crates/common/src/provider.rs +++ b/crates/common/src/provider.rs @@ -1,18 +1,16 @@ //! Commonly used helpers to construct `Provider`s -use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; +use crate::{runtime_client::RuntimeClient, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; use ethers_core::types::{Chain, U256}; use ethers_middleware::gas_oracle::{GasCategory, GasOracle, Polygon}; -use ethers_providers::{ - is_local_endpoint, Authorization, Http, HttpRateLimitRetryPolicy, JwtAuth, JwtKey, Middleware, - Provider, RetryClient, RetryClientBuilder, DEFAULT_LOCAL_POLL_INTERVAL, -}; +use ethers_providers::{is_local_endpoint, Middleware, Provider, DEFAULT_LOCAL_POLL_INTERVAL}; use eyre::WrapErr; -use reqwest::{header::HeaderValue, IntoUrl, Url}; -use std::{borrow::Cow, time::Duration}; +use reqwest::{IntoUrl, Url}; +use std::{borrow::Cow, env, path::Path, time::Duration}; +use url::ParseError; /// Helper type alias for a retry provider -pub type RetryProvider = Provider>; +pub type RetryProvider = Provider; /// Helper type alias for a rpc url pub type RpcUrl = String; @@ -68,9 +66,38 @@ impl ProviderBuilder { // prefix return Self::new(format!("http://{url_str}")) } - let err = format!("Invalid provider url: {url_str}"); + + let url = Url::parse(url_str) + .or_else(|err| { + match err { + ParseError::RelativeUrlWithoutBase => { + let path = Path::new(url_str); + let absolute_path = if path.is_absolute() { + path.to_path_buf() + } else { + // Assume the path is relative to the current directory. + // Don't use `std::fs::canonicalize` as it requires the path to exist. + // It should be possible to construct a provider and only + // attempt to establish a connection later + let current_dir = + env::current_dir().expect("Current directory should exist"); + current_dir.join(path) + }; + + let path_str = + absolute_path.to_str().expect("Path should be a valid string"); + + // invalid url: non-prefixed URL scheme is not allowed, so we assume the URL + // is for a local file + Url::parse(format!("file://{path_str}").as_str()) + } + _ => Err(err), + } + }) + .wrap_err(format!("Invalid provider url: {url_str}")); + Self { - url: url.into_url().wrap_err(err), + url, chain: Chain::Mainnet, max_retry: 100, timeout_retry: 5, @@ -176,43 +203,18 @@ impl ProviderBuilder { } = self; let url = url?; - let mut client_builder = reqwest::Client::builder().timeout(timeout); - - // Set the JWT auth as a header if present - if let Some(jwt) = jwt { - // Decode jwt from hex, then generate claims (iat with current timestamp) - let jwt = hex::decode(jwt)?; - let secret = - JwtKey::from_slice(&jwt).map_err(|err| eyre::eyre!("Invalid JWT: {}", err))?; - let auth = JwtAuth::new(secret, None, None); - let token = auth.generate_token()?; - - // Essentially unrolled ethers-rs new_with_auth to accomodate the custom timeout - let auth = Authorization::Bearer(token); - let mut auth_value = HeaderValue::from_str(&auth.to_string())?; - auth_value.set_sensitive(true); - - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert(reqwest::header::AUTHORIZATION, auth_value); - - client_builder = client_builder.default_headers(headers); - } + let mut provider = Provider::new(RuntimeClient::new( + url.clone(), + max_retry, + timeout_retry, + initial_backoff, + timeout, + compute_units_per_second, + jwt, + )); - let client = client_builder.build()?; let is_local = is_local_endpoint(url.as_str()); - let provider = Http::new_with_client(url, client); - - #[allow(clippy::box_default)] - let mut provider = Provider::new( - RetryClientBuilder::default() - .initial_backoff(Duration::from_millis(initial_backoff)) - .rate_limit_retries(max_retry) - .timeout_retries(timeout_retry) - .compute_units_per_second(compute_units_per_second) - .build(provider, Box::new(HttpRateLimitRetryPolicy)), - ); - if is_local { provider = provider.interval(DEFAULT_LOCAL_POLL_INTERVAL); } else if let Some(blocktime) = chain.average_blocktime_hint() { diff --git a/crates/common/src/runtime_client.rs b/crates/common/src/runtime_client.rs new file mode 100644 index 000000000000..cd89736aab6b --- /dev/null +++ b/crates/common/src/runtime_client.rs @@ -0,0 +1,258 @@ +//! Wrap different providers + +use async_trait::async_trait; +use ethers_core::types::U256; +use ethers_providers::{ + Authorization, ConnectionDetails, Http, HttpRateLimitRetryPolicy, Ipc, JsonRpcClient, + JsonRpcError, JwtAuth, JwtKey, ProviderError, PubsubClient, RetryClient, RetryClientBuilder, + RpcError, Ws, +}; +use reqwest::{header::HeaderValue, Url}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{fmt::Debug, sync::Arc, time::Duration}; +use thiserror::Error; +use tokio::sync::RwLock; + +/// Enum representing a the client types supported by the runtime provider +#[derive(Debug)] +enum InnerClient { + /// HTTP client + Http(RetryClient), + /// WebSocket client + Ws(Ws), + /// IPC client + Ipc(Ipc), +} + +/// Error type for the runtime provider +#[derive(Error, Debug)] +pub enum RuntimeClientError { + /// Internal provider error + #[error(transparent)] + ProviderError(ProviderError), + + /// Failed to lock the client + #[error("Failed to lock the client")] + LockError, + + /// Invalid URL scheme + #[error("URL scheme is not supported: {0}")] + BadScheme(String), + + /// Invalid file path + #[error("Invalid IPC file path: {0}")] + BadPath(String), +} + +impl RpcError for RuntimeClientError { + fn as_error_response(&self) -> Option<&JsonRpcError> { + match self { + RuntimeClientError::ProviderError(err) => err.as_error_response(), + _ => None, + } + } + + fn as_serde_error(&self) -> Option<&serde_json::Error> { + match self { + RuntimeClientError::ProviderError(e) => e.as_serde_error(), + _ => None, + } + } +} + +impl From for ProviderError { + fn from(src: RuntimeClientError) -> Self { + match src { + RuntimeClientError::ProviderError(err) => err, + _ => ProviderError::JsonRpcClientError(Box::new(src)), + } + } +} + +/// A provider that connects on first request allowing handling of different provider types at +/// runtime +#[derive(Clone, Debug, Error)] +pub struct RuntimeClient { + client: Arc>>, + url: Url, + max_retry: u32, + timeout_retry: u32, + initial_backoff: u64, + timeout: Duration, + /// available CUPS + compute_units_per_second: u64, + jwt: Option, +} + +impl ::core::fmt::Display for RuntimeClient { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { + write!(f, "RuntimeClient") + } +} + +fn build_auth(jwt: String) -> eyre::Result { + // Decode jwt from hex, then generate claims (iat with current timestamp) + let jwt = hex::decode(jwt)?; + let secret = JwtKey::from_slice(&jwt).map_err(|err| eyre::eyre!("Invalid JWT: {}", err))?; + let auth = JwtAuth::new(secret, None, None); + let token = auth.generate_token()?; + + // Essentially unrolled ethers-rs new_with_auth to accomodate the custom timeout + let auth = Authorization::Bearer(token); + + Ok(auth) +} + +impl RuntimeClient { + /// Creates a new dynamic provider from a URL + pub fn new( + url: Url, + max_retry: u32, + timeout_retry: u32, + initial_backoff: u64, + timeout: Duration, + compute_units_per_second: u64, + jwt: Option, + ) -> Self { + Self { + client: Arc::new(RwLock::new(None)), + url, + max_retry, + timeout_retry, + initial_backoff, + timeout, + compute_units_per_second, + jwt, + } + } + + async fn connect(&self) -> Result { + match self.url.scheme() { + "http" | "https" => { + let mut client_builder = reqwest::Client::builder().timeout(self.timeout); + + if let Some(jwt) = self.jwt.as_ref() { + let auth = build_auth(jwt.clone()).map_err(|err| { + RuntimeClientError::ProviderError(ProviderError::CustomError( + err.to_string(), + )) + })?; + + let mut auth_value: HeaderValue = HeaderValue::from_str(&auth.to_string()) + .expect("Header should be valid string"); + auth_value.set_sensitive(true); + + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert(reqwest::header::AUTHORIZATION, auth_value); + + client_builder = client_builder.default_headers(headers); + }; + + let client = client_builder + .build() + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?; + + let provider = Http::new_with_client(self.url.clone(), client); + + #[allow(clippy::box_default)] + let provider = RetryClientBuilder::default() + .initial_backoff(Duration::from_millis(self.initial_backoff)) + .rate_limit_retries(self.max_retry) + .timeout_retries(self.timeout_retry) + .compute_units_per_second(self.compute_units_per_second) + .build(provider, Box::new(HttpRateLimitRetryPolicy)); + Ok(InnerClient::Http(provider)) + } + "ws" | "wss" => { + let auth: Option = + self.jwt.as_ref().and_then(|jwt| build_auth(jwt.clone()).ok()); + let connection_details = ConnectionDetails::new(self.url.as_str(), auth); + + let client = + Ws::connect_with_reconnects(connection_details, self.max_retry as usize) + .await + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?; + + Ok(InnerClient::Ws(client)) + } + "file" => { + let path = self + .url + .to_file_path() + .map_err(|_| RuntimeClientError::BadPath(self.url.to_string()))?; + + let client = Ipc::connect(path) + .await + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?; + + Ok(InnerClient::Ipc(client)) + } + _ => Err(RuntimeClientError::BadScheme(self.url.to_string())), + } + } +} + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl JsonRpcClient for RuntimeClient { + type Error = RuntimeClientError; + + #[allow(implied_bounds_entailment)] + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, + { + if self.client.read().await.is_none() { + let mut w = self.client.write().await; + *w = Some( + self.connect().await.map_err(|e| RuntimeClientError::ProviderError(e.into()))?, + ); + } + + let res = match self.client.read().await.as_ref().unwrap() { + InnerClient::Http(http) => RetryClient::request(http, method, params) + .await + .map_err(|e| RuntimeClientError::ProviderError(e.into())), + InnerClient::Ws(ws) => JsonRpcClient::request(ws, method, params) + .await + .map_err(|e| RuntimeClientError::ProviderError(e.into())), + InnerClient::Ipc(ipc) => JsonRpcClient::request(ipc, method, params) + .await + .map_err(|e| RuntimeClientError::ProviderError(e.into())), + }?; + Ok(res) + } +} + +// We can also implement [`PubsubClient`] for our dynamic provider. +impl PubsubClient for RuntimeClient { + // Since both `Ws` and `Ipc`'s `NotificationStream` associated type is the same, + // we can simply return one of them. + type NotificationStream = ::NotificationStream; + + fn subscribe>(&self, id: T) -> Result { + match self.client.try_read().map_err(|_| RuntimeClientError::LockError)?.as_ref().unwrap() { + InnerClient::Http(_) => { + Err(RuntimeClientError::ProviderError(ProviderError::UnsupportedRPC)) + } + InnerClient::Ws(client) => Ok(PubsubClient::subscribe(client, id) + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?), + InnerClient::Ipc(client) => Ok(PubsubClient::subscribe(client, id) + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?), + } + } + + fn unsubscribe>(&self, id: T) -> Result<(), Self::Error> { + match self.client.try_read().map_err(|_| (RuntimeClientError::LockError))?.as_ref().unwrap() + { + InnerClient::Http(_) => { + Err(RuntimeClientError::ProviderError(ProviderError::UnsupportedRPC)) + } + InnerClient::Ws(client) => Ok(PubsubClient::unsubscribe(client, id) + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?), + InnerClient::Ipc(client) => Ok(PubsubClient::unsubscribe(client, id) + .map_err(|e| RuntimeClientError::ProviderError(e.into()))?), + } + } +} diff --git a/crates/evm/src/executor/fork/multi.rs b/crates/evm/src/executor/fork/multi.rs index 600078b24956..27ab759eb97a 100644 --- a/crates/evm/src/executor/fork/multi.rs +++ b/crates/evm/src/executor/fork/multi.rs @@ -9,10 +9,10 @@ use crate::{ }; use ethers::{ abi::{AbiDecode, AbiEncode, AbiError}, - providers::{Http, Provider, RetryClient}, + providers::Provider, types::{BlockId, BlockNumber}, }; -use foundry_common::ProviderBuilder; +use foundry_common::{runtime_client::RuntimeClient, ProviderBuilder}; use foundry_config::Config; use futures::{ channel::mpsc::{channel, Receiver, Sender}, @@ -168,7 +168,7 @@ impl MultiFork { } } -type Handler = BackendHandler>>>; +type Handler = BackendHandler>>; type CreateFuture = Pin> + Send>>; type CreateSender = OneshotSender>; diff --git a/crates/forge/bin/cmd/script/providers.rs b/crates/forge/bin/cmd/script/providers.rs index df05b852c1a3..93ff19a658f8 100644 --- a/crates/forge/bin/cmd/script/providers.rs +++ b/crates/forge/bin/cmd/script/providers.rs @@ -1,6 +1,6 @@ -use ethers::prelude::{Http, Middleware, Provider, RetryClient, U256}; +use ethers::prelude::{Middleware, Provider, U256}; use eyre::{Result, WrapErr}; -use foundry_common::{get_http_provider, RpcUrl}; +use foundry_common::{get_http_provider, runtime_client::RuntimeClient, RpcUrl}; use foundry_config::Chain; use std::{ collections::{hash_map::Entry, HashMap}, @@ -42,7 +42,7 @@ impl Deref for ProvidersManager { /// Holds related metadata to each provider RPC. #[derive(Debug)] pub struct ProviderInfo { - pub provider: Arc>>, + pub provider: Arc>, pub chain: u64, pub gas_price: GasPrice, pub is_legacy: bool, diff --git a/crates/forge/tests/it/fork.rs b/crates/forge/tests/it/fork.rs index 0d52239ef9b8..9d10e8a5def8 100644 --- a/crates/forge/tests/it/fork.rs +++ b/crates/forge/tests/it/fork.rs @@ -76,6 +76,15 @@ async fn test_launch_fork() { TestConfig::with_filter(runner, filter).run().await; } +/// Smoke test that forking workings with websockets +#[tokio::test(flavor = "multi_thread")] +async fn test_launch_fork_ws() { + let rpc_url = foundry_utils::rpc::next_ws_archive_rpc_endpoint(); + let runner = forked_runner(&rpc_url).await; + let filter = Filter::new(".*", ".*", &format!(".*fork{RE_PATH_SEPARATOR}Launch")); + TestConfig::with_filter(runner, filter).run().await; +} + /// Tests that we can transact transactions in forking mode #[tokio::test(flavor = "multi_thread")] async fn test_transact_fork() { diff --git a/crates/utils/src/rpc.rs b/crates/utils/src/rpc.rs index c32cbe199f12..a41f2dddaba4 100644 --- a/crates/utils/src/rpc.rs +++ b/crates/utils/src/rpc.rs @@ -70,6 +70,13 @@ pub fn next_http_rpc_endpoint() -> String { next_rpc_endpoint("mainnet") } +/// Returns the next _mainnet_ rpc endpoint in inline +/// +/// This will rotate all available rpc endpoints +pub fn next_ws_rpc_endpoint() -> String { + next_ws_endpoint("mainnet") +} + pub fn next_rpc_endpoint(network: &str) -> String { let idx = next() % num_keys(); if idx < INFURA_KEYS.len() { @@ -80,12 +87,28 @@ pub fn next_rpc_endpoint(network: &str) -> String { } } +pub fn next_ws_endpoint(network: &str) -> String { + let idx = next() % num_keys(); + if idx < INFURA_KEYS.len() { + format!("wss://{network}.infura.io/v3/{}", INFURA_KEYS[idx]) + } else { + let idx = idx - INFURA_KEYS.len(); + format!("wss://eth-{network}.alchemyapi.io/v2/{}", ALCHEMY_MAINNET_KEYS[idx]) + } +} + /// Returns endpoint that has access to archive state pub fn next_http_archive_rpc_endpoint() -> String { let idx = next() % ALCHEMY_MAINNET_KEYS.len(); format!("https://eth-mainnet.alchemyapi.io/v2/{}", ALCHEMY_MAINNET_KEYS[idx]) } +/// Returns endpoint that has access to archive state +pub fn next_ws_archive_rpc_endpoint() -> String { + let idx = next() % ALCHEMY_MAINNET_KEYS.len(); + format!("wss://eth-mainnet.alchemyapi.io/v2/{}", ALCHEMY_MAINNET_KEYS[idx]) +} + #[cfg(test)] mod tests { use super::*;