Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix limit handling on rpc configuration #4353

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 202 additions & 11 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, bail, Context};
use graph::cheap_clone::CheapClone;
use graph::firehose::SubgraphLimit;
use graph::prelude::rand::{self, seq::IteratorRandom};
use std::cmp::Ordering;
use std::collections::HashMap;
Expand All @@ -20,7 +21,7 @@ pub struct EthereumNetworkAdapter {
/// strong_count on `adapter` to determine whether the adapter is above
/// that limit. That's a somewhat imprecise but convenient way to
/// determine the number of connections
limit: usize,
limit: SubgraphLimit,
}

impl EthereumNetworkAdapter {
Expand Down Expand Up @@ -56,7 +57,11 @@ impl EthereumNetworkAdapters {
self.adapters
.iter()
.filter(move |adapter| Some(&adapter.capabilities) == cheapest_sufficient_capability)
.filter(|adapter| Arc::strong_count(&adapter.adapter) < adapter.limit)
.filter(|adapter| {
adapter
.limit
.has_capacity(Arc::strong_count(&adapter.adapter))
})
.map(|adapter| adapter.adapter.cheap_clone())
}

Expand Down Expand Up @@ -92,9 +97,12 @@ impl EthereumNetworkAdapters {
&self,
capabilities: Option<&NodeCapabilities>,
) -> anyhow::Result<Arc<EthereumAdapter>> {
match self.call_only_adapter()? {
Some(adapter) => Ok(adapter),
None => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
// `call_only_adapter` can fail if we're out of capacity. This is fine, since
// in that case we want to fall back onto a full adapter, so we ignore the
// error and return whatever comes out of `cheapest_with`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this comment, especially the 'or, this is fine' throws me off

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had something else there :P I will fix this, good catch :)

match self.call_only_adapter() {
Ok(Some(adapter)) => Ok(adapter),
_ => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
// Archive is required for call_only
archive: true,
traces: false,
Expand All @@ -115,7 +123,10 @@ impl EthereumNetworkAdapters {

// TODO: This will probably blow up a lot sooner than [limit] amount of
// subgraphs, since we probably use a few instances.
if Arc::strong_count(&adapters.adapter) >= adapters.limit {
if !adapters
.limit
.has_capacity(Arc::strong_count(&adapters.adapter))
{
bail!("call only adapter has reached the concurrency limit");
}

Expand All @@ -142,7 +153,7 @@ impl EthereumNetworks {
name: String,
capabilities: NodeCapabilities,
adapter: Arc<EthereumAdapter>,
limit: usize,
limit: SubgraphLimit,
) {
let network_adapters = self
.networks
Expand Down Expand Up @@ -213,7 +224,7 @@ impl EthereumNetworks {
mod tests {
use std::sync::Arc;

use graph::{prelude::MetricsRegistry, tokio, url::Url};
use graph::{firehose::SubgraphLimit, prelude::MetricsRegistry, tokio, url::Url};
use graph_mock::MockMetricsRegistry;
use http::HeaderMap;

Expand Down Expand Up @@ -320,7 +331,7 @@ mod tests {
traces: false,
},
eth_call_adapter.clone(),
3,
SubgraphLimit::Limit(3),
);
ethereum_networks.insert(
chain.clone(),
Expand All @@ -329,7 +340,7 @@ mod tests {
traces: false,
},
eth_adapter.clone(),
3,
SubgraphLimit::Limit(3),
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
Expand Down Expand Up @@ -360,7 +371,10 @@ mod tests {
{
let adapter = adapters.call_or_cheapest(None).unwrap();
assert!(adapter.is_call_only());
assert!(adapters.call_or_cheapest(None).is_err());
assert_eq!(
adapters.call_or_cheapest(None).unwrap().is_call_only(),
false
);
}

// Check empty falls back to call only
Expand All @@ -375,4 +389,181 @@ mod tests {
assert_eq!(adapter.is_call_only(), false);
}
}

/// Checks that a call-only adapter registered with `SubgraphLimit::Unlimited`
/// is returned by `call_or_cheapest` for ten consecutive requests, even though
/// every previously returned handle is still being held.
#[tokio::test]
async fn adapter_selector_unlimited() {
    let chain = "mainnet".to_string();
    let logger = graph::log::logger(true);
    let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
    let transport =
        Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
    let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

    // The two adapters are built identically except for the last constructor
    // argument (presumably the call-only flag — confirm against
    // `EthereumAdapter::new`).
    let eth_call_adapter = Arc::new(
        EthereumAdapter::new(
            logger.clone(),
            String::new(),
            "http://127.0.0.1",
            transport.clone(),
            provider_metrics.clone(),
            true,
            true,
        )
        .await,
    );

    let eth_adapter = Arc::new(
        EthereumAdapter::new(
            logger.clone(),
            String::new(),
            "http://127.0.0.1",
            transport.clone(),
            provider_metrics.clone(),
            true,
            false,
        )
        .await,
    );

    let adapters = {
        let mut ethereum_networks = EthereumNetworks::new();
        // Call adapter: no cap on the number of concurrent users.
        ethereum_networks.insert(
            chain.clone(),
            NodeCapabilities {
                archive: true,
                traces: false,
            },
            eth_call_adapter.clone(),
            SubgraphLimit::Unlimited,
        );
        // Regular adapter: capped at 3.
        ethereum_networks.insert(
            chain.clone(),
            NodeCapabilities {
                archive: true,
                traces: false,
            },
            eth_adapter.clone(),
            SubgraphLimit::Limit(3),
        );
        ethereum_networks.networks.get(&chain).unwrap().clone()
    };
    // one reference above and one inside adapters struct
    assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
    assert_eq!(Arc::strong_count(&eth_adapter), 2);

    // Hold on to every returned handle so the strong count keeps growing while
    // we continue to request adapters.
    let held: Vec<Arc<EthereumAdapter>> = (0..10)
        .map(|_| adapters.call_or_cheapest(None).unwrap())
        .collect();
    assert!(held.iter().all(|adapter| adapter.is_call_only()));
}

/// Checks that when the call-only adapter is registered with
/// `SubgraphLimit::Disabled`, `call_or_cheapest` hands out the regular adapter
/// instead of failing.
#[tokio::test]
async fn adapter_selector_disable_call_only_fallback() {
    let chain = "mainnet".to_string();
    let logger = graph::log::logger(true);
    let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
    let transport =
        Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
    let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

    // The two adapters are built identically except for the last constructor
    // argument (presumably the call-only flag — confirm against
    // `EthereumAdapter::new`).
    let eth_call_adapter = Arc::new(
        EthereumAdapter::new(
            logger.clone(),
            String::new(),
            "http://127.0.0.1",
            transport.clone(),
            provider_metrics.clone(),
            true,
            true,
        )
        .await,
    );

    let eth_adapter = Arc::new(
        EthereumAdapter::new(
            logger.clone(),
            String::new(),
            "http://127.0.0.1",
            transport.clone(),
            provider_metrics.clone(),
            true,
            false,
        )
        .await,
    );

    let adapters = {
        let mut ethereum_networks = EthereumNetworks::new();
        // Call adapter is present but disabled.
        ethereum_networks.insert(
            chain.clone(),
            NodeCapabilities {
                archive: true,
                traces: false,
            },
            eth_call_adapter.clone(),
            SubgraphLimit::Disabled,
        );
        // Regular adapter: capped at 3.
        ethereum_networks.insert(
            chain.clone(),
            NodeCapabilities {
                archive: true,
                traces: false,
            },
            eth_adapter.clone(),
            SubgraphLimit::Limit(3),
        );
        ethereum_networks.networks.get(&chain).unwrap().clone()
    };
    // one reference above and one inside adapters struct
    assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
    assert_eq!(Arc::strong_count(&eth_adapter), 2);

    // The disabled call adapter must never be selected.
    let selected = adapters.call_or_cheapest(None).unwrap();
    assert!(!selected.is_call_only());
}

/// Checks that when no call-only adapter is registered at all,
/// `call_or_cheapest` still succeeds and returns the regular adapter.
#[tokio::test]
async fn adapter_selector_no_call_only_fallback() {
    let chain = "mainnet".to_string();
    let logger = graph::log::logger(true);
    let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
    let transport =
        Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
    let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

    let eth_adapter = Arc::new(
        EthereumAdapter::new(
            logger.clone(),
            String::new(),
            "http://127.0.0.1",
            transport.clone(),
            provider_metrics.clone(),
            true,
            false,
        )
        .await,
    );

    let adapters = {
        let mut ethereum_networks = EthereumNetworks::new();
        // Only a single, regular adapter is registered for the chain.
        ethereum_networks.insert(
            chain.clone(),
            NodeCapabilities {
                archive: true,
                traces: false,
            },
            eth_adapter.clone(),
            SubgraphLimit::Limit(3),
        );
        ethereum_networks.networks.get(&chain).unwrap().clone()
    };
    // one reference above and one inside adapters struct
    assert_eq!(Arc::strong_count(&eth_adapter), 2);

    let selected = adapters.call_or_cheapest(None).unwrap();
    assert!(!selected.is_call_only());
}
}
14 changes: 9 additions & 5 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,14 @@ approximate and can differ from the true number by a small amount
(generally less than 10)

The limit is set through rules that match on the node name. If a node's
name does not match any rule, the corresponding provider can be used for an
unlimited number of subgraphs. It is recommended that at least one provider
is generally unlimited. The limit is set in the following way:
name does not match any rule, the corresponding provider will be disabled
for that node.

If the match property is omitted then the provider will be unlimited on every
node.

It is recommended that at least one provider is generally unlimited.
The limit is set in the following way:

```toml
[chains.mainnet]
Expand All @@ -169,8 +174,7 @@ provider = [
Nodes named `some_node_.*` will use `mainnet-1` for at most 10 subgraphs,
and `mainnet-0` for everything else, nodes named `other_node_.*` will never
use `mainnet-1` and always `mainnet-0`. Any node whose name does not match
one of these patterns will use `mainnet-0` and `mainnet-1` for an unlimited
number of subgraphs.
one of these patterns will not be able to use `mainnet-1`.

## Controlling Deployment

Expand Down
33 changes: 22 additions & 11 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,26 @@ pub struct FirehoseEndpoint {
pub token: Option<String>,
pub filters_enabled: bool,
pub compression_enabled: bool,
pub subgraph_limit: usize,
pub subgraph_limit: SubgraphLimit,
channel: Channel,
}

#[derive(Clone, Debug)]
// TODO: Find a new home for this type.
#[derive(Clone, Debug, PartialEq, Ord, Eq, PartialOrd)]
pub enum SubgraphLimit {
Unlimited,
Disabled,
Limit(usize),
NoTraffic,
Unlimited,
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this enum? I don't really think it buys us much above using a plain usize (where Disabled -> 0 and Unlimited -> usize::MAX)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly to improve readability: otherwise we'd need one value for 0 and another for "not set" in order to differentiate them, so there would be a few places with `Option` where we were not sure of the meaning. Also, `Unlimited` gets transformed in the different adapters, e.g. firehose uses a maximum of 100, so it gets set to the adapter-specific "maximum".


impl SubgraphLimit {
pub fn has_capacity(&self, current: usize) -> bool {
match self {
SubgraphLimit::Unlimited => true,
SubgraphLimit::Limit(limit) => limit > &current,
SubgraphLimit::Disabled => false,
}
}
}

impl Display for FirehoseEndpoint {
Expand Down Expand Up @@ -93,10 +104,10 @@ impl FirehoseEndpoint {

let subgraph_limit = match subgraph_limit {
// See the comment on the constant
SubgraphLimit::Unlimited => SUBGRAPHS_PER_CONN,
SubgraphLimit::Unlimited => SubgraphLimit::Limit(SUBGRAPHS_PER_CONN),
// This is checked when parsing from config but doesn't hurt to be defensive.
SubgraphLimit::Limit(limit) => limit.min(SUBGRAPHS_PER_CONN),
SubgraphLimit::NoTraffic => 0,
SubgraphLimit::Limit(limit) => SubgraphLimit::Limit(limit.min(SUBGRAPHS_PER_CONN)),
l => l,
};

FirehoseEndpoint {
Expand All @@ -109,11 +120,11 @@ impl FirehoseEndpoint {
}
}

// The SUBGRAPHS_PER_CONN upper bound was already limited so we leave it the same
// we need to use inclusive limits (<=) because there will always be a reference
// We need to subtract 1 because there will always be one reference
// inside FirehoseEndpoints that is not used (it is always cloned).
pub fn has_subgraph_capacity(self: &Arc<Self>) -> bool {
Arc::strong_count(&self) <= self.subgraph_limit
self.subgraph_limit
.has_capacity(Arc::strong_count(&self).checked_sub(1).unwrap_or(0))
}

pub async fn get_block<M>(
Expand Down Expand Up @@ -501,7 +512,7 @@ mod test {
None,
false,
false,
SubgraphLimit::NoTraffic,
SubgraphLimit::Disabled,
))];

let mut endpoints = FirehoseEndpoints::from(endpoint);
Expand Down
Loading