From bf1d12096bb3bda300325c72e1884483c9405491 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Tue, 28 Mar 2023 20:38:10 +0100 Subject: [PATCH] chain/ethereum: Use provider for metrics --- chain/ethereum/src/network.rs | 11 ++++-- chain/ethereum/src/transport.rs | 22 +++++++----- graph/src/endpoint.rs | 62 +++++++++++++++++---------------- graph/src/firehose/endpoints.rs | 10 +++--- node/src/chain.rs | 1 + 5 files changed, 59 insertions(+), 47 deletions(-) diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 0e4303a9ade..77e4dab13b9 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -266,7 +266,7 @@ impl EthereumNetworks { #[cfg(test)] mod tests { use graph::{ - endpoint::{EndpointMetrics, Host}, + endpoint::{EndpointMetrics, Provider}, firehose::SubgraphLimit, prelude::MetricsRegistry, slog::{o, Discard, Logger}, @@ -348,6 +348,7 @@ mod tests { Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new(), metrics.clone(), + "", ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -453,6 +454,7 @@ mod tests { Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new(), metrics.clone(), + "", ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -523,6 +525,7 @@ mod tests { Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new(), metrics.clone(), + "", ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -591,6 +594,7 @@ mod tests { Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new(), metrics.clone(), + "", ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -668,7 +672,7 @@ mod tests { ]; // Set errors - metrics.report_for_test(&Host::from(error_provider.clone()), false); + metrics.report_for_test(&Provider::from(error_provider.clone()), false); let mut no_retest_adapters = EthereumNetworkAdapters::new(Some(0f64)); let mut always_retest_adapters = EthereumNetworkAdapters::new(Some(1f64)); @@ -744,7 +748,7 @@ mod tests { let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); // Set errors - metrics.report_for_test(&Host::from(error_provider.clone()), false); + metrics.report_for_test(&Provider::from(error_provider.clone()), false); let mut no_retest_adapters = EthereumNetworkAdapters::new(Some(0f64)); no_retest_adapters.adapters.push(EthereumNetworkAdapter { @@ -833,6 +837,7 @@ mod tests { Url::parse(&"http://127.0.0.1").unwrap(), HeaderMap::new(), endpoint_metrics.clone(), + "", ); Arc::new( diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 6b898b95cae..f19b45dc3df 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,4 +1,4 @@ -use graph::endpoint::{EndpointMetrics, Host, RequestLabels}; +use graph::endpoint::{EndpointMetrics, Provider, RequestLabels}; use jsonrpc_core::types::Call; use jsonrpc_core::Value; @@ -15,7 +15,7 @@ pub enum Transport { RPC { client: http::Http, metrics: Arc, - host: Host, + provider: Provider, }, IPC(ipc::Ipc), WS(ws::WebSocket), @@ -43,18 +43,22 @@ impl Transport { /// /// Note: JSON-RPC over HTTP doesn't always support subscribing to new /// blocks (one such example is Infura's HTTP endpoint). - pub fn new_rpc(rpc: Url, headers: ::http::HeaderMap, metrics: Arc) -> Self { + pub fn new_rpc( + rpc: Url, + headers: ::http::HeaderMap, + metrics: Arc, + provider: impl AsRef, + ) -> Self { // Unwrap: This only fails if something is wrong with the system's TLS config. let client = reqwest::Client::builder() .default_headers(headers) .build() .unwrap(); - let host = rpc.to_string(); Transport::RPC { client: http::Http::with_client(client, rpc), metrics, - host: host.into(), + provider: provider.as_ref().into(), } } } @@ -67,7 +71,7 @@ impl web3::Transport for Transport { Transport::RPC { client, metrics: _, - host: _, + provider: _, } => client.prepare(method, params), Transport::IPC(ipc) => ipc.prepare(method, params), Transport::WS(ws) => ws.prepare(method, params), @@ -79,7 +83,7 @@ impl web3::Transport for Transport { Transport::RPC { client, metrics, - host, + provider, } => { let metrics = metrics.cheap_clone(); let client = client.clone(); @@ -89,7 +93,7 @@ impl web3::Transport for Transport { }; let labels = RequestLabels { - host: host.clone(), + provider: provider.clone(), req_type: method.into(), conn_type: graph::endpoint::ConnectionType::Rpc, }; @@ -127,7 +131,7 @@ impl web3::BatchTransport for Transport { Transport::RPC { client, metrics: _, - host: _, + provider: _, } => Box::new(client.send_batch(requests)), Transport::IPC(ipc) => Box::new(ipc.send_batch(requests)), Transport::WS(ws) => Box::new(ws.send_batch(requests)), diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index 368d9db4130..bff6b0c53f9 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -11,22 +11,20 @@ use slog::{warn, Logger}; use crate::{components::metrics::MetricsRegistry, data::value::Word}; -/// HostCount is the underlying structure to keep the count, +/// ProviderCount is the underlying structure to keep the count, /// we require that all the hosts are known ahead of time, this way we can /// avoid locking since we don't need to modify the entire struture. -type HostCount = Arc>; +type ProviderCount = Arc>; -/// Host represents the normalized (parse::().to_string()) of the -/// underlying endpoint. This allows us to track errors across multiple -/// adapters if they share the same endpoint. -pub type Host = Word; +/// Provider represents label of the underlying endpoint. +pub type Provider = Word; /// This struct represents all the current labels except for the result /// which is added separately. If any new labels are necessary they should /// remain in the same order as added in [`EndpointMetrics::new`] #[derive(Clone)] pub struct RequestLabels { - pub host: Host, + pub provider: Provider, pub req_type: Word, pub conn_type: ConnectionType, } @@ -54,7 +52,7 @@ impl RequestLabels { Box::new([ (&self.conn_type).into(), self.req_type.as_str(), - self.host.as_str(), + self.provider.as_str(), match is_success { true => "success", false => "failure", @@ -67,35 +65,39 @@ impl RequestLabels { /// a success call to a host will clear the error count. pub struct EndpointMetrics { logger: Logger, - hosts: HostCount, + providers: ProviderCount, counter: Box, } impl std::fmt::Debug for EndpointMetrics { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("{:?}", self.hosts)) + f.write_fmt(format_args!("{:?}", self.providers)) } } impl EndpointMetrics { - pub fn new(logger: Logger, hosts: &[impl AsRef], registry: Arc) -> Self { - let hosts = Arc::new(HashMap::from_iter( - hosts + pub fn new( + logger: Logger, + providers: &[impl AsRef], + registry: Arc, + ) -> Self { + let providers = Arc::new(HashMap::from_iter( + providers .iter() - .map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))), + .map(|h| (Provider::from(h.as_ref()), AtomicU64::new(0))), )); let counter = registry .new_int_counter_vec( "endpoint_request", "successfull request", - &["conn_type", "req_type", "host", "result"], + &["conn_type", "req_type", "provider", "result"], ) .expect("unable to create endpoint_request counter_vec"); Self { logger, - hosts, + providers, counter, } } @@ -103,24 +105,24 @@ impl EndpointMetrics { /// This should only be used for testing. pub fn mock() -> Self { use slog::{o, Discard}; - let hosts: &[&str] = &[]; + let providers: &[&str] = &[]; Self::new( Logger::root(Discard, o!()), - hosts, + providers, Arc::new(MetricsRegistry::mock()), ) } #[cfg(debug_assertions)] - pub fn report_for_test(&self, host: &Host, success: bool) { + pub fn report_for_test(&self, provider: &Provider, success: bool) { match success { true => self.success(&RequestLabels { - host: host.clone(), + provider: provider.clone(), req_type: "".into(), conn_type: ConnectionType::Firehose, }), false => self.failure(&RequestLabels { - host: host.clone(), + provider: provider.clone(), req_type: "".into(), conn_type: ConnectionType::Firehose, }), @@ -128,13 +130,13 @@ impl EndpointMetrics { } pub fn success(&self, labels: &RequestLabels) { - match self.hosts.get(&labels.host) { + match self.providers.get(&labels.provider) { Some(count) => { count.store(0, Ordering::Relaxed); } None => warn!( &self.logger, - "metrics not available for host {}", labels.host + "metrics not available for host {}", labels.provider ), }; @@ -142,13 +144,13 @@ impl EndpointMetrics { } pub fn failure(&self, labels: &RequestLabels) { - match self.hosts.get(&labels.host) { + match self.providers.get(&labels.provider) { Some(count) => { count.fetch_add(1, Ordering::Relaxed); } None => warn!( &self.logger, - "metrics not available for host {}", &labels.host + "metrics not available for host {}", &labels.provider ), }; @@ -159,9 +161,9 @@ impl EndpointMetrics { /// Returns the current error count of a host or 0 if the host /// doesn't have a value on the map. - pub fn get_count(&self, host: &Host) -> u64 { - self.hosts - .get(host) + pub fn get_count(&self, provider: &Provider) -> u64 { + self.providers + .get(provider) .map(|c| c.load(Ordering::Relaxed)) .unwrap_or(0) } @@ -175,12 +177,12 @@ mod test { use crate::{ components::metrics::MetricsRegistry, - endpoint::{EndpointMetrics, Host}, + endpoint::{EndpointMetrics, Provider}, }; #[tokio::test] async fn should_increment_and_reset() { - let (a, b, c): (Host, Host, Host) = ("a".into(), "b".into(), "c".into()); + let (a, b, c): (Provider, Provider, Provider) = ("a".into(), "b".into(), "c".into()); let hosts: &[&str] = &[&a, &b, &c]; let logger = Logger::root(Discard, o!()); diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 9f60e9b4c00..d0b02755e3c 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -4,7 +4,7 @@ use crate::{ blockchain::BlockPtr, cheap_clone::CheapClone, components::store::BlockNumber, - endpoint::{ConnectionType, EndpointMetrics, Host, RequestLabels}, + endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels}, firehose::decode_firehose_block, prelude::{anyhow, debug, info}, substreams, @@ -37,7 +37,7 @@ const HIGH_VALUE_USED_PERCENTAGE: usize = 80; #[derive(Debug)] pub struct FirehoseEndpoint { - pub provider: Host, + pub provider: Provider, pub auth: AuthInterceptor, pub filters_enabled: bool, pub compression_enabled: bool, @@ -194,7 +194,7 @@ impl FirehoseEndpoint { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), labels: RequestLabels { - host: self.provider.clone().into(), + provider: self.provider.clone().into(), req_type: "unknown".into(), conn_type: ConnectionType::Firehose, }, @@ -221,7 +221,7 @@ impl FirehoseEndpoint { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), labels: RequestLabels { - host: self.provider.clone().into(), + provider: self.provider.clone().into(), req_type: "unknown".into(), conn_type: ConnectionType::Firehose, }, @@ -246,7 +246,7 @@ impl FirehoseEndpoint { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), labels: RequestLabels { - host: self.provider.clone().into(), + provider: self.provider.clone().into(), req_type: "unknown".into(), conn_type: ConnectionType::Substreams, }, diff --git a/node/src/chain.rs b/node/src/chain.rs index d40ba458a9a..6a7a0be8ab2 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -461,6 +461,7 @@ pub async fn create_ethereum_networks_for_chain( Url::parse(&web3.url)?, web3.headers.clone(), endpoint_metrics.cheap_clone(), + &provider.label, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await,