Skip to content

Commit

Permalink
chore: mono-vertex code review (numaproj#1917)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored and Saniya Kalamkar committed Aug 14, 2024
1 parent 7971f22 commit e120495
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 40 deletions.
1 change: 0 additions & 1 deletion serving/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ chrono = "0.4.31"
base64 = "0.22.1"
metrics = { version = "0.23.0", default-features = false }
metrics-exporter-prometheus = { version = "0.15.3", default-features = false }
log = "0.4.22"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
hyper-util = "0.1.6"
tower = "0.4.13"
Expand Down
22 changes: 13 additions & 9 deletions serving/source-sink/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::error::Error;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use log::LevelFilter;
use numaflow_models::models::MonoVertex;
use std::env;
use std::sync::OnceLock;
use tracing::level_filters::LevelFilter;

const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Default for Settings {
batch_size: DEFAULT_BATCH_SIZE,
timeout_in_ms: DEFAULT_TIMEOUT_IN_MS,
metrics_server_listen_port: DEFAULT_METRICS_PORT,
log_level: "info".to_string(),
log_level: LevelFilter::INFO.to_string(),
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
is_transformer_enabled: false,
lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl Settings {
}

settings.log_level =
env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::Info.to_string());
env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::INFO.to_string());

settings.grpc_max_message_size = env::var(ENV_GRPC_MAX_MESSAGE_SIZE)
.unwrap_or_else(|_| DEFAULT_GRPC_MAX_MESSAGE_SIZE.to_string())
Expand All @@ -131,9 +131,11 @@ mod tests {
#[test]
fn test_settings_load() {
// Set up environment variables
env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0=");
env::set_var(ENV_LOG_LEVEL, "debug");
env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000");
unsafe {
env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0=");
env::set_var(ENV_LOG_LEVEL, "debug");
env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000");
};

// Load settings
let settings = Settings::load().unwrap();
Expand All @@ -146,8 +148,10 @@ mod tests {
assert_eq!(settings.grpc_max_message_size, 128000000);

// Clean up environment variables
env::remove_var(ENV_MONO_VERTEX_OBJ);
env::remove_var(ENV_LOG_LEVEL);
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
unsafe {
env::remove_var(ENV_MONO_VERTEX_OBJ);
env::remove_var(ENV_LOG_LEVEL);
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
};
}
}
11 changes: 6 additions & 5 deletions serving/source-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use log::Level::Info;
use sourcer_sinker::config::config;
use sourcer_sinker::metrics::start_metrics_https_server;
use sourcer_sinker::run_forwarder;
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
use sourcer_sinker::run_forwarder;
use std::env;
use std::net::SocketAddr;
use tracing::error;
use tracing::level_filters::LevelFilter;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use sourcer_sinker::metrics::start_metrics_https_server;

#[tokio::main]
async fn main() {
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| Info.to_string());
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| LevelFilter::INFO.to_string());
// Initialize the logger
tracing_subscriber::fmt()
.with_env_filter(
Expand Down Expand Up @@ -61,4 +60,6 @@ async fn main() {
if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, None).await {
error!("Application error: {:?}", e);
}

info!("Gracefully Exiting...");
}
43 changes: 22 additions & 21 deletions serving/source-sink/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{routing::get, Router};
use axum_server::tls_rustls::RustlsConfig;
use log::info;
use metrics::describe_counter;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use rcgen::{generate_simple_self_signed, CertifiedKey};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing::{debug, error, info};

use crate::error::Error;
use crate::source::SourceClient;
Expand All @@ -30,7 +29,6 @@ pub const VERTEX_TYPE_LABEL: &str = "vertex_type";
// Define the metrics
pub const FORWARDER_READ_TOTAL: &str = "forwarder_read_total";
pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total";

pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total";
pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total";

Expand All @@ -43,11 +41,7 @@ where
// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
let metrics_app = metrics_router(recorder_handle);

let listener = TcpListener::bind(&addr)
.await
Expand All @@ -70,19 +64,14 @@ where
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::MetricsError(format!("Generating self-signed certificate: {}", e)))?;


let tls_config = RustlsConfig::from_pem(cert.pem().into(), key_pair.serialize_pem().into())
.await
.map_err(|e| Error::MetricsError(format!("Creating tlsConfig from pem: {}", e)))?;

// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
let metrics_app = metrics_router(recorder_handle);

axum_server::bind_rustls(addr, tls_config)
.serve(metrics_app.into_make_service())
Expand All @@ -92,6 +81,16 @@ where
Ok(())
}

/// Builds the Axum router that serves the Prometheus `/metrics` endpoint
/// and the Kubernetes health probes (`/livez`, `/readyz`, `/sidecar-livez`).
///
/// The `recorder_handle` is moved into the `/metrics` handler so each scrape
/// renders the current state of the installed metrics recorder.
fn metrics_router(recorder_handle: PrometheusHandle) -> Router {
    // Return the router expression directly instead of binding it to a
    // temporary and returning the binding (clippy: `let_and_return`).
    Router::new()
        .route("/metrics", get(move || ready(recorder_handle.render())))
        .route("/livez", get(livez))
        .route("/readyz", get(readyz))
        .route("/sidecar-livez", get(sidecar_livez))
}

/// Liveness probe handler: unconditionally reports healthy by returning
/// HTTP 204 No Content (the process is alive if it can answer at all).
async fn livez() -> impl IntoResponse {
    StatusCode::NO_CONTENT
}
Expand Down Expand Up @@ -139,6 +138,7 @@ fn setup_metrics_recorder() -> crate::Result<PrometheusHandle> {
FORWARDER_WRITE_TOTAL,
"Total number of Data Messages written by the forwarder"
);

Ok(prometheus_handle)
}

Expand Down Expand Up @@ -177,7 +177,7 @@ impl LagReader {
cancellation_token: CancellationToken::new(),
buildup_handle: None,
expose_handle: None,
pending_stats: Arc::new(Mutex::new(Vec::new())),
pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))),
}
}

Expand All @@ -194,7 +194,7 @@ impl LagReader {
let pending_stats = self.pending_stats.clone();

self.buildup_handle = Some(tokio::spawn(async move {
buildup_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
build_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
}));

let token = self.cancellation_token.clone();
Expand All @@ -216,8 +216,8 @@ impl LagReader {
}
}

// Periodically checks the pending messages from the source client and updates the pending stats.
async fn buildup_pending_info(
/// Periodically checks the pending messages from the source client and builds the pending stats.
async fn build_pending_info(
mut source_client: SourceClient,
cancellation_token: CancellationToken,
lag_checking_interval: Duration,
Expand All @@ -240,7 +240,7 @@ async fn buildup_pending_info(
});
let n = stats.len();
// Ensure only the most recent MAX_PENDING_STATS entries are kept
if n > MAX_PENDING_STATS {
if n >= MAX_PENDING_STATS {
stats.drain(0..(n - MAX_PENDING_STATS));
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn expose_pending_metrics(
}
}

// Calculate the average pending messages over the last `seconds` seconds.
/// Calculate the average pending messages over the last `seconds` seconds.
async fn calculate_pending(
seconds: i64,
pending_stats: &Arc<Mutex<Vec<TimestampedPending>>>,
Expand All @@ -306,6 +306,7 @@ async fn calculate_pending(

result
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
Expand Down
5 changes: 2 additions & 3 deletions serving/source-sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ impl SinkClient {

#[cfg(test)]
mod tests {
use crate::message::Offset;
use chrono::offset::Utc;
use log::info;
use numaflow::sink;

use crate::message::Offset;
use tracing::info;

use super::*;

Expand Down

0 comments on commit e120495

Please sign in to comment.