Skip to content

Commit

Permalink
graph: Add new counter to EndpointMetrics (#4490)
Browse files Browse the repository at this point in the history
- Add new metric with relevant labels to EndpointMetrics
- Wire the MetricsInterceptor of firehose to produce request metrics
  • Loading branch information
mangas authored Mar 27, 2023
1 parent 46bcd80 commit 43c35b9
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 37 deletions.
8 changes: 6 additions & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use graph::{
env::env_var,
firehose::SubgraphLimit,
log::logger,
prelude::{prost, tokio, tonic},
prelude::{prost, tokio, tonic, MetricsRegistry},
{firehose, firehose::FirehoseEndpoint},
};
use graph_chain_ethereum::codec;
Expand All @@ -24,7 +24,11 @@ async fn main() -> Result<(), Error> {

let logger = logger(false);
let host = "https://api.streamingfast.io:443".to_string();
let metrics = Arc::new(EndpointMetrics::new(logger, &[host.clone()]));
let metrics = Arc::new(EndpointMetrics::new(
logger,
&[host.clone()],
Arc::new(MetricsRegistry::mock()),
));

let firehose = Arc::new(FirehoseEndpoint::new(
"firehose",
Expand Down
6 changes: 5 additions & 1 deletion chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ async fn main() -> Result<(), Error> {
prometheus_registry.clone(),
));

let endpoint_metrics = EndpointMetrics::new(logger.clone(), &[endpoint.clone()]);
let endpoint_metrics = EndpointMetrics::new(
logger.clone(),
&[endpoint.clone()],
Arc::new(MetricsRegistry::mock()),
);

let firehose = Arc::new(FirehoseEndpoint::new(
"substreams",
Expand Down
14 changes: 13 additions & 1 deletion graph/src/components/metrics/registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use prometheus::{labels, Histogram};
use prometheus::{labels, Histogram, IntCounterVec};

use crate::components::metrics::{counter_with_labels, gauge_with_labels};
use crate::prelude::Collector;
Expand Down Expand Up @@ -386,6 +386,18 @@ impl MetricsRegistry {
Ok(counter)
}

/// Creates an `IntCounterVec` with the given name, help text and variable
/// label names, registers it with this registry, and returns a boxed handle.
///
/// # Errors
/// Returns a `PrometheusError` if the counter vec cannot be constructed
/// (e.g. invalid metric name or label names).
pub fn new_int_counter_vec(
    &self,
    name: &str,
    help: &str,
    variable_labels: &[&str],
) -> Result<Box<IntCounterVec>, PrometheusError> {
    let opts = Opts::new(name, help);
    // `variable_labels` is already a `&[&str]`; passing it directly avoids
    // the needless extra borrow (`&variable_labels`).
    let counters = Box::new(IntCounterVec::new(opts, variable_labels)?);
    self.register(name, counters.clone());
    Ok(counters)
}

pub fn new_counter_vec(
&self,
name: &str,
Expand Down
140 changes: 118 additions & 22 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use std::{
},
};

use prometheus::IntCounterVec;
use slog::{warn, Logger};

use crate::data::value::Word;
use crate::{components::metrics::MetricsRegistry, data::value::Word};

/// HostCount is the underlying structure to keep the count,
/// we require that all the hosts are known ahead of time, this way we can
Expand All @@ -20,50 +21,140 @@ type HostCount = Arc<HashMap<Host, AtomicU64>>;
/// adapters if they share the same endpoint.
pub type Host = Word;

/// This struct represents all the current labels except for the result
/// which is added separately. If any new labels are necessary they should
/// remain in the same order as added in [`EndpointMetrics::new`]
#[derive(Clone)]
pub struct RequestLabels {
    /// The endpoint host the request is sent to.
    pub host: Host,
    /// Free-form request kind label; the firehose interceptors currently
    /// set this to "unknown".
    pub req_type: Word,
    /// The protocol/transport this request is reported for.
    pub conn_type: ConnectionType,
}

/// The type of underlying connection we are reporting for.
#[derive(Clone)]
pub enum ConnectionType {
    Firehose,
    Substreams,
    Rpc,
}

// Implementing `From` (rather than a hand-written `Into`, which clippy's
// `from_over_into` lint flags) gives us `Into<&'static str>` for free via
// the blanket impl, so existing `(&conn_type).into()` call sites still work.
impl From<&ConnectionType> for &'static str {
    /// Static label value used for the `conn_type` metric label.
    fn from(conn_type: &ConnectionType) -> Self {
        match conn_type {
            ConnectionType::Firehose => "firehose",
            ConnectionType::Substreams => "substreams",
            ConnectionType::Rpc => "rpc",
        }
    }
}

impl RequestLabels {
    /// Returns the label values in the order the labels were registered in
    /// [`EndpointMetrics::new`]: conn_type, req_type, host, followed by the
    /// request result ("success" or "failure").
    fn to_slice(&self, is_success: bool) -> Box<[&str]> {
        Box::new([
            (&self.conn_type).into(),
            self.req_type.as_str(),
            self.host.as_str(),
            // An `if` expression is the idiomatic form here; matching on a
            // bool is flagged by clippy's `match_bool` lint.
            if is_success { "success" } else { "failure" },
        ])
    }
}

/// EndpointMetrics keeps track of calls success rate for specific calls,
/// a success call to a host will clear the error count.
#[derive(Debug)]
pub struct EndpointMetrics {
    /// Used to warn when a request is reported for a host that was not
    /// registered up front.
    logger: Logger,
    /// Per-host consecutive error counts; a success resets the count to 0.
    hosts: HostCount,
    /// Prometheus counter tracking requests, labelled by conn_type,
    /// req_type, host and result.
    counter: Box<IntCounterVec>,
}

impl std::fmt::Debug for EndpointMetrics {
    /// Debug output only includes the per-host error counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self.hosts)
    }
}

impl EndpointMetrics {
pub fn new(logger: Logger, hosts: &[impl AsRef<str>]) -> Self {
pub fn new(logger: Logger, hosts: &[impl AsRef<str>], registry: Arc<MetricsRegistry>) -> Self {
let hosts = Arc::new(HashMap::from_iter(
hosts
.iter()
.map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))),
));

Self { logger, hosts }
let counter = registry
.new_int_counter_vec(
"endpoint_request",
"successfull request",
&["conn_type", "req_type", "host", "result"],
)
.expect("unable to create endpoint_request counter_vec");

Self {
logger,
hosts,
counter,
}
}

/// This should only be used for testing.
pub fn mock() -> Self {
use slog::{o, Discard};
Self {
logger: Logger::root(Discard, o!()),
hosts: Arc::new(HashMap::default()),
let hosts: &[&str] = &[];
Self::new(
Logger::root(Discard, o!()),
hosts,
Arc::new(MetricsRegistry::mock()),
)
}

/// Test-only helper that reports a success or failure for `host` with
/// placeholder labels (empty req_type, Firehose conn_type), so tests don't
/// have to build a full [`RequestLabels`] by hand.
#[cfg(debug_assertions)]
pub fn report_for_test(&self, host: &Host, success: bool) {
    // Build the labels once instead of duplicating the construction in
    // each arm of a `match` on the bool.
    let labels = RequestLabels {
        host: host.clone(),
        req_type: "".into(),
        conn_type: ConnectionType::Firehose,
    };
    if success {
        self.success(&labels);
    } else {
        self.failure(&labels);
    }
}

pub fn success(&self, host: &Host) {
match self.hosts.get(host) {
pub fn success(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
Some(count) => {
count.store(0, Ordering::Relaxed);
}
None => warn!(&self.logger, "metrics not available for host {}", host),
}
None => warn!(
&self.logger,
"metrics not available for host {}", labels.host
),
};

self.counter.with_label_values(&labels.to_slice(true)).inc();
}

pub fn failure(&self, host: &Host) {
match self.hosts.get(host) {
pub fn failure(&self, labels: &RequestLabels) {
match self.hosts.get(&labels.host) {
Some(count) => {
count.fetch_add(1, Ordering::Relaxed);
}
None => warn!(&self.logger, "metrics not available for host {}", host),
}
None => warn!(
&self.logger,
"metrics not available for host {}", &labels.host
),
};

self.counter
.with_label_values(&labels.to_slice(false))
.inc();
}

/// Returns the current error count of a host or 0 if the host
Expand All @@ -78,23 +169,28 @@ impl EndpointMetrics {

#[cfg(test)]
mod test {
use std::sync::Arc;

use slog::{o, Discard, Logger};

use crate::endpoint::{EndpointMetrics, Host};
use crate::{
components::metrics::MetricsRegistry,
endpoint::{EndpointMetrics, Host},
};

#[tokio::test]
async fn should_increment_and_reset() {
let (a, b, c): (Host, Host, Host) = ("a".into(), "b".into(), "c".into());
let hosts: &[&str] = &[&a, &b, &c];
let logger = Logger::root(Discard, o!());

let metrics = EndpointMetrics::new(logger, hosts);
let metrics = EndpointMetrics::new(logger, hosts, Arc::new(MetricsRegistry::mock()));

metrics.success(&a);
metrics.failure(&a);
metrics.failure(&b);
metrics.failure(&b);
metrics.success(&c);
metrics.report_for_test(&a, true);
metrics.report_for_test(&a, false);
metrics.report_for_test(&b, false);
metrics.report_for_test(&b, false);
metrics.report_for_test(&c, true);

assert_eq!(metrics.get_count(&a), 1);
assert_eq!(metrics.get_count(&b), 2);
Expand Down
27 changes: 21 additions & 6 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blockchain::BlockPtr,
cheap_clone::CheapClone,
components::store::BlockNumber,
endpoint::{EndpointMetrics, Host},
endpoint::{ConnectionType, EndpointMetrics, Host, RequestLabels},
firehose::decode_firehose_block,
prelude::{anyhow, debug, info},
substreams,
Expand Down Expand Up @@ -196,7 +196,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
};

let mut client: FetchClient<
Expand All @@ -219,7 +223,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Firehose,
},
};

let mut client = StreamClient::with_interceptor(metrics, self.auth.clone())
Expand All @@ -240,7 +248,11 @@ impl FirehoseEndpoint {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
labels: RequestLabels {
host: self.host.clone(),
req_type: "unknown".into(),
conn_type: ConnectionType::Substreams,
},
};

let mut client =
Expand Down Expand Up @@ -505,7 +517,9 @@ mod test {

use slog::{o, Discard, Logger};

use crate::{endpoint::EndpointMetrics, firehose::SubgraphLimit};
use crate::{
components::metrics::MetricsRegistry, endpoint::EndpointMetrics, firehose::SubgraphLimit,
};

use super::{AvailableCapacity, FirehoseEndpoint, FirehoseEndpoints, SUBGRAPHS_PER_CONN};

Expand Down Expand Up @@ -607,6 +621,7 @@ mod test {
"http://127.0.0.2/",
"http://127.0.0.3/",
],
Arc::new(MetricsRegistry::mock()),
));

let high_error_adapter1 = Arc::new(FirehoseEndpoint::new(
Expand Down Expand Up @@ -646,7 +661,7 @@ mod test {
endpoint_metrics.clone(),
));

endpoint_metrics.failure(&high_error_adapter1.host);
endpoint_metrics.report_for_test(&high_error_adapter1.host, false);

let mut endpoints = FirehoseEndpoints::from(vec![
high_error_adapter1.clone(),
Expand Down
10 changes: 5 additions & 5 deletions graph/src/firehose/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tonic::{
service::Interceptor,
};

use crate::endpoint::{EndpointMetrics, Host};
use crate::endpoint::{EndpointMetrics, RequestLabels};

#[derive(Clone)]
pub struct AuthInterceptor {
Expand Down Expand Up @@ -37,7 +37,7 @@ impl Interceptor for AuthInterceptor {
/// Service wrapper that records the outcome of each request it forwards
/// into [`EndpointMetrics`], tagged with a fixed set of labels.
pub struct MetricsInterceptor<S> {
    /// Shared metrics sink where request success/failure is reported.
    pub(crate) metrics: Arc<EndpointMetrics>,
    /// The wrapped inner service that actually performs the request.
    pub(crate) service: S,
    /// Labels attached to every request reported by this interceptor.
    pub(crate) labels: RequestLabels,
}

impl<S, Request> Service<Request> for MetricsInterceptor<S>
Expand All @@ -60,16 +60,16 @@ where
}

fn call(&mut self, req: Request) -> Self::Future {
let host = self.host.clone();
let labels = self.labels.clone();
let metrics = self.metrics.clone();

let fut = self.service.call(req);
let res = async move {
let res = fut.await;
if res.is_ok() {
metrics.success(&host);
metrics.success(&labels);
} else {
metrics.failure(&host);
metrics.failure(&labels);
}
res
};
Expand Down
1 change: 1 addition & 0 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ async fn main() {
let endpoint_metrics = Arc::new(EndpointMetrics::new(
logger.clone(),
&config.chains.provider_urls(),
metrics_registry.cheap_clone(),
));

// Ethereum clients; query nodes ignore all ethereum clients and never
Expand Down
1 change: 1 addition & 0 deletions node/src/manager/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn run(
let endpoint_metrics = Arc::new(EndpointMetrics::new(
logger.clone(),
&config.chains.provider_urls(),
metrics_registry.cheap_clone(),
));

// Convert the clients into a link resolver. Since we want to get past
Expand Down

0 comments on commit 43c35b9

Please sign in to comment.