Skip to content

Commit

Permalink
chore(su): reimplement app level metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Dec 12, 2024
1 parent 00fdb7a commit 864767e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 37 deletions.
53 changes: 53 additions & 0 deletions servers/su/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions servers/su/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ actix-cors = "0.6.0"
simd-json = "0.13.10"
futures = "0.3.30"
rocksdb = "0.22.0"
prometheus = { version = "0.13.4", features = ["process"] }
lru = "0.12.4"
lazy_static = "1.5.0"
avro-rs = "0.13.0"
Expand Down
81 changes: 50 additions & 31 deletions servers/su/src/domain/clients/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::super::config::AoConfig;
use super::super::core::dal::CoreMetrics;
// use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry};
use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry, TextEncoder};

/*
Implementation of metrics
Expand All @@ -11,55 +11,74 @@ use super::super::core::dal::CoreMetrics;

pub struct PromMetrics {
enabled: bool,
// core_metrics: HistogramVec,
// message_save_failures: IntCounter,
core_metrics: HistogramVec,
message_save_failures: IntCounter,
registry: Registry
}

impl PromMetrics {
pub fn new(config: AoConfig) -> Self {
// let registry = Registry::new();
let registry = Registry::new();

// // Define the options for the histogram, with buckets in milliseconds
// let histogram_opts = HistogramOpts::new(
// "core_metrics_duration_milliseconds",
// "Histogram of durations for core metrics functions in milliseconds",
// )
// .buckets(vec![
// 0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
// 6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
// ])
// .namespace("su");
// Define the options for the histogram, with buckets in milliseconds
let histogram_opts = HistogramOpts::new(
"core_metrics_duration_milliseconds",
"Histogram of durations for core metrics functions in milliseconds",
)
.buckets(vec![
0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
])
.namespace("su");

// // Create a HistogramVec with labels for the different core metric functions
// let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();
// Create a HistogramVec with labels for the different core metric functions
let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();

// // Register the HistogramVec with the provided registry
// registry.register(Box::new(core_metrics.clone())).unwrap();
// Register the HistogramVec with the provided registry
registry.register(Box::new(core_metrics.clone())).unwrap();

// let message_save_failures: IntCounter =
// IntCounter::new("message_save_failures", "message save failure count").unwrap();
let message_save_failures: IntCounter =
IntCounter::new("message_save_failures", "message save failure count").unwrap();

// // Register the IntCounter with the provided registry
// registry
// .register(Box::new(message_save_failures.clone()))
// .unwrap();
// Register the IntCounter with the provided registry
registry
.register(Box::new(message_save_failures.clone()))
.unwrap();

PromMetrics {
enabled: config.enable_metrics,
// core_metrics,
// message_save_failures,
core_metrics,
message_save_failures,
registry,
}
}

fn observe_duration(&self, _function_name: &str, _duration: u128) {
fn observe_duration(&self, function_name: &str, duration: u128) {
if !self.enabled {
return;
}

// Observe the duration in milliseconds directly
// self.core_metrics
// .with_label_values(&[function_name])
// .observe(duration as f64);
self.core_metrics
.with_label_values(&[function_name])
.observe(duration as f64);
}

pub fn emit_metrics(&self) -> Result<String, String> {
if !self.enabled {
return Err("Metrics not enabled".to_string());
}

let encoder = TextEncoder::new();

let metric_families = self.registry.gather();

let mut buffer = String::new();
if let Err(err) = encoder.encode_utf8(&metric_families, &mut buffer) {
return Err(format!("Failed to encode metrics: {}", err));
}

Ok(buffer)
}
}

Expand Down Expand Up @@ -93,6 +112,6 @@ impl CoreMetrics for PromMetrics {
}

fn failed_message_save(&self) {
// self.message_save_failures.inc();
self.message_save_failures.inc();
}
}
7 changes: 4 additions & 3 deletions servers/su/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use flows::Deps;
pub use local_store::migration::migrate_to_local;
pub use store::migrate_to_disk;

pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
pub async fn init_deps(mode: Option<String>) -> (Arc<Deps>, Arc<PromMetrics>) {
let logger: Arc<dyn Log> = SuLog::init();

let config = Arc::new(AoConfig::new(mode.clone()).expect("Failed to read configuration"));
Expand Down Expand Up @@ -100,8 +100,9 @@ pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
let metrics = Arc::new(PromMetrics::new(
AoConfig::new(mode).expect("Failed to read configuration"),
));
let metrics_clone = metrics.clone();

Arc::new(Deps {
(Arc::new(Deps {
data_store: main_data_store,
router_data_store,
logger,
Expand All @@ -112,5 +113,5 @@ pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
wallet,
uploader,
metrics,
})
}), metrics_clone)
}
20 changes: 17 additions & 3 deletions servers/su/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use actix_web::{
use serde::Deserialize;
use serde_json::json;

use su::domain::{flows, init_deps, router, Deps};
use su::domain::{flows, init_deps, router, Deps, PromMetrics};

#[derive(Deserialize)]
struct FromTo {
Expand Down Expand Up @@ -213,8 +213,21 @@ async fn health_check() -> impl Responder {
HttpResponse::Ok()
}

async fn metrics_route(data: web::Data<AppState>) -> impl Responder {
let result = data.metrics.emit_metrics();
match result {
Ok(metrics_str) => HttpResponse::Ok()
.content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
.body(metrics_str),
Err(err) => HttpResponse::BadRequest()
.content_type("text/plain")
.body(err),
}
}

struct AppState {
deps: Arc<Deps>,
metrics: Arc<PromMetrics>,
}

#[actix_web::main]
Expand All @@ -239,8 +252,8 @@ async fn main() -> io::Result<()> {
}
};

let deps = init_deps(mode).await;
let app_state = web::Data::new(AppState { deps });
let (deps, metrics) = init_deps(mode).await;
let app_state = web::Data::new(AppState { deps, metrics });

let run_deps = app_state.deps.clone();

Expand All @@ -266,6 +279,7 @@ async fn main() -> io::Result<()> {
.route("/", web::post().to(main_post_route))
.route("/timestamp", web::get().to(timestamp_route))
.route("/health", web::get().to(health_check))
.route("/metrics", web::get().to(metrics_route))
.route("/{tx_id}", web::get().to(main_get_route))
.route("/processes/{process_id}", web::get().to(read_process_route))
})
Expand Down
Binary file modified servers/su/su
Binary file not shown.

0 comments on commit 864767e

Please sign in to comment.