From 706d2b11b09660738a41a80df410fff85ccbfc57 Mon Sep 17 00:00:00 2001
From: Leonardo Yvens
Date: Fri, 3 Feb 2023 15:44:24 +0000
Subject: [PATCH] improve(metrics): Turn "Flushing logs to Elasticsearch" log
 into metric (#4333)

---
 graph/src/log/elastic.rs         | 25 ++++++++++++++++---------
 graph/src/log/factory.rs         | 25 ++++++++++++++++++++++++-
 node/src/main.rs                 | 17 +++++++++--------
 node/src/manager/commands/run.rs |  2 +-
 server/http/tests/server.rs      | 13 +++++++++----
 tests/src/fixture.rs             |  2 +-
 tests/src/fixture/ethereum.rs    |  4 ++--
 7 files changed, 62 insertions(+), 26 deletions(-)

diff --git a/graph/src/log/elastic.rs b/graph/src/log/elastic.rs
index a08ca5384eb..cbc49810beb 100644
--- a/graph/src/log/elastic.rs
+++ b/graph/src/log/elastic.rs
@@ -8,6 +8,7 @@ use std::time::Duration;
 use chrono::prelude::{SecondsFormat, Utc};
 use futures03::TryFutureExt;
 use http::header::CONTENT_TYPE;
+use prometheus::Counter;
 use reqwest;
 use reqwest::Client;
 use serde::ser::Serializer as SerdeSerializer;
@@ -175,15 +176,21 @@ pub struct ElasticDrainConfig {
 pub struct ElasticDrain {
     config: ElasticDrainConfig,
     error_logger: Logger,
+    logs_sent_counter: Counter,
     logs: Arc<Mutex<Vec<ElasticLog>>>,
 }
 
 impl ElasticDrain {
     /// Creates a new `ElasticDrain`.
-    pub fn new(config: ElasticDrainConfig, error_logger: Logger) -> Self {
+    pub fn new(
+        config: ElasticDrainConfig,
+        error_logger: Logger,
+        logs_sent_counter: Counter,
+    ) -> Self {
         let drain = ElasticDrain {
             config,
             error_logger,
+            logs_sent_counter,
             logs: Arc::new(Mutex::new(vec![])),
         };
         drain.periodically_flush_logs();
@@ -192,6 +199,7 @@ impl ElasticDrain {
 
     fn periodically_flush_logs(&self) {
         let flush_logger = self.error_logger.clone();
+        let logs_sent_counter = self.logs_sent_counter.clone();
         let logs = self.logs.clone();
         let config = self.config.clone();
         let mut interval = tokio::time::interval(self.config.flush_interval);
@@ -203,7 +211,6 @@ impl ElasticDrain {
 
                 let logs = logs.clone();
                 let config = config.clone();
-                let flush_logger = flush_logger.clone();
                 let logs_to_send = {
                     let mut logs = logs.lock().unwrap();
                     let logs_to_send = (*logs).clone();
@@ -217,11 +224,7 @@ impl ElasticDrain {
                     continue;
                 }
 
-                debug!(
-                    flush_logger,
-                    "Flushing {} logs to Elasticsearch",
-                    logs_to_send.len()
-                );
+                logs_sent_counter.inc_by(logs_to_send.len() as f64);
 
                 // The Elasticsearch batch API takes requests with the following format:
                 // ```ignore
@@ -382,8 +385,12 @@ impl Drain for ElasticDrain {
 ///
 /// Uses `error_logger` to print any Elasticsearch logging errors,
 /// so they don't go unnoticed.
-pub fn elastic_logger(config: ElasticDrainConfig, error_logger: Logger) -> Logger {
-    let elastic_drain = ElasticDrain::new(config, error_logger).fuse();
+pub fn elastic_logger(
+    config: ElasticDrainConfig,
+    error_logger: Logger,
+    logs_sent_counter: Counter,
+) -> Logger {
+    let elastic_drain = ElasticDrain::new(config, error_logger, logs_sent_counter).fuse();
     let async_drain = slog_async::Async::new(elastic_drain)
         .chan_size(20000)
         .build()
diff --git a/graph/src/log/factory.rs b/graph/src/log/factory.rs
index 8565c5624ff..1b126a6995d 100644
--- a/graph/src/log/factory.rs
+++ b/graph/src/log/factory.rs
@@ -1,5 +1,9 @@
+use std::sync::Arc;
+
+use prometheus::Counter;
 use slog::*;
 
+use crate::components::metrics::MetricsRegistry;
 use crate::components::store::DeploymentLocator;
 use crate::log::elastic::*;
 use crate::log::split::*;
@@ -20,14 +24,20 @@ pub struct ComponentLoggerConfig {
 pub struct LoggerFactory {
     parent: Logger,
     elastic_config: Option<ElasticLoggingConfig>,
+    metrics_registry: Arc<dyn MetricsRegistry>,
 }
 
 impl LoggerFactory {
     /// Creates a new factory using a parent logger and optional Elasticsearch configuration.
-    pub fn new(logger: Logger, elastic_config: Option<ElasticLoggingConfig>) -> Self {
+    pub fn new(
+        logger: Logger,
+        elastic_config: Option<ElasticLoggingConfig>,
+        metrics_registry: Arc<dyn MetricsRegistry>,
+    ) -> Self {
         Self {
             parent: logger,
             elastic_config,
+            metrics_registry,
         }
     }
 
@@ -36,6 +46,7 @@
         Self {
             parent,
             elastic_config: self.elastic_config.clone(),
+            metrics_registry: self.metrics_registry.clone(),
         }
     }
 
@@ -68,6 +79,7 @@
                         max_retries: ENV_VARS.elastic_search_max_retries,
                     },
                     term_logger.clone(),
+                    self.logs_sent_counter(None),
                 ),
             )
         })
@@ -98,9 +110,20 @@
                         max_retries: ENV_VARS.elastic_search_max_retries,
                     },
                     term_logger.clone(),
+                    self.logs_sent_counter(Some(loc.hash.as_str())),
                 ),
             )
         })
         .unwrap_or(term_logger)
     }
+
+    fn logs_sent_counter(&self, deployment: Option<&str>) -> Counter {
+        self.metrics_registry
+            .global_deployment_counter(
+                "graph_elasticsearch_logs_sent",
+                "Count of logs sent to Elasticsearch endpoint",
+                deployment.unwrap_or(""),
+            )
+            .unwrap()
+    }
 }
diff --git a/node/src/main.rs b/node/src/main.rs
index 5e81d07d56d..243cb78b0e9 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -191,8 +191,16 @@
         client: reqwest::Client::new(),
     });
 
+    // Set up Prometheus registry
+    let prometheus_registry = Arc::new(Registry::new());
+    let metrics_registry = Arc::new(MetricsRegistry::new(
+        logger.clone(),
+        prometheus_registry.clone(),
+    ));
+
     // Create a component and subgraph logger factory
-    let logger_factory = LoggerFactory::new(logger.clone(), elastic_config);
+    let logger_factory =
+        LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
 
     // Try to create IPFS clients for each URL specified in `--ipfs`
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &opt.ipfs);
@@ -207,13 +215,6 @@
     // Convert the clients into a link resolver. Since we want to get past
     // possible temporary DNS failures, make the resolver retry
     let link_resolver = Arc::new(LinkResolver::new(ipfs_clients, env_vars.cheap_clone()));
-
-    // Set up Prometheus registry
-    let prometheus_registry = Arc::new(Registry::new());
-    let metrics_registry = Arc::new(MetricsRegistry::new(
-        logger.clone(),
-        prometheus_registry.clone(),
-    ));
 
     let mut metrics_server =
         PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());
diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs
index cd62f299776..181978aac16 100644
--- a/node/src/manager/commands/run.rs
+++ b/node/src/manager/commands/run.rs
@@ -57,7 +57,7 @@ pub async fn run(
 
     let env_vars = Arc::new(EnvVars::from_env().unwrap());
     let metrics_registry = metrics_ctx.registry.clone();
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
+    let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone());
 
     // FIXME: Hard-coded IPFS config, take it from config file instead?
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &ipfs_url);
diff --git a/server/http/tests/server.rs b/server/http/tests/server.rs
index 899a9effb40..589e10d696b 100644
--- a/server/http/tests/server.rs
+++ b/server/http/tests/server.rs
@@ -89,6 +89,8 @@ impl GraphQlRunner for TestGraphQlRunner {
 
 #[cfg(test)]
 mod test {
+    use graph_mock::MockMetricsRegistry;
+
     use super::*;
 
     lazy_static! {
@@ -101,7 +103,7 @@
         runtime
             .block_on(async {
                 let logger = Logger::root(slog::Discard, o!());
-                let logger_factory = LoggerFactory::new(logger, None);
+                let logger_factory = LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
                 let id = USERS.clone();
                 let query_runner = Arc::new(TestGraphQlRunner);
                 let node_id = NodeId::new("test").unwrap();
@@ -142,7 +144,8 @@
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -222,7 +225,8 @@
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -267,7 +271,8 @@
         let runtime = tokio::runtime::Runtime::new().unwrap();
         let _ = runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
diff --git a/tests/src/fixture.rs b/tests/src/fixture.rs
index 654bea65014..5a489bbdf72 100644
--- a/tests/src/fixture.rs
+++ b/tests/src/fixture.rs
@@ -377,8 +377,8 @@ pub async fn setup(
     });
 
     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
     let mock_registry: Arc<dyn MetricsRegistryTrait> = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.clone(), None, mock_registry.clone());
     let node_id = NodeId::new(NODE_ID).unwrap();
 
     // Make sure we're starting from a clean state.
diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs
index 3e94385ece5..426fffb8e0f 100644
--- a/tests/src/fixture/ethereum.rs
+++ b/tests/src/fixture/ethereum.rs
@@ -31,9 +31,9 @@ pub async fn chain(
         x: PhantomData,
     }));
     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None);
-    let node_id = NodeId::new(NODE_ID).unwrap();
     let mock_registry = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None, mock_registry.clone());
+    let node_id = NodeId::new(NODE_ID).unwrap();
     let chain_store = stores.chain_store.cheap_clone();
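
The net effect of the patch is that each flush now increments a Prometheus counter by the batch size instead of emitting a debug line. The sketch below shows what that counter exposes when scraped. It is a minimal standalone example that uses the `prometheus` crate directly rather than graph-node's `MetricsRegistry` wrapper; the metric name and help text are taken from the patch, while the registry setup, the batch size of 42, and the `QmExampleDeploymentHash` label value are illustrative assumptions (the real code passes the subgraph deployment hash, or "" for the global logger).

```rust
use prometheus::{Counter, Encoder, Opts, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();

    // Same metric name and help text the patch registers through
    // `global_deployment_counter`. The `deployment` label is modeled here
    // as a const label with a hypothetical hash.
    let opts = Opts::new(
        "graph_elasticsearch_logs_sent",
        "Count of logs sent to Elasticsearch endpoint",
    )
    .const_label("deployment", "QmExampleDeploymentHash");
    let logs_sent = Counter::with_opts(opts).unwrap();
    registry.register(Box::new(logs_sent.clone())).unwrap();

    // One flush of a hypothetical 42-document batch increments the counter
    // by the batch size, mirroring
    // `logs_sent_counter.inc_by(logs_to_send.len() as f64)`.
    logs_sent.inc_by(42.0);

    // Render what a Prometheus scrape of the metrics endpoint would show.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}
```

A scrape then reports `graph_elasticsearch_logs_sent{deployment="QmExampleDeploymentHash"} 42`, so Elasticsearch logging throughput can be graphed and alerted on per deployment instead of being inferred from debug output.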