Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain/ethereum: Use provider for metrics #4504

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl EthereumNetworks {
#[cfg(test)]
mod tests {
use graph::{
endpoint::{EndpointMetrics, Host},
endpoint::{EndpointMetrics, Provider},
firehose::SubgraphLimit,
prelude::MetricsRegistry,
slog::{o, Discard, Logger},
Expand Down Expand Up @@ -348,6 +348,7 @@ mod tests {
Url::parse("http://127.0.0.1").unwrap(),
HeaderMap::new(),
metrics.clone(),
"",
);
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

Expand Down Expand Up @@ -453,6 +454,7 @@ mod tests {
Url::parse("http://127.0.0.1").unwrap(),
HeaderMap::new(),
metrics.clone(),
"",
);
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

Expand Down Expand Up @@ -523,6 +525,7 @@ mod tests {
Url::parse("http://127.0.0.1").unwrap(),
HeaderMap::new(),
metrics.clone(),
"",
);
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

Expand Down Expand Up @@ -591,6 +594,7 @@ mod tests {
Url::parse("http://127.0.0.1").unwrap(),
HeaderMap::new(),
metrics.clone(),
"",
);
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

Expand Down Expand Up @@ -668,7 +672,7 @@ mod tests {
];

// Set errors
metrics.report_for_test(&Host::from(error_provider.clone()), false);
metrics.report_for_test(&Provider::from(error_provider.clone()), false);

let mut no_retest_adapters = EthereumNetworkAdapters::new(Some(0f64));
let mut always_retest_adapters = EthereumNetworkAdapters::new(Some(1f64));
Expand Down Expand Up @@ -744,7 +748,7 @@ mod tests {
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

// Set errors
metrics.report_for_test(&Host::from(error_provider.clone()), false);
metrics.report_for_test(&Provider::from(error_provider.clone()), false);

let mut no_retest_adapters = EthereumNetworkAdapters::new(Some(0f64));
no_retest_adapters.adapters.push(EthereumNetworkAdapter {
Expand Down Expand Up @@ -833,6 +837,7 @@ mod tests {
Url::parse(&"http://127.0.0.1").unwrap(),
HeaderMap::new(),
endpoint_metrics.clone(),
"",
);

Arc::new(
Expand Down
22 changes: 13 additions & 9 deletions chain/ethereum/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use graph::endpoint::{EndpointMetrics, Host, RequestLabels};
use graph::endpoint::{EndpointMetrics, Provider, RequestLabels};
use jsonrpc_core::types::Call;
use jsonrpc_core::Value;

Expand All @@ -15,7 +15,7 @@ pub enum Transport {
RPC {
client: http::Http,
metrics: Arc<EndpointMetrics>,
host: Host,
provider: Provider,
},
IPC(ipc::Ipc),
WS(ws::WebSocket),
Expand Down Expand Up @@ -43,18 +43,22 @@ impl Transport {
///
/// Note: JSON-RPC over HTTP doesn't always support subscribing to new
/// blocks (one such example is Infura's HTTP endpoint).
pub fn new_rpc(rpc: Url, headers: ::http::HeaderMap, metrics: Arc<EndpointMetrics>) -> Self {
pub fn new_rpc(
rpc: Url,
headers: ::http::HeaderMap,
metrics: Arc<EndpointMetrics>,
provider: impl AsRef<str>,
) -> Self {
// Unwrap: This only fails if something is wrong with the system's TLS config.
let client = reqwest::Client::builder()
.default_headers(headers)
.build()
.unwrap();

let host = rpc.to_string();
Transport::RPC {
client: http::Http::with_client(client, rpc),
metrics,
host: host.into(),
provider: provider.as_ref().into(),
}
}
}
Expand All @@ -67,7 +71,7 @@ impl web3::Transport for Transport {
Transport::RPC {
client,
metrics: _,
host: _,
provider: _,
} => client.prepare(method, params),
Transport::IPC(ipc) => ipc.prepare(method, params),
Transport::WS(ws) => ws.prepare(method, params),
Expand All @@ -79,7 +83,7 @@ impl web3::Transport for Transport {
Transport::RPC {
client,
metrics,
host,
provider,
} => {
let metrics = metrics.cheap_clone();
let client = client.clone();
Expand All @@ -89,7 +93,7 @@ impl web3::Transport for Transport {
};

let labels = RequestLabels {
host: host.clone(),
provider: provider.clone(),
req_type: method.into(),
conn_type: graph::endpoint::ConnectionType::Rpc,
};
Expand Down Expand Up @@ -127,7 +131,7 @@ impl web3::BatchTransport for Transport {
Transport::RPC {
client,
metrics: _,
host: _,
provider: _,
} => Box::new(client.send_batch(requests)),
Transport::IPC(ipc) => Box::new(ipc.send_batch(requests)),
Transport::WS(ws) => Box::new(ws.send_batch(requests)),
Expand Down
62 changes: 32 additions & 30 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,20 @@ use slog::{warn, Logger};

use crate::{components::metrics::MetricsRegistry, data::value::Word};

/// HostCount is the underlying structure to keep the count,
/// ProviderCount is the underlying structure to keep the count,
/// we require that all the hosts are known ahead of time, this way we can
/// avoid locking since we don't need to modify the entire struture.
type HostCount = Arc<HashMap<Host, AtomicU64>>;
type ProviderCount = Arc<HashMap<Provider, AtomicU64>>;

/// Host represents the normalized (parse::<Url>().to_string()) of the
/// underlying endpoint. This allows us to track errors across multiple
/// adapters if they share the same endpoint.
pub type Host = Word;
/// Provider represents label of the underlying endpoint.
pub type Provider = Word;

/// This struct represents all the current labels except for the result
/// which is added separately. If any new labels are necessary they should
/// remain in the same order as added in [`EndpointMetrics::new`]
#[derive(Clone)]
pub struct RequestLabels {
pub host: Host,
pub provider: Provider,
pub req_type: Word,
pub conn_type: ConnectionType,
}
Expand Down Expand Up @@ -54,7 +52,7 @@ impl RequestLabels {
Box::new([
(&self.conn_type).into(),
self.req_type.as_str(),
self.host.as_str(),
self.provider.as_str(),
match is_success {
true => "success",
false => "failure",
Expand All @@ -67,88 +65,92 @@ impl RequestLabels {
/// a success call to a host will clear the error count.
pub struct EndpointMetrics {
logger: Logger,
hosts: HostCount,
providers: ProviderCount,
counter: Box<IntCounterVec>,
}

impl std::fmt::Debug for EndpointMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:?}", self.hosts))
f.write_fmt(format_args!("{:?}", self.providers))
}
}

impl EndpointMetrics {
pub fn new(logger: Logger, hosts: &[impl AsRef<str>], registry: Arc<MetricsRegistry>) -> Self {
let hosts = Arc::new(HashMap::from_iter(
hosts
pub fn new(
logger: Logger,
providers: &[impl AsRef<str>],
registry: Arc<MetricsRegistry>,
) -> Self {
let providers = Arc::new(HashMap::from_iter(
providers
.iter()
.map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))),
.map(|h| (Provider::from(h.as_ref()), AtomicU64::new(0))),
));

let counter = registry
.new_int_counter_vec(
"endpoint_request",
"successfull request",
&["conn_type", "req_type", "host", "result"],
&["conn_type", "req_type", "provider", "result"],
)
.expect("unable to create endpoint_request counter_vec");

Self {
logger,
hosts,
providers,
counter,
}
}

/// This should only be used for testing.
pub fn mock() -> Self {
use slog::{o, Discard};
let hosts: &[&str] = &[];
let providers: &[&str] = &[];
Self::new(
Logger::root(Discard, o!()),
hosts,
providers,
Arc::new(MetricsRegistry::mock()),
)
}

#[cfg(debug_assertions)]
pub fn report_for_test(&self, host: &Host, success: bool) {
pub fn report_for_test(&self, provider: &Provider, success: bool) {
match success {
true => self.success(&RequestLabels {
host: host.clone(),
provider: provider.clone(),
req_type: "".into(),
conn_type: ConnectionType::Firehose,
}),
false => self.failure(&RequestLabels {
host: host.clone(),
provider: provider.clone(),
req_type: "".into(),
conn_type: ConnectionType::Firehose,
}),
}
}

pub fn success(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
match self.providers.get(&labels.provider) {
Some(count) => {
count.store(0, Ordering::Relaxed);
}
None => warn!(
&self.logger,
"metrics not available for host {}", labels.host
"metrics not available for host {}", labels.provider
),
};

self.counter.with_label_values(&labels.to_slice(true)).inc();
}

pub fn failure(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
match self.providers.get(&labels.provider) {
Some(count) => {
count.fetch_add(1, Ordering::Relaxed);
}
None => warn!(
&self.logger,
"metrics not available for host {}", &labels.host
"metrics not available for host {}", &labels.provider
),
};

Expand All @@ -159,9 +161,9 @@ impl EndpointMetrics {

/// Returns the current error count of a host or 0 if the host
/// doesn't have a value on the map.
pub fn get_count(&self, host: &Host) -> u64 {
self.hosts
.get(host)
pub fn get_count(&self, provider: &Provider) -> u64 {
self.providers
.get(provider)
.map(|c| c.load(Ordering::Relaxed))
.unwrap_or(0)
}
Expand All @@ -175,12 +177,12 @@ mod test {

use crate::{
components::metrics::MetricsRegistry,
endpoint::{EndpointMetrics, Host},
endpoint::{EndpointMetrics, Provider},
};

#[tokio::test]
async fn should_increment_and_reset() {
let (a, b, c): (Host, Host, Host) = ("a".into(), "b".into(), "c".into());
let (a, b, c): (Provider, Provider, Provider) = ("a".into(), "b".into(), "c".into());
let hosts: &[&str] = &[&a, &b, &c];
let logger = Logger::root(Discard, o!());

Expand Down
10 changes: 5 additions & 5 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blockchain::BlockPtr,
cheap_clone::CheapClone,
components::store::BlockNumber,
endpoint::{ConnectionType, EndpointMetrics, Host, RequestLabels},
endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels},
firehose::decode_firehose_block,
prelude::{anyhow, debug, info},
substreams,
Expand Down Expand Up @@ -37,7 +37,7 @@ const HIGH_VALUE_USED_PERCENTAGE: usize = 80;

#[derive(Debug)]
pub struct FirehoseEndpoint {
pub provider: Host,
pub provider: Provider,
pub auth: AuthInterceptor,
pub filters_enabled: bool,
pub compression_enabled: bool,
Expand Down Expand Up @@ -194,7 +194,7 @@ impl FirehoseEndpoint {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
labels: RequestLabels {
host: self.provider.clone().into(),
provider: self.provider.clone().into(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
Expand All @@ -221,7 +221,7 @@ impl FirehoseEndpoint {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
labels: RequestLabels {
host: self.provider.clone().into(),
provider: self.provider.clone().into(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
Expand All @@ -246,7 +246,7 @@ impl FirehoseEndpoint {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
labels: RequestLabels {
host: self.provider.clone().into(),
provider: self.provider.clone().into(),
req_type: "unknown".into(),
conn_type: ConnectionType::Substreams,
},
Expand Down
1 change: 1 addition & 0 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ pub async fn create_ethereum_networks_for_chain(
Url::parse(&web3.url)?,
web3.headers.clone(),
endpoint_metrics.cheap_clone(),
&provider.label,
),
Ipc => Transport::new_ipc(&web3.url).await,
Ws => Transport::new_ws(&web3.url).await,
Expand Down