improve(metrics): Turn "Flushing logs to Elasticsearch" log into metric (#4333)

leoyvens authored Feb 3, 2023
1 parent 7118fd8 commit 706d2b1
Showing 7 changed files with 62 additions and 26 deletions.
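
This commit replaces the per-flush "Flushing {} logs to Elasticsearch" debug log with a Prometheus counter, `graph_elasticsearch_logs_sent`, incremented by the batch size on each flush and labeled per deployment. Assuming a standard Prometheus scrape of the node, a query such as `rate(graph_elasticsearch_logs_sent[5m])` should recover the flush volume that the log line used to report.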
25 changes: 16 additions & 9 deletions graph/src/log/elastic.rs
@@ -8,6 +8,7 @@ use std::time::Duration;
 use chrono::prelude::{SecondsFormat, Utc};
 use futures03::TryFutureExt;
 use http::header::CONTENT_TYPE;
+use prometheus::Counter;
 use reqwest;
 use reqwest::Client;
 use serde::ser::Serializer as SerdeSerializer;
@@ -175,15 +176,21 @@ pub struct ElasticDrainConfig {
 pub struct ElasticDrain {
     config: ElasticDrainConfig,
     error_logger: Logger,
+    logs_sent_counter: Counter,
     logs: Arc<Mutex<Vec<ElasticLog>>>,
 }

 impl ElasticDrain {
     /// Creates a new `ElasticDrain`.
-    pub fn new(config: ElasticDrainConfig, error_logger: Logger) -> Self {
+    pub fn new(
+        config: ElasticDrainConfig,
+        error_logger: Logger,
+        logs_sent_counter: Counter,
+    ) -> Self {
         let drain = ElasticDrain {
             config,
             error_logger,
+            logs_sent_counter,
             logs: Arc::new(Mutex::new(vec![])),
         };
         drain.periodically_flush_logs();
@@ -192,6 +199,7 @@ impl ElasticDrain {

     fn periodically_flush_logs(&self) {
         let flush_logger = self.error_logger.clone();
+        let logs_sent_counter = self.logs_sent_counter.clone();
         let logs = self.logs.clone();
         let config = self.config.clone();
         let mut interval = tokio::time::interval(self.config.flush_interval);
@@ -203,7 +211,6 @@ impl ElasticDrain {

                 let logs = logs.clone();
                 let config = config.clone();
-                let flush_logger = flush_logger.clone();
                 let logs_to_send = {
                     let mut logs = logs.lock().unwrap();
                     let logs_to_send = (*logs).clone();
@@ -217,11 +224,7 @@ impl ElasticDrain {
                     continue;
                 }

-                debug!(
-                    flush_logger,
-                    "Flushing {} logs to Elasticsearch",
-                    logs_to_send.len()
-                );
+                logs_sent_counter.inc_by(logs_to_send.len() as f64);

                 // The Elasticsearch batch API takes requests with the following format:
                 // ```ignore
@@ -382,8 +385,12 @@ impl Drain for ElasticDrain {
 ///
 /// Uses `error_logger` to print any Elasticsearch logging errors,
 /// so they don't go unnoticed.
-pub fn elastic_logger(config: ElasticDrainConfig, error_logger: Logger) -> Logger {
-    let elastic_drain = ElasticDrain::new(config, error_logger).fuse();
+pub fn elastic_logger(
+    config: ElasticDrainConfig,
+    error_logger: Logger,
+    logs_sent_counter: Counter,
+) -> Logger {
+    let elastic_drain = ElasticDrain::new(config, error_logger, logs_sent_counter).fuse();
     let async_drain = slog_async::Async::new(elastic_drain)
         .chan_size(20000)
         .build()
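
The drain now increments a `prometheus::Counter` by the batch size instead of logging each flush. A minimal sketch of the Counter API this relies on — the bare `Registry` and the hard-coded batch size are illustrative stand-ins; in graph-node the counter arrives already registered through `MetricsRegistry`:

```rust
use prometheus::{Counter, Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for the registry that graph-node threads through LoggerFactory.
    let registry = Registry::new();

    // Same name/help strings the commit registers for the new metric.
    let logs_sent = Counter::new(
        "graph_elasticsearch_logs_sent",
        "Count of logs sent to Elasticsearch endpoint",
    )?;
    registry.register(Box::new(logs_sent.clone()))?;

    // What the flush loop now does: one increment per batch, sized by the batch.
    let batch_len: usize = 42; // stand-in for logs_to_send.len()
    logs_sent.inc_by(batch_len as f64);

    assert_eq!(logs_sent.get(), 42.0);
    Ok(())
}
```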
25 changes: 24 additions & 1 deletion graph/src/log/factory.rs
@@ -1,5 +1,9 @@
+use std::sync::Arc;
+
+use prometheus::Counter;
 use slog::*;

+use crate::components::metrics::MetricsRegistry;
 use crate::components::store::DeploymentLocator;
 use crate::log::elastic::*;
 use crate::log::split::*;
@@ -20,14 +24,20 @@ pub struct ComponentLoggerConfig {
 pub struct LoggerFactory {
     parent: Logger,
     elastic_config: Option<ElasticLoggingConfig>,
+    metrics_registry: Arc<dyn MetricsRegistry>,
 }

 impl LoggerFactory {
     /// Creates a new factory using a parent logger and optional Elasticsearch configuration.
-    pub fn new(logger: Logger, elastic_config: Option<ElasticLoggingConfig>) -> Self {
+    pub fn new(
+        logger: Logger,
+        elastic_config: Option<ElasticLoggingConfig>,
+        metrics_registry: Arc<dyn MetricsRegistry>,
+    ) -> Self {
         Self {
             parent: logger,
             elastic_config,
+            metrics_registry,
         }
     }

@@ -36,6 +46,7 @@ impl LoggerFactory {
         Self {
             parent,
             elastic_config: self.elastic_config.clone(),
+            metrics_registry: self.metrics_registry.clone(),
         }
     }

@@ -68,6 +79,7 @@ impl LoggerFactory {
                             max_retries: ENV_VARS.elastic_search_max_retries,
                         },
                         term_logger.clone(),
+                        self.logs_sent_counter(None),
                     ),
                 )
             })
@@ -98,9 +110,20 @@ impl LoggerFactory {
                             max_retries: ENV_VARS.elastic_search_max_retries,
                         },
                         term_logger.clone(),
+                        self.logs_sent_counter(Some(loc.hash.as_str())),
                     ),
                 )
             })
             .unwrap_or(term_logger)
     }
+
+    fn logs_sent_counter(&self, deployment: Option<&str>) -> Counter {
+        self.metrics_registry
+            .global_deployment_counter(
+                "graph_elasticsearch_logs_sent",
+                "Count of logs sent to Elasticsearch endpoint",
+                deployment.unwrap_or(""),
+            )
+            .unwrap()
+    }
 }
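
`logs_sent_counter` defers to graph-node's internal `MetricsRegistry::global_deployment_counter`, whose implementation is not part of this diff. Assuming it exposes one counter family keyed by a deployment label (an assumption the diff does not confirm), the split between component loggers (`None`, empty label) and subgraph loggers (`Some(hash)`) can be pictured with plain prometheus types — the deployment hash below is hypothetical:

```rust
use prometheus::{CounterVec, Opts, Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    // Assumed shape: one counter family with a single `deployment` label.
    let logs_sent = CounterVec::new(
        Opts::new(
            "graph_elasticsearch_logs_sent",
            "Count of logs sent to Elasticsearch endpoint",
        ),
        &["deployment"],
    )?;
    registry.register(Box::new(logs_sent.clone()))?;

    // Mirrors logs_sent_counter(None) vs. logs_sent_counter(Some(loc.hash.as_str())).
    logs_sent.with_label_values(&[""]).inc_by(3.0);
    logs_sent
        .with_label_values(&["QmHypotheticalHash"]) // hypothetical deployment hash
        .inc_by(7.0);

    assert_eq!(logs_sent.with_label_values(&[""]).get(), 3.0);
    Ok(())
}
```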
17 changes: 9 additions & 8 deletions node/src/main.rs
@@ -191,8 +191,16 @@ async fn main() {
         client: reqwest::Client::new(),
     });

+    // Set up Prometheus registry
+    let prometheus_registry = Arc::new(Registry::new());
+    let metrics_registry = Arc::new(MetricsRegistry::new(
+        logger.clone(),
+        prometheus_registry.clone(),
+    ));
+
     // Create a component and subgraph logger factory
-    let logger_factory = LoggerFactory::new(logger.clone(), elastic_config);
+    let logger_factory =
+        LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());

     // Try to create IPFS clients for each URL specified in `--ipfs`
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &opt.ipfs);
@@ -207,13 +215,6 @@ async fn main() {
     // Convert the clients into a link resolver. Since we want to get past
     // possible temporary DNS failures, make the resolver retry
     let link_resolver = Arc::new(LinkResolver::new(ipfs_clients, env_vars.cheap_clone()));
-
-    // Set up Prometheus registry
-    let prometheus_registry = Arc::new(Registry::new());
-    let metrics_registry = Arc::new(MetricsRegistry::new(
-        logger.clone(),
-        prometheus_registry.clone(),
-    ));
     let mut metrics_server =
         PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());

2 changes: 1 addition & 1 deletion node/src/manager/commands/run.rs
@@ -57,7 +57,7 @@ pub async fn run(

     let env_vars = Arc::new(EnvVars::from_env().unwrap());
     let metrics_registry = metrics_ctx.registry.clone();
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
+    let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone());

     // FIXME: Hard-coded IPFS config, take it from config file instead?
     let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &ipfs_url);
13 changes: 9 additions & 4 deletions server/http/tests/server.rs
@@ -89,6 +89,8 @@ impl GraphQlRunner for TestGraphQlRunner {

 #[cfg(test)]
 mod test {
+    use graph_mock::MockMetricsRegistry;
+
     use super::*;

     lazy_static! {
@@ -101,7 +103,7 @@ mod test {
         runtime
             .block_on(async {
                 let logger = Logger::root(slog::Discard, o!());
-                let logger_factory = LoggerFactory::new(logger, None);
+                let logger_factory = LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
                 let id = USERS.clone();
                 let query_runner = Arc::new(TestGraphQlRunner);
                 let node_id = NodeId::new("test").unwrap();
@@ -142,7 +144,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -222,7 +225,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
@@ -267,7 +271,8 @@ mod test {
         let runtime = tokio::runtime::Runtime::new().unwrap();
         let _ = runtime.block_on(async {
             let logger = Logger::root(slog::Discard, o!());
-            let logger_factory = LoggerFactory::new(logger, None);
+            let logger_factory =
+                LoggerFactory::new(logger, None, Arc::new(MockMetricsRegistry::new()));
             let id = USERS.clone();
             let query_runner = Arc::new(TestGraphQlRunner);
             let node_id = NodeId::new("test").unwrap();
2 changes: 1 addition & 1 deletion tests/src/fixture.rs
@@ -377,8 +377,8 @@ pub async fn setup<C: Blockchain>(
     });

     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.clone(), None);
     let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.clone(), None, mock_registry.clone());
     let node_id = NodeId::new(NODE_ID).unwrap();

     // Make sure we're starting from a clean state.
4 changes: 2 additions & 2 deletions tests/src/fixture/ethereum.rs
@@ -31,9 +31,9 @@ pub async fn chain(
         x: PhantomData,
     }));
     let logger = graph::log::logger(true);
-    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None);
-    let node_id = NodeId::new(NODE_ID).unwrap();
     let mock_registry = Arc::new(MockMetricsRegistry::new());
+    let logger_factory = LoggerFactory::new(logger.cheap_clone(), None, mock_registry.clone());
+    let node_id = NodeId::new(NODE_ID).unwrap();

     let chain_store = stores.chain_store.cheap_clone();
