chore(su): add more logs, remove analytics #1048
VinceJuliano committed Dec 11, 2024
1 parent c9659f1 commit ef21bb7
Showing 7 changed files with 62 additions and 126 deletions.
75 changes: 0 additions & 75 deletions servers/su/Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions servers/su/Cargo.toml
@@ -29,8 +29,6 @@ actix-cors = "0.6.0"
simd-json = "0.13.10"
futures = "0.3.30"
rocksdb = "0.22.0"
actix-web-prom = { version = "0.8.0", features = ["process"] }
prometheus = { version = "0.13.4", features = ["process"] }
lru = "0.12.4"
lazy_static = "1.5.0"
avro-rs = "0.13.0"
72 changes: 37 additions & 35 deletions servers/su/src/domain/clients/metrics.rs
@@ -1,6 +1,6 @@
use super::super::config::AoConfig;
use super::super::core::dal::CoreMetrics;
use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry};
// use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry};

/*
Implementation of metrics
@@ -11,53 +11,55 @@ use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry};

pub struct PromMetrics {
enabled: bool,
core_metrics: HistogramVec,
message_save_failures: IntCounter,
// core_metrics: HistogramVec,
// message_save_failures: IntCounter,
}

impl PromMetrics {
pub fn new(config: AoConfig, registry: Registry) -> Self {
// Define the options for the histogram, with buckets in milliseconds
let histogram_opts = HistogramOpts::new(
"core_metrics_duration_milliseconds",
"Histogram of durations for core metrics functions in milliseconds",
)
.buckets(vec![
0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
])
.namespace("su");

// Create a HistogramVec with labels for the different core metric functions
let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();

// Register the HistogramVec with the provided registry
registry.register(Box::new(core_metrics.clone())).unwrap();

let message_save_failures: IntCounter =
IntCounter::new("message_save_failures", "message save failure count").unwrap();

// Register the IntCounter with the provided registry
registry
.register(Box::new(message_save_failures.clone()))
.unwrap();
pub fn new(config: AoConfig) -> Self {
// let registry = Registry::new();

// // Define the options for the histogram, with buckets in milliseconds
// let histogram_opts = HistogramOpts::new(
// "core_metrics_duration_milliseconds",
// "Histogram of durations for core metrics functions in milliseconds",
// )
// .buckets(vec![
// 0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
// 6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
// ])
// .namespace("su");

// // Create a HistogramVec with labels for the different core metric functions
// let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();

// // Register the HistogramVec with the provided registry
// registry.register(Box::new(core_metrics.clone())).unwrap();

// let message_save_failures: IntCounter =
// IntCounter::new("message_save_failures", "message save failure count").unwrap();

// // Register the IntCounter with the provided registry
// registry
// .register(Box::new(message_save_failures.clone()))
// .unwrap();

PromMetrics {
enabled: config.enable_metrics,
core_metrics,
message_save_failures,
// core_metrics,
// message_save_failures,
}
}

fn observe_duration(&self, function_name: &str, duration: u128) {
fn observe_duration(&self, _function_name: &str, _duration: u128) {
if !self.enabled {
return;
}

// Observe the duration in milliseconds directly
self.core_metrics
.with_label_values(&[function_name])
.observe(duration as f64);
// self.core_metrics
// .with_label_values(&[function_name])
// .observe(duration as f64);
}
}

@@ -91,6 +93,6 @@ impl CoreMetrics for PromMetrics {
}

fn failed_message_save(&self) {
self.message_save_failures.inc();
// self.message_save_failures.inc();
}
}
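
Taken together, the metrics client is reduced to a wrapper around the enable_metrics flag: the constructor no longer takes a Registry, and every Prometheus collector call is commented out rather than deleted. Purely as a reading aid (not part of the diff; the commented-out fields and the CoreMetrics impl are omitted), the resulting shape of the file is roughly:

// Sketch of metrics.rs after this commit; the real file keeps the old
// Prometheus code as comments instead of removing it outright.
pub struct PromMetrics {
    enabled: bool,
}

impl PromMetrics {
    pub fn new(config: AoConfig) -> Self {
        // No registry, histogram, or counter is created anymore.
        PromMetrics {
            enabled: config.enable_metrics,
        }
    }

    fn observe_duration(&self, _function_name: &str, _duration: u128) {
        if !self.enabled {
            return;
        }
        // Intentionally a no-op until the Prometheus collectors come back.
    }
}
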
23 changes: 23 additions & 0 deletions servers/su/src/domain/core/flows.rs
@@ -86,6 +86,7 @@ pub async fn write_item(
base_layer: Option<String>,
exclude: Option<String>,
) -> Result<String, String> {
deps.logger.log(format!("write item called"));
let start_top_level = Instant::now();
let builder = init_builder(&deps)?;

@@ -103,6 +104,8 @@
}
};

deps.logger.log(format!("builder initialized item parsed target - {}", &target_id));

/*
Acquire the lock for a given process id. After acquiring the lock
we can safely increment it and start building/writing data
@@ -119,6 +122,8 @@
deps.metrics
.acquire_write_lock_observe(elapsed_acquire_lock.as_millis());

deps.logger.log(format!("lock acquired - {} - {}", &target_id, elapsed_acquire_lock.as_millis()));

/*
Check to see if the message already exists, this
doesn't need to run for an assignment. If we start
@@ -130,6 +135,8 @@
deps.data_store.check_existing_message(&item.id())?
};

deps.logger.log(format!("checked for message existence- {}", &target_id));

/*
Increment the scheduling info using the locked mutable reference
to schedule_info
@@ -139,6 +146,8 @@
.increment(&mut *schedule_info, target_id.clone())
.await?;

deps.logger.log(format!("incrememted scheduler - {}", &target_id));

// XOR, if we have one of these, we must have both.
if process_id.is_some() ^ assign.is_some() {
return Err("If sending assign or process-id, you must send both.".to_string());
@@ -195,6 +204,8 @@
return Err("Data-Protocol tag not present".to_string());
}

deps.logger.log(format!("tags cloned - {}", &target_id));

if let Some(type_tag) = type_tag {
if type_tag.value == "Process" {
let mod_tag_exists = tags.iter().any(|tag| tag.name == "Module");
Expand Down Expand Up @@ -227,13 +238,21 @@ pub async fn write_item(
},
None => (),
};

deps.logger.log(format!("boot load check complete - {}", &target_id));

let assignment = builder
.gen_assignment(None, data_item.id(), &next_schedule_info, &None)
.await?;

deps.logger.log(format!("assignment generated - {}", &target_id));

let aid = assignment.id();
let did = data_item.id();
let build_result = builder.bundle_items(vec![assignment, data_item]).await?;

deps.logger.log(format!("data bundled - {}", &target_id));

let process = Process::from_bundle(&build_result.bundle)?;
deps.data_store
.save_process(&process, &build_result.binary)?;
@@ -243,7 +262,11 @@
.commit(&mut *schedule_info, &next_schedule_info, did, aid);
drop(schedule_info);

deps.logger.log(format!("scheduler committed cloned - {}", &target_id));

upload(&deps, build_result.binary.to_vec()).await?;

deps.logger.log(format!("upload triggered - {}", &target_id));
return id_res(&deps, process.process.process_id.clone(), start_top_level);
} else {
let build_result = builder.build_process(input, &next_schedule_info).await?;
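
Every log line added to write_item follows the same breadcrumb pattern, deps.logger.log(format!("<stage> - {}", &target_id)), so a single write can be traced end to end by its target id. Purely as an illustration (not part of this commit, and assuming deps.logger keeps the log(String) method used in the lines above), the repetition could be folded into a small local closure:

// Hypothetical helper, not in the commit: stamps each stage with the
// target id, mirroring the pattern of the added log lines above.
let trace = |stage: &str| {
    deps.logger.log(format!("{} - {}", stage, &target_id));
};

trace("lock acquired");
trace("checked for message existence");
trace("assignment generated");
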
3 changes: 1 addition & 2 deletions servers/su/src/domain/mod.rs
@@ -23,7 +23,7 @@ pub use flows::Deps;
pub use local_store::migration::migrate_to_local;
pub use store::migrate_to_disk;

pub async fn init_deps(mode: Option<String>, metrics_registry: prometheus::Registry) -> Arc<Deps> {
pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
let logger: Arc<dyn Log> = SuLog::init();

let config = Arc::new(AoConfig::new(mode.clone()).expect("Failed to read configuration"));
@@ -99,7 +99,6 @@ pub async fn init_deps(mode: Option<String>, metrics_registry: prometheus::Regis

let metrics = Arc::new(PromMetrics::new(
AoConfig::new(mode).expect("Failed to read configuration"),
metrics_registry,
));

Arc::new(Deps {
13 changes: 1 addition & 12 deletions servers/su/src/main.rs
@@ -11,9 +11,6 @@ use actix_web::{
use serde::Deserialize;
use serde_json::json;

use actix_web_prom::PrometheusMetricsBuilder;
use prometheus::Registry;

use su::domain::{flows, init_deps, router, Deps};

#[derive(Deserialize)]
@@ -242,14 +239,7 @@ async fn main() -> io::Result<()> {
}
};

let registry = Registry::new();
let prometheus = PrometheusMetricsBuilder::new("su")
.endpoint("/metrics")
.registry(registry.clone())
.build()
.unwrap();

let deps = init_deps(mode, registry).await;
let deps = init_deps(mode).await;
let app_state = web::Data::new(AppState { deps });

let run_deps = app_state.deps.clone();
@@ -270,7 +260,6 @@
.allow_any_header(),
)
.wrap(Logger::default())
.wrap(prometheus.clone())
.app_data(app_state.clone())
.app_data(web::PayloadConfig::new(10485760))
.route("/", web::get().to(base))
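
The main.rs change is the operational half of the removal: the /metrics endpoint and the Prometheus middleware go away along with the registry that used to be threaded into init_deps. If the endpoint is ever needed again, the deleted lines above show the wiring to restore; roughly (using the actix-web-prom 0.8 and prometheus 0.13 crates this commit drops from Cargo.toml):

// Reconstructed from the lines this commit deletes; not current code.
use actix_web_prom::PrometheusMetricsBuilder;
use prometheus::Registry;

let registry = Registry::new();
let prometheus = PrometheusMetricsBuilder::new("su")
    .endpoint("/metrics")
    .registry(registry.clone())
    .build()
    .unwrap();

// ...then pass the registry back into init_deps and re-wrap the App:
// let deps = init_deps(mode, registry).await;
// App::new().wrap(prometheus.clone()) ...
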
Binary file modified servers/su/su
Binary file not shown.
