diff --git a/chain/ethereum/examples/firehose.rs b/chain/ethereum/examples/firehose.rs index f6334c72387..fbdfaa1f141 100644 --- a/chain/ethereum/examples/firehose.rs +++ b/chain/ethereum/examples/firehose.rs @@ -4,7 +4,7 @@ use graph::{ env::env_var, firehose::SubgraphLimit, log::logger, - prelude::{prost, tokio, tonic}, + prelude::{prost, tokio, tonic, MetricsRegistry}, {firehose, firehose::FirehoseEndpoint}, }; use graph_chain_ethereum::codec; @@ -24,7 +24,11 @@ async fn main() -> Result<(), Error> { let logger = logger(false); let host = "https://api.streamingfast.io:443".to_string(); - let metrics = Arc::new(EndpointMetrics::new(logger, &[host.clone()])); + let metrics = Arc::new(EndpointMetrics::new( + logger, + &[host.clone()], + Arc::new(MetricsRegistry::mock()), + )); let firehose = Arc::new(FirehoseEndpoint::new( "firehose", diff --git a/chain/substreams/examples/substreams.rs b/chain/substreams/examples/substreams.rs index 55858825d18..e946fd957ef 100644 --- a/chain/substreams/examples/substreams.rs +++ b/chain/substreams/examples/substreams.rs @@ -41,7 +41,11 @@ async fn main() -> Result<(), Error> { prometheus_registry.clone(), )); - let endpoint_metrics = EndpointMetrics::new(logger.clone(), &[endpoint.clone()]); + let endpoint_metrics = EndpointMetrics::new( + logger.clone(), + &[endpoint.clone()], + Arc::new(MetricsRegistry::mock()), + ); let firehose = Arc::new(FirehoseEndpoint::new( "substreams", diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index bd9fe6fe0f6..7fa5b903b05 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use prometheus::{labels, Histogram}; +use prometheus::{labels, Histogram, IntCounterVec}; use crate::components::metrics::{counter_with_labels, gauge_with_labels}; use crate::prelude::Collector; @@ -386,6 +386,18 @@ impl MetricsRegistry { Ok(counter) } + 
/// Builds an [`IntCounterVec`] from `name`, `help` and `variable_labels`, registers it and returns it. + pub fn new_int_counter_vec( + &self, + name: &str, + help: &str, + variable_labels: &[&str], + ) -> Result<Box<IntCounterVec>, PrometheusError> { + let counters = Box::new(IntCounterVec::new(Opts::new(name, help), variable_labels)?); + self.register(name, counters.clone()); + Ok(counters) + } + pub fn new_counter_vec( &self, name: &str, diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index bb67c5f0643..368d9db4130 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -6,9 +6,10 @@ use std::{ }, }; +use prometheus::IntCounterVec; use slog::{warn, Logger}; -use crate::data::value::Word; +use crate::{components::metrics::MetricsRegistry, data::value::Word}; /// HostCount is the underlying structure to keep the count, /// we require that all the hosts are known ahead of time, this way we can @@ -20,50 +21,140 @@ type HostCount = Arc<HashMap<Host, AtomicU64>>; /// adapters if they share the same endpoint. pub type Host = Word; +/// This struct represents all the current labels except for the result +/// which is added separately. If any new labels are necessary they should +/// remain in the same order as added in [`EndpointMetrics::new`] +#[derive(Clone)] +pub struct RequestLabels { + pub host: Host, + pub req_type: Word, + pub conn_type: ConnectionType, +} + +/// The type of underlying connection we are reporting for.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ConnectionType { + Firehose, + Substreams, + Rpc, +} + +impl From<&ConnectionType> for &'static str { + fn from(conn_type: &ConnectionType) -> Self { + match conn_type { + ConnectionType::Firehose => "firehose", + ConnectionType::Substreams => "substreams", + ConnectionType::Rpc => "rpc", + } + } +} + +impl RequestLabels { + /// Returns the label values in the order the labels are registered in + /// [`EndpointMetrics::new`]; the `result` value always comes last. + fn to_slice(&self, is_success: bool) -> [&str; 4] { + let result = if is_success { "success" } else { "failure" }; + [ + (&self.conn_type).into(), + self.req_type.as_str(), + self.host.as_str(), + result, + ] + } +} + /// EndpointMetrics keeps track of calls success rate for specific calls, /// a success call to a host will clear the error count. -#[derive(Debug)] pub struct EndpointMetrics { logger: Logger, hosts: HostCount, + counter: Box<IntCounterVec>, +} + +impl std::fmt::Debug for EndpointMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.hosts) + } } impl EndpointMetrics { - pub fn new(logger: Logger, hosts: &[impl AsRef<str>]) -> Self { + pub fn new(logger: Logger, hosts: &[impl AsRef<str>], registry: Arc<MetricsRegistry>) -> Self { let hosts = Arc::new(HashMap::from_iter( hosts .iter() .map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))), )); - Self { logger, hosts } + let counter = registry + .new_int_counter_vec( + "endpoint_request", + "number of requests per endpoint, labeled by result", + &["conn_type", "req_type", "host", "result"], + ) + .expect("unable to create endpoint_request counter_vec"); + + Self { + logger, + hosts, + counter, + } } /// This should only be used for testing.
pub fn mock() -> Self { use slog::{o, Discard}; - Self { - logger: Logger::root(Discard, o!()), - hosts: Arc::new(HashMap::default()), + let hosts: &[&str] = &[]; + Self::new( + Logger::root(Discard, o!()), + hosts, + Arc::new(MetricsRegistry::mock()), + ) + } + + /// Reports one request outcome for `host` using placeholder Firehose labels, + /// going through the same [`Self::success`] / [`Self::failure`] paths. + #[cfg(debug_assertions)] + pub fn report_for_test(&self, host: &Host, success: bool) { + let labels = RequestLabels { + host: host.clone(), + req_type: "".into(), + conn_type: ConnectionType::Firehose, + }; + if success { + self.success(&labels); + } else { + self.failure(&labels); } } - pub fn success(&self, host: &Host) { - match self.hosts.get(host) { + pub fn success(&self, labels: &RequestLabels) { + match self.hosts.get(&labels.host) { Some(count) => { count.store(0, Ordering::Relaxed); } - None => warn!(&self.logger, "metrics not available for host {}", host), - } + None => warn!( + &self.logger, + "metrics not available for host {}", labels.host + ), + }; + + self.counter.with_label_values(&labels.to_slice(true)).inc(); } - pub fn failure(&self, host: &Host) { - match self.hosts.get(host) { + pub fn failure(&self, labels: &RequestLabels) { + match self.hosts.get(&labels.host) { Some(count) => { count.fetch_add(1, Ordering::Relaxed); } - None => warn!(&self.logger, "metrics not available for host {}", host), - } + None => warn!( + &self.logger, + "metrics not available for host {}", labels.host + ), + }; + + self.counter + .with_label_values(&labels.to_slice(false)) + .inc(); } /// Returns the current error count of a host or 0 if the host @@ -78,9 +169,14 @@ impl EndpointMetrics { #[cfg(test)] mod test { + use std::sync::Arc; + use slog::{o, Discard, Logger}; - use crate::endpoint::{EndpointMetrics, Host}; + use crate::{ + components::metrics::MetricsRegistry, + endpoint::{EndpointMetrics, Host}, + }; #[tokio::test] async fn should_increment_and_reset() { @@ -88,13 +184,13 @@ mod test { let hosts: &[&str] = &[&a,
&b, &c]; let logger = Logger::root(Discard, o!()); - let metrics = EndpointMetrics::new(logger, hosts); + let metrics = EndpointMetrics::new(logger, hosts, Arc::new(MetricsRegistry::mock())); - metrics.success(&a); - metrics.failure(&a); - metrics.failure(&b); - metrics.failure(&b); - metrics.success(&c); + metrics.report_for_test(&a, true); + metrics.report_for_test(&a, false); + metrics.report_for_test(&b, false); + metrics.report_for_test(&b, false); + metrics.report_for_test(&c, true); assert_eq!(metrics.get_count(&a), 1); assert_eq!(metrics.get_count(&b), 2); diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 2302c28981f..2fc5ee2ae26 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -4,7 +4,7 @@ use crate::{ blockchain::BlockPtr, cheap_clone::CheapClone, components::store::BlockNumber, - endpoint::{EndpointMetrics, Host}, + endpoint::{ConnectionType, EndpointMetrics, Host, RequestLabels}, firehose::decode_firehose_block, prelude::{anyhow, debug, info}, substreams, @@ -196,7 +196,11 @@ impl FirehoseEndpoint { let metrics = MetricsInterceptor { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), - host: self.host.clone(), + labels: RequestLabels { + host: self.host.clone(), + req_type: "unknown".into(), + conn_type: ConnectionType::Firehose, + }, }; let mut client: FetchClient< @@ -219,7 +223,11 @@ impl FirehoseEndpoint { let metrics = MetricsInterceptor { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), - host: self.host.clone(), + labels: RequestLabels { + host: self.host.clone(), + req_type: "unknown".into(), + conn_type: ConnectionType::Firehose, + }, }; let mut client = StreamClient::with_interceptor(metrics, self.auth.clone()) @@ -240,7 +248,11 @@ impl FirehoseEndpoint { let metrics = MetricsInterceptor { metrics: self.endpoint_metrics.cheap_clone(), service: self.channel.cheap_clone(), - host: self.host.clone(), + labels: 
RequestLabels { + host: self.host.clone(), + req_type: "unknown".into(), + conn_type: ConnectionType::Substreams, + }, }; let mut client = @@ -505,7 +517,9 @@ mod test { use slog::{o, Discard, Logger}; - use crate::{endpoint::EndpointMetrics, firehose::SubgraphLimit}; + use crate::{ + components::metrics::MetricsRegistry, endpoint::EndpointMetrics, firehose::SubgraphLimit, + }; use super::{AvailableCapacity, FirehoseEndpoint, FirehoseEndpoints, SUBGRAPHS_PER_CONN}; @@ -607,6 +621,7 @@ mod test { "http://127.0.0.2/", "http://127.0.0.3/", ], + Arc::new(MetricsRegistry::mock()), )); let high_error_adapter1 = Arc::new(FirehoseEndpoint::new( @@ -646,7 +661,7 @@ mod test { endpoint_metrics.clone(), )); - endpoint_metrics.failure(&high_error_adapter1.host); + endpoint_metrics.report_for_test(&high_error_adapter1.host, false); let mut endpoints = FirehoseEndpoints::from(vec![ high_error_adapter1.clone(), diff --git a/graph/src/firehose/interceptors.rs b/graph/src/firehose/interceptors.rs index 0c248188f41..8c86d034db4 100644 --- a/graph/src/firehose/interceptors.rs +++ b/graph/src/firehose/interceptors.rs @@ -8,7 +8,7 @@ use tonic::{ service::Interceptor, }; -use crate::endpoint::{EndpointMetrics, Host}; +use crate::endpoint::{EndpointMetrics, RequestLabels}; #[derive(Clone)] pub struct AuthInterceptor { @@ -37,7 +37,7 @@ impl Interceptor for AuthInterceptor { pub struct MetricsInterceptor<S> { pub(crate) metrics: Arc<EndpointMetrics>, pub(crate) service: S, - pub(crate) host: Host, + pub(crate) labels: RequestLabels, } impl<S, Request> Service<Request> for MetricsInterceptor<S> @@ -60,16 +60,16 @@ where } fn call(&mut self, req: Request) -> Self::Future { - let host = self.host.clone(); + let labels = self.labels.clone(); let metrics = self.metrics.clone(); let fut = self.service.call(req); let res = async move { let res = fut.await; if res.is_ok() { - metrics.success(&host); + metrics.success(&labels); } else { - metrics.failure(&host); + metrics.failure(&labels); } res }; diff --git a/node/src/main.rs
b/node/src/main.rs index 577867f1c4a..59ca831812d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -231,6 +231,7 @@ async fn main() { let endpoint_metrics = Arc::new(EndpointMetrics::new( logger.clone(), &config.chains.provider_urls(), + metrics_registry.cheap_clone(), )); // Ethereum clients; query nodes ignore all ethereum clients and never diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 5ebc450832c..d9e326579ca 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -75,6 +75,7 @@ pub async fn run( let endpoint_metrics = Arc::new(EndpointMetrics::new( logger.clone(), &config.chains.provider_urls(), + metrics_registry.cheap_clone(), )); // Convert the clients into a link resolver. Since we want to get past