Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: mono-vertex code review #1917

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion serving/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ chrono = "0.4.31"
base64 = "0.22.1"
metrics = { version = "0.23.0", default-features = false }
metrics-exporter-prometheus = { version = "0.15.3", default-features = false }
log = "0.4.22"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
hyper-util = "0.1.6"
tower = "0.4.13"
Expand Down
22 changes: 13 additions & 9 deletions serving/source-sink/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::error::Error;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use log::LevelFilter;
use numaflow_models::models::MonoVertex;
use std::env;
use std::sync::OnceLock;
use tracing::level_filters::LevelFilter;

const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Default for Settings {
batch_size: DEFAULT_BATCH_SIZE,
timeout_in_ms: DEFAULT_TIMEOUT_IN_MS,
metrics_server_listen_port: DEFAULT_METRICS_PORT,
log_level: "info".to_string(),
log_level: LevelFilter::INFO.to_string(),
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
is_transformer_enabled: false,
lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl Settings {
}

settings.log_level =
env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::Info.to_string());
env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::INFO.to_string());

settings.grpc_max_message_size = env::var(ENV_GRPC_MAX_MESSAGE_SIZE)
.unwrap_or_else(|_| DEFAULT_GRPC_MAX_MESSAGE_SIZE.to_string())
Expand All @@ -131,9 +131,11 @@ mod tests {
#[test]
fn test_settings_load() {
// Set up environment variables
env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0=");
env::set_var(ENV_LOG_LEVEL, "debug");
env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000");
unsafe {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yhl25 since tests are run in parallel, won't these become flaky?

env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0=");
env::set_var(ENV_LOG_LEVEL, "debug");
env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000");
};

// Load settings
let settings = Settings::load().unwrap();
Expand All @@ -146,8 +148,10 @@ mod tests {
assert_eq!(settings.grpc_max_message_size, 128000000);

// Clean up environment variables
env::remove_var(ENV_MONO_VERTEX_OBJ);
env::remove_var(ENV_LOG_LEVEL);
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
unsafe {
env::remove_var(ENV_MONO_VERTEX_OBJ);
env::remove_var(ENV_LOG_LEVEL);
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
};
}
}
11 changes: 6 additions & 5 deletions serving/source-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use log::Level::Info;
use sourcer_sinker::config::config;
use sourcer_sinker::metrics::start_metrics_https_server;
use sourcer_sinker::run_forwarder;
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
use sourcer_sinker::run_forwarder;
use std::env;
use std::net::SocketAddr;
use tracing::error;
use tracing::level_filters::LevelFilter;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use sourcer_sinker::metrics::start_metrics_https_server;

#[tokio::main]
async fn main() {
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| Info.to_string());
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| LevelFilter::INFO.to_string());
// Initialize the logger
tracing_subscriber::fmt()
.with_env_filter(
Expand Down Expand Up @@ -61,4 +60,6 @@ async fn main() {
if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, None).await {
error!("Application error: {:?}", e);
}

info!("Gracefully Exiting...");
}
43 changes: 22 additions & 21 deletions serving/source-sink/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{routing::get, Router};
use axum_server::tls_rustls::RustlsConfig;
use log::info;
use metrics::describe_counter;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use rcgen::{generate_simple_self_signed, CertifiedKey};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing::{debug, error, info};

use crate::error::Error;
use crate::source::SourceClient;
Expand All @@ -30,7 +29,6 @@ pub const VERTEX_TYPE_LABEL: &str = "vertex_type";
// Define the metrics
pub const FORWARDER_READ_TOTAL: &str = "forwarder_read_total";
pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total";

pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total";
pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total";

Expand All @@ -43,11 +41,7 @@ where
// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
let metrics_app = metrics_router(recorder_handle);

let listener = TcpListener::bind(&addr)
.await
Expand All @@ -70,19 +64,14 @@ where
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::MetricsError(format!("Generating self-signed certificate: {}", e)))?;


let tls_config = RustlsConfig::from_pem(cert.pem().into(), key_pair.serialize_pem().into())
.await
.map_err(|e| Error::MetricsError(format!("Creating tlsConfig from pem: {}", e)))?;

// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
let metrics_app = metrics_router(recorder_handle);

axum_server::bind_rustls(addr, tls_config)
.serve(metrics_app.into_make_service())
Expand All @@ -92,6 +81,16 @@ where
Ok(())
}

/// router for metrics and k8s health endpoints
fn metrics_router(recorder_handle: PrometheusHandle) -> Router {
let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
metrics_app
}

/// Liveness probe handler: always reports healthy by returning `204 No Content`.
async fn livez() -> impl IntoResponse {
    StatusCode::NO_CONTENT
}
Expand Down Expand Up @@ -139,6 +138,7 @@ fn setup_metrics_recorder() -> crate::Result<PrometheusHandle> {
FORWARDER_WRITE_TOTAL,
"Total number of Data Messages written by the forwarder"
);

Ok(prometheus_handle)
}

Expand Down Expand Up @@ -177,7 +177,7 @@ impl LagReader {
cancellation_token: CancellationToken::new(),
buildup_handle: None,
expose_handle: None,
pending_stats: Arc::new(Mutex::new(Vec::new())),
pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))),
}
}

Expand All @@ -194,7 +194,7 @@ impl LagReader {
let pending_stats = self.pending_stats.clone();

self.buildup_handle = Some(tokio::spawn(async move {
buildup_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
build_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
}));

let token = self.cancellation_token.clone();
Expand All @@ -216,8 +216,8 @@ impl LagReader {
}
}

// Periodically checks the pending messages from the source client and updates the pending stats.
async fn buildup_pending_info(
/// Periodically checks the pending messages from the source client and build the pending stats.
async fn build_pending_info(
mut source_client: SourceClient,
cancellation_token: CancellationToken,
lag_checking_interval: Duration,
Expand All @@ -240,7 +240,7 @@ async fn buildup_pending_info(
});
let n = stats.len();
// Ensure only the most recent MAX_PENDING_STATS entries are kept
if n > MAX_PENDING_STATS {
if n >= MAX_PENDING_STATS {
stats.drain(0..(n - MAX_PENDING_STATS));
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn expose_pending_metrics(
}
}

// Calculate the average pending messages over the last `seconds` seconds.
/// Calculate the average pending messages over the last `seconds` seconds.
async fn calculate_pending(
seconds: i64,
pending_stats: &Arc<Mutex<Vec<TimestampedPending>>>,
Expand All @@ -306,6 +306,7 @@ async fn calculate_pending(

result
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
Expand Down
5 changes: 2 additions & 3 deletions serving/source-sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ impl SinkClient {

#[cfg(test)]
mod tests {
use crate::message::Offset;
use chrono::offset::Utc;
use log::info;
use numaflow::sink;

use crate::message::Offset;
use tracing::info;

use super::*;

Expand Down
Loading