diff --git a/serving/Cargo.lock b/serving/Cargo.lock index 0e70179df8..e765a7d9a6 100644 --- a/serving/Cargo.lock +++ b/serving/Cargo.lock @@ -2743,7 +2743,6 @@ dependencies = [ "bytes", "chrono", "hyper-util", - "log", "metrics", "metrics-exporter-prometheus", "numaflow", diff --git a/serving/source-sink/Cargo.toml b/serving/source-sink/Cargo.toml index 9db2bbff35..ab1f4dcd1d 100644 --- a/serving/source-sink/Cargo.toml +++ b/serving/source-sink/Cargo.toml @@ -19,7 +19,6 @@ chrono = "0.4.31" base64 = "0.22.1" metrics = { version = "0.23.0", default-features = false } metrics-exporter-prometheus = { version = "0.15.3", default-features = false } -log = "0.4.22" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } hyper-util = "0.1.6" tower = "0.4.13" diff --git a/serving/source-sink/src/config.rs b/serving/source-sink/src/config.rs index 9ac27a3413..8adbc2691d 100644 --- a/serving/source-sink/src/config.rs +++ b/serving/source-sink/src/config.rs @@ -1,10 +1,10 @@ use crate::error::Error; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use log::LevelFilter; use numaflow_models::models::MonoVertex; use std::env; use std::sync::OnceLock; +use tracing::level_filters::LevelFilter; const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT"; const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE"; @@ -48,7 +48,7 @@ impl Default for Settings { batch_size: DEFAULT_BATCH_SIZE, timeout_in_ms: DEFAULT_TIMEOUT_IN_MS, metrics_server_listen_port: DEFAULT_METRICS_PORT, - log_level: "info".to_string(), + log_level: LevelFilter::INFO.to_string(), grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, is_transformer_enabled: false, lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS, @@ -105,7 +105,7 @@ impl Settings { } settings.log_level = - env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::Info.to_string()); + env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::INFO.to_string()); settings.grpc_max_message_size = env::var(ENV_GRPC_MAX_MESSAGE_SIZE) .unwrap_or_else(|_| DEFAULT_GRPC_MAX_MESSAGE_SIZE.to_string()) @@ -131,9 +131,11 @@ mod tests { #[test] fn test_settings_load() { // Set up environment variables - env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0="); - env::set_var(ENV_LOG_LEVEL, "debug"); - env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000"); + unsafe { + env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0="); + env::set_var(ENV_LOG_LEVEL, "debug"); + env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000"); + }; // Load settings let settings = Settings::load().unwrap(); @@ -146,8 +148,10 @@ mod tests { assert_eq!(settings.grpc_max_message_size, 128000000); // Clean up environment variables - env::remove_var(ENV_MONO_VERTEX_OBJ); - env::remove_var(ENV_LOG_LEVEL); - env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE); + unsafe { + env::remove_var(ENV_MONO_VERTEX_OBJ); + env::remove_var(ENV_LOG_LEVEL); + env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE); + }; } } diff --git a/serving/source-sink/src/main.rs b/serving/source-sink/src/main.rs index aa1d8c0605..0013e33613 100644 --- a/serving/source-sink/src/main.rs +++ b/serving/source-sink/src/main.rs @@ -1,19 +1,18 @@ -use log::Level::Info; use sourcer_sinker::config::config; +use sourcer_sinker::metrics::start_metrics_https_server; +use sourcer_sinker::run_forwarder; use sourcer_sinker::sink::SinkConfig; use sourcer_sinker::source::SourceConfig; use sourcer_sinker::transformer::TransformerConfig; -use sourcer_sinker::run_forwarder; use std::env; use std::net::SocketAddr; -use tracing::error; use tracing::level_filters::LevelFilter; +use tracing::{error, info}; use tracing_subscriber::EnvFilter; -use sourcer_sinker::metrics::start_metrics_https_server; #[tokio::main] async fn main() { - let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| Info.to_string()); + let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| LevelFilter::INFO.to_string()); // Initialize the logger tracing_subscriber::fmt() .with_env_filter( @@ -61,4 +60,6 @@ async fn main() { if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, None).await { error!("Application error: {:?}", e); } + + info!("Gracefully Exiting..."); } diff --git a/serving/source-sink/src/metrics.rs b/serving/source-sink/src/metrics.rs index d257609c76..b33b5b2fb2 100644 --- a/serving/source-sink/src/metrics.rs +++ b/serving/source-sink/src/metrics.rs @@ -7,16 +7,15 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use axum::{routing::get, Router}; use axum_server::tls_rustls::RustlsConfig; -use log::info; use metrics::describe_counter; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle}; -use rcgen::{CertifiedKey, generate_simple_self_signed}; +use rcgen::{generate_simple_self_signed, CertifiedKey}; use tokio::net::{TcpListener, ToSocketAddrs}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::time; use tokio_util::sync::CancellationToken; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use crate::error::Error; use crate::source::SourceClient; @@ -30,7 +29,6 @@ pub const VERTEX_TYPE_LABEL: &str = "vertex_type"; // Define the metrics pub const FORWARDER_READ_TOTAL: &str = "forwarder_read_total"; pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total"; - pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total"; pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total"; @@ -43,11 +41,7 @@ where // setup_metrics_recorder should only be invoked once let recorder_handle = setup_metrics_recorder()?; - let metrics_app = Router::new() - .route("/metrics", get(move || ready(recorder_handle.render()))) - .route("/livez", get(livez)) - .route("/readyz", get(readyz)) - .route("/sidecar-livez", get(sidecar_livez)); + let metrics_app = metrics_router(recorder_handle); let listener = TcpListener::bind(&addr) .await @@ -70,7 +64,6 @@ where let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()]) .map_err(|e| Error::MetricsError(format!("Generating self-signed certificate: {}", e)))?; - let tls_config = RustlsConfig::from_pem(cert.pem().into(), key_pair.serialize_pem().into()) .await .map_err(|e| Error::MetricsError(format!("Creating tlsConfig from pem: {}", e)))?; @@ -78,11 +71,7 @@ where // setup_metrics_recorder should only be invoked once let recorder_handle = setup_metrics_recorder()?; - let metrics_app = Router::new() - .route("/metrics", get(move || ready(recorder_handle.render()))) - .route("/livez", get(livez)) - .route("/readyz", get(readyz)) - .route("/sidecar-livez", get(sidecar_livez)); + let metrics_app = metrics_router(recorder_handle); axum_server::bind_rustls(addr, tls_config) .serve(metrics_app.into_make_service()) @@ -92,6 +81,16 @@ where Ok(()) } +/// router for metrics and k8s health endpoints +fn metrics_router(recorder_handle: PrometheusHandle) -> Router { + let metrics_app = Router::new() + .route("/metrics", get(move || ready(recorder_handle.render()))) + .route("/livez", get(livez)) + .route("/readyz", get(readyz)) + .route("/sidecar-livez", get(sidecar_livez)); + metrics_app +} + async fn livez() -> impl IntoResponse { StatusCode::NO_CONTENT } @@ -139,6 +138,7 @@ fn setup_metrics_recorder() -> crate::Result { FORWARDER_WRITE_TOTAL, "Total number of Data Messages written by the forwarder" ); + Ok(prometheus_handle) } @@ -177,7 +177,7 @@ impl LagReader { cancellation_token: CancellationToken::new(), buildup_handle: None, expose_handle: None, - pending_stats: Arc::new(Mutex::new(Vec::new())), + pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))), } } @@ -194,7 +194,7 @@ impl LagReader { let pending_stats = self.pending_stats.clone(); self.buildup_handle = Some(tokio::spawn(async move { - buildup_pending_info(source_client, token, lag_checking_interval, pending_stats).await; + build_pending_info(source_client, token, lag_checking_interval, pending_stats).await; })); let token = self.cancellation_token.clone(); @@ -216,8 +216,8 @@ impl LagReader { } } -// Periodically checks the pending messages from the source client and updates the pending stats. -async fn buildup_pending_info( +/// Periodically checks the pending messages from the source client and build the pending stats. +async fn build_pending_info( mut source_client: SourceClient, cancellation_token: CancellationToken, lag_checking_interval: Duration, @@ -240,7 +240,7 @@ async fn buildup_pending_info( }); let n = stats.len(); // Ensure only the most recent MAX_PENDING_STATS entries are kept - if n > MAX_PENDING_STATS { + if n >= MAX_PENDING_STATS { stats.drain(0..(n - MAX_PENDING_STATS)); } } @@ -280,7 +280,7 @@ async fn expose_pending_metrics( } } -// Calculate the average pending messages over the last `seconds` seconds. +/// Calculate the average pending messages over the last `seconds` seconds. async fn calculate_pending( seconds: i64, pending_stats: &Arc>>, @@ -306,6 +306,7 @@ async fn calculate_pending( result } + #[cfg(test)] mod tests { use std::net::SocketAddr; diff --git a/serving/source-sink/src/sink.rs b/serving/source-sink/src/sink.rs index e2801873df..6312287338 100644 --- a/serving/source-sink/src/sink.rs +++ b/serving/source-sink/src/sink.rs @@ -76,11 +76,10 @@ impl SinkClient { #[cfg(test)] mod tests { + use crate::message::Offset; use chrono::offset::Utc; - use log::info; use numaflow::sink; - - use crate::message::Offset; + use tracing::info; use super::*;