diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index c36a6ce7ba1..7ac2ef3d113 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, bail, Context}; use graph::cheap_clone::CheapClone; +use graph::firehose::SubgraphLimit; use graph::prelude::rand::{self, seq::IteratorRandom}; use std::cmp::Ordering; use std::collections::HashMap; @@ -20,7 +21,7 @@ pub struct EthereumNetworkAdapter { /// strong_count on `adapter` to determine whether the adapter is above /// that limit. That's a somewhat imprecise but convenient way to /// determine the number of connections - limit: usize, + limit: SubgraphLimit, } impl EthereumNetworkAdapter { @@ -56,7 +57,11 @@ impl EthereumNetworkAdapters { self.adapters .iter() .filter(move |adapter| Some(&adapter.capabilities) == cheapest_sufficient_capability) - .filter(|adapter| Arc::strong_count(&adapter.adapter) < adapter.limit) + .filter(|adapter| { + adapter + .limit + .has_capacity(Arc::strong_count(&adapter.adapter)) + }) .map(|adapter| adapter.adapter.cheap_clone()) } @@ -92,9 +97,12 @@ impl EthereumNetworkAdapters { &self, capabilities: Option<&NodeCapabilities>, ) -> anyhow::Result> { - match self.call_only_adapter()? { - Some(adapter) => Ok(adapter), - None => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities { + // call_only_adapter can fail if we're out of capcity, this is fine since + // we would want to fallback onto a full adapter + // so we will ignore this error and return whatever comes out of `cheapest_with` + match self.call_only_adapter() { + Ok(Some(adapter)) => Ok(adapter), + _ => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities { // Archive is required for call_only archive: true, traces: false, @@ -115,7 +123,10 @@ impl EthereumNetworkAdapters { // TODO: This will probably blow up a lot sooner than [limit] amount of // subgraphs, since we probably use a few instances. - if Arc::strong_count(&adapters.adapter) >= adapters.limit { + if !adapters + .limit + .has_capacity(Arc::strong_count(&adapters.adapter)) + { bail!("call only adapter has reached the concurrency limit"); } @@ -142,7 +153,7 @@ impl EthereumNetworks { name: String, capabilities: NodeCapabilities, adapter: Arc, - limit: usize, + limit: SubgraphLimit, ) { let network_adapters = self .networks @@ -213,7 +224,7 @@ impl EthereumNetworks { mod tests { use std::sync::Arc; - use graph::{prelude::MetricsRegistry, tokio, url::Url}; + use graph::{firehose::SubgraphLimit, prelude::MetricsRegistry, tokio, url::Url}; use graph_mock::MockMetricsRegistry; use http::HeaderMap; @@ -320,7 +331,7 @@ mod tests { traces: false, }, eth_call_adapter.clone(), - 3, + SubgraphLimit::Limit(3), ); ethereum_networks.insert( chain.clone(), @@ -329,7 +340,7 @@ mod tests { traces: false, }, eth_adapter.clone(), - 3, + SubgraphLimit::Limit(3), ); ethereum_networks.networks.get(&chain).unwrap().clone() }; @@ -360,7 +371,10 @@ mod tests { { let adapter = adapters.call_or_cheapest(None).unwrap(); assert!(adapter.is_call_only()); - assert!(adapters.call_or_cheapest(None).is_err()); + assert_eq!( + adapters.call_or_cheapest(None).unwrap().is_call_only(), + false + ); } // Check empty falls back to call only @@ -375,4 +389,181 @@ mod tests { assert_eq!(adapter.is_call_only(), false); } } + + #[tokio::test] + async fn adapter_selector_unlimited() { + let chain = "mainnet".to_string(); + let logger = graph::log::logger(true); + let mock_registry: Arc = Arc::new(MockMetricsRegistry::new()); + let transport = + Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new()); + let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); + + let eth_call_adapter = Arc::new( + EthereumAdapter::new( + logger.clone(), + String::new(), + "http://127.0.0.1", + transport.clone(), + provider_metrics.clone(), + true, + true, + ) + .await, + ); + + let eth_adapter = Arc::new( + EthereumAdapter::new( + logger.clone(), + String::new(), + "http://127.0.0.1", + transport.clone(), + provider_metrics.clone(), + true, + false, + ) + .await, + ); + + let adapters = { + let mut ethereum_networks = EthereumNetworks::new(); + ethereum_networks.insert( + chain.clone(), + NodeCapabilities { + archive: true, + traces: false, + }, + eth_call_adapter.clone(), + SubgraphLimit::Unlimited, + ); + ethereum_networks.insert( + chain.clone(), + NodeCapabilities { + archive: true, + traces: false, + }, + eth_adapter.clone(), + SubgraphLimit::Limit(3), + ); + ethereum_networks.networks.get(&chain).unwrap().clone() + }; + // one reference above and one inside adapters struct + assert_eq!(Arc::strong_count(ð_call_adapter), 2); + assert_eq!(Arc::strong_count(ð_adapter), 2); + + let keep: Vec> = vec![0; 10] + .iter() + .map(|_| adapters.call_or_cheapest(None).unwrap()) + .collect(); + assert_eq!(keep.iter().any(|a| !a.is_call_only()), false); + } + + #[tokio::test] + async fn adapter_selector_disable_call_only_fallback() { + let chain = "mainnet".to_string(); + let logger = graph::log::logger(true); + let mock_registry: Arc = Arc::new(MockMetricsRegistry::new()); + let transport = + Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new()); + let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); + + let eth_call_adapter = Arc::new( + EthereumAdapter::new( + logger.clone(), + String::new(), + "http://127.0.0.1", + transport.clone(), + provider_metrics.clone(), + true, + true, + ) + .await, + ); + + let eth_adapter = Arc::new( + EthereumAdapter::new( + logger.clone(), + String::new(), + "http://127.0.0.1", + transport.clone(), + provider_metrics.clone(), + true, + false, + ) + .await, + ); + + let adapters = { + let mut ethereum_networks = EthereumNetworks::new(); + ethereum_networks.insert( + chain.clone(), + NodeCapabilities { + archive: true, + traces: false, + }, + eth_call_adapter.clone(), + SubgraphLimit::Disabled, + ); + ethereum_networks.insert( + chain.clone(), + NodeCapabilities { + archive: true, + traces: false, + }, + eth_adapter.clone(), + SubgraphLimit::Limit(3), + ); + ethereum_networks.networks.get(&chain).unwrap().clone() + }; + // one reference above and one inside adapters struct + assert_eq!(Arc::strong_count(ð_call_adapter), 2); + assert_eq!(Arc::strong_count(ð_adapter), 2); + assert_eq!( + adapters.call_or_cheapest(None).unwrap().is_call_only(), + false + ); + } + + #[tokio::test] + async fn adapter_selector_no_call_only_fallback() { + let chain = "mainnet".to_string(); + let logger = graph::log::logger(true); + let mock_registry: Arc = Arc::new(MockMetricsRegistry::new()); + let transport = + Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new()); + let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); + + let eth_adapter = Arc::new( + EthereumAdapter::new( + logger.clone(), + String::new(), + "http://127.0.0.1", + transport.clone(), + provider_metrics.clone(), + true, + false, + ) + .await, + ); + + let adapters = { + let mut ethereum_networks = EthereumNetworks::new(); + ethereum_networks.insert( + chain.clone(), + NodeCapabilities { + archive: true, + traces: false, + }, + eth_adapter.clone(), + SubgraphLimit::Limit(3), + ); + ethereum_networks.networks.get(&chain).unwrap().clone() + }; + // one reference above and one inside adapters struct + assert_eq!(Arc::strong_count(ð_adapter), 2); + assert_eq!( + adapters.call_or_cheapest(None).unwrap().is_call_only(), + false + ); + } } diff --git a/docs/config.md b/docs/config.md index 8020334918c..53a9299efed 100644 --- a/docs/config.md +++ b/docs/config.md @@ -151,9 +151,14 @@ approximate and can differ from the true number by a small amount (generally less than 10) The limit is set through rules that match on the node name. If a node's -name does not match any rule, the corresponding provider can be used for an -unlimited number of subgraphs. It is recommended that at least one provider -is generally unlimited. The limit is set in the following way: +name does not match any rule, the corresponding provider will be disabled +for that node. + +If the match property is omitted then the provider will be unlimited on every +node. + +It is recommended that at least one provider is generally unlimited. +The limit is set in the following way: ```toml [chains.mainnet] @@ -169,8 +174,7 @@ provider = [ Nodes named `some_node_.*` will use `mainnet-1` for at most 10 subgraphs, and `mainnet-0` for everything else, nodes named `other_node_.*` will never use `mainnet-1` and always `mainnet-0`. Any node whose name does not match -one of these patterns will use `mainnet-0` and `mainnet-1` for an unlimited -number of subgraphs. +one of these patterns will not be able to use and `mainnet-1`. ## Controlling Deployment diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 2582df5c119..2222257d342 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -34,15 +34,26 @@ pub struct FirehoseEndpoint { pub token: Option, pub filters_enabled: bool, pub compression_enabled: bool, - pub subgraph_limit: usize, + pub subgraph_limit: SubgraphLimit, channel: Channel, } -#[derive(Clone, Debug)] +// TODO: Find a new home for this type. +#[derive(Clone, Debug, PartialEq, Ord, Eq, PartialOrd)] pub enum SubgraphLimit { - Unlimited, + Disabled, Limit(usize), - NoTraffic, + Unlimited, +} + +impl SubgraphLimit { + pub fn has_capacity(&self, current: usize) -> bool { + match self { + SubgraphLimit::Unlimited => true, + SubgraphLimit::Limit(limit) => limit > ¤t, + SubgraphLimit::Disabled => false, + } + } } impl Display for FirehoseEndpoint { @@ -93,10 +104,10 @@ impl FirehoseEndpoint { let subgraph_limit = match subgraph_limit { // See the comment on the constant - SubgraphLimit::Unlimited => SUBGRAPHS_PER_CONN, + SubgraphLimit::Unlimited => SubgraphLimit::Limit(SUBGRAPHS_PER_CONN), // This is checked when parsing from config but doesn't hurt to be defensive. - SubgraphLimit::Limit(limit) => limit.min(SUBGRAPHS_PER_CONN), - SubgraphLimit::NoTraffic => 0, + SubgraphLimit::Limit(limit) => SubgraphLimit::Limit(limit.min(SUBGRAPHS_PER_CONN)), + l => l, }; FirehoseEndpoint { @@ -109,11 +120,11 @@ impl FirehoseEndpoint { } } - // The SUBGRAPHS_PER_CONN upper bound was already limited so we leave it the same - // we need to use inclusive limits (<=) because there will always be a reference + // we need to -1 because there will always be a reference // inside FirehoseEndpoints that is not used (is always cloned). pub fn has_subgraph_capacity(self: &Arc) -> bool { - Arc::strong_count(&self) <= self.subgraph_limit + self.subgraph_limit + .has_capacity(Arc::strong_count(&self).checked_sub(1).unwrap_or(0)) } pub async fn get_block( @@ -501,7 +512,7 @@ mod test { None, false, false, - SubgraphLimit::NoTraffic, + SubgraphLimit::Disabled, ))]; let mut endpoints = FirehoseEndpoints::from(endpoint); diff --git a/node/src/chain.rs b/node/src/chain.rs index e57ba5d626b..e25ea4a1122 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -169,11 +169,6 @@ pub fn create_firehose_networks( "Configuring firehose endpoint"; "provider" => &provider.label, ); - let subgraph_limit = match firehose.limit_for(&config.node) { - Some(limit) if limit == 0 => SubgraphLimit::Unlimited, - Some(limit) => SubgraphLimit::Limit(limit), - None => SubgraphLimit::NoTraffic, - }; let parsed_networks = networks_by_kind .entry(chain.protocol) @@ -194,7 +189,7 @@ pub fn create_firehose_networks( firehose.token.clone(), firehose.filters_enabled(), firehose.compression_enabled(), - subgraph_limit.clone(), + firehose.limit_for(&config.node), )), ); } diff --git a/node/src/config.rs b/node/src/config.rs index 228d3297844..ffde7d2e946 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, - firehose::SUBGRAPHS_PER_CONN, + firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, prelude::{ anyhow::{anyhow, bail, Context, Result}, info, @@ -476,7 +476,7 @@ impl ChainSection { url: url.to_string(), features, headers: Default::default(), - rules: Vec::new(), + rules: vec![], }), }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { @@ -573,8 +573,8 @@ pub struct FirehoseProvider { } impl FirehoseProvider { - pub fn limit_for(&self, node: &NodeId) -> Option { - self.rules.iter().find_map(|r| r.limit_for(node)) + pub fn limit_for(&self, node: &NodeId) -> SubgraphLimit { + self.rules.limit_for(node) } pub fn filters_enabled(&self) -> bool { self.features.contains(FIREHOSE_FILTER_FEATURE) @@ -584,6 +584,19 @@ impl FirehoseProvider { } } +pub trait Web3Rules { + fn limit_for(&self, node: &NodeId) -> SubgraphLimit; +} + +impl Web3Rules for Vec { + fn limit_for(&self, node: &NodeId) -> SubgraphLimit { + self.iter() + .map(|rule| rule.limit_for(node)) + .max() + .unwrap_or(SubgraphLimit::Unlimited) + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct Web3Rule { #[serde(with = "serde_regex")] @@ -598,10 +611,16 @@ impl PartialEq for Web3Rule { } impl Web3Rule { - fn limit_for(&self, node: &NodeId) -> Option { + fn limit_for(&self, node: &NodeId) -> SubgraphLimit { match self.name.find(node.as_str()) { - Some(m) if m.as_str() == node.as_str() => Some(self.limit), - _ => None, + Some(m) if m.as_str() == node.as_str() => { + if self.limit == 0 { + SubgraphLimit::Disabled + } else { + SubgraphLimit::Limit(self.limit) + } + } + _ => SubgraphLimit::Disabled, } } } @@ -633,11 +652,8 @@ impl Web3Provider { } } - pub fn limit_for(&self, node: &NodeId) -> usize { - self.rules - .iter() - .find_map(|l| l.limit_for(node)) - .unwrap_or(usize::MAX) + pub fn limit_for(&self, node: &NodeId) -> SubgraphLimit { + self.rules.limit_for(node) } } @@ -1125,6 +1141,7 @@ mod tests { Chain, Config, FirehoseProvider, Provider, ProviderDetails, Transport, Web3Provider, }; use graph::blockchain::BlockchainKind; + use graph::firehose::SubgraphLimit; use graph::prelude::regex::Regex; use graph::prelude::NodeId; use http::{HeaderMap, HeaderValue}; @@ -1597,7 +1614,7 @@ mod tests { #[test] fn it_parses_web3_provider_rules() { - fn limit_for(node: &str) -> usize { + fn limit_for(node: &str) -> SubgraphLimit { let prov = toml::from_str::( r#" label = "something" @@ -1612,11 +1629,29 @@ mod tests { prov.limit_for(&NodeId::new(node.to_string()).unwrap()) } - assert_eq!(10, limit_for("some_node_0")); - assert_eq!(0, limit_for("other_node_0")); - assert_eq!(usize::MAX, limit_for("default")); + assert_eq!(SubgraphLimit::Limit(10), limit_for("some_node_0")); + assert_eq!(SubgraphLimit::Disabled, limit_for("other_node_0")); + assert_eq!(SubgraphLimit::Disabled, limit_for("default")); } + #[test] + fn it_parses_web3_default_empty_unlimited() { + fn limit_for(node: &str) -> SubgraphLimit { + let prov = toml::from_str::( + r#" + label = "something" + url = "http://example.com" + features = [] + match = [] + "#, + ) + .unwrap(); + + prov.limit_for(&NodeId::new(node.to_string()).unwrap()) + } + + assert_eq!(SubgraphLimit::Unlimited, limit_for("other_node_0")); + } fn read_resource_as_string>(path: P) -> String { let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); d.push("resources/tests"); @@ -1649,4 +1684,10 @@ mod tests { actual ); } + + #[test] + fn web3rules_have_the_right_order() { + assert!(SubgraphLimit::Unlimited > SubgraphLimit::Limit(10)); + assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); + } }