Skip to content

Commit

Permalink
dekaf: Expose more useful api histogram buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 3, 2024
1 parent 14c50a8 commit 063f286
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 42 deletions.
109 changes: 83 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ lz4_flex = "0.11.0"
mime = "0.3"
memchr = "2.5"
metrics = "0.23.0"
metrics-prometheus = "0.7.0"
metrics-exporter-prometheus = "0.15.3"
prometheus = "0.13.4"
md5 = "0.7.0"
num-bigint = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ kafka-protocol = { workspace = true }
lz4_flex = { workspace = true }
md5 = { workspace = true }
metrics = { workspace = true }
metrics-prometheus = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
percent-encoding = { workspace = true }
postgrest = { workspace = true }
prometheus = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ async fn handle_api(
let handle_duration = SystemTime::now().duration_since(start_time)?;

metrics::histogram!("api_call_time", "api_key" => format!("{:?}",api_key))
.record(handle_duration.as_millis() as f64);
.record(handle_duration.as_secs_f32() as f64);

ret
}
Expand Down
4 changes: 1 addition & 3 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ async fn main() -> anyhow::Result<()> {
.with_writer(std::io::stderr)
.init();

metrics_prometheus::install();

let cli = Cli::parse();
tracing::info!("Starting dekaf");

Expand Down Expand Up @@ -163,7 +161,7 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to bind server port")?;

let metrics_router = dekaf::metrics_server::build_router(app.clone());
let metrics_router = dekaf::metrics_server::build_router();
let metrics_server_task =
axum_server::bind(metrics_addr).serve(metrics_router.into_make_service());
tokio::spawn(async move { metrics_server_task.await.unwrap() });
Expand Down
26 changes: 16 additions & 10 deletions crates/dekaf/src/metrics_server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use super::App;
use std::sync::Arc;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};

pub fn build_router(app: Arc<App>) -> axum::Router<()> {
pub fn build_router() -> axum::Router<()> {
use axum::routing::get;

let prom = PrometheusBuilder::new()
.set_buckets(
&prometheus::exponential_buckets(0.00001, 2.5, 15)
.expect("calculating histogram buckets"),
)
.expect("calculating histogram buckets")
.install_recorder()
.expect("failed to install prometheus recorder");

let schema_router = axum::Router::new()
.route("/metrics", get(prometheus_metrics))
.layer(tower_http::trace::TraceLayer::new_for_http())
.with_state(app);
.with_state(prom);

schema_router
}
Expand All @@ -23,12 +31,10 @@ fn record_jemalloc_stats() {
}

#[tracing::instrument(skip_all)]
async fn prometheus_metrics() -> (axum::http::StatusCode, String) {
async fn prometheus_metrics(
axum::extract::State(prom_handle): axum::extract::State<PrometheusHandle>,
) -> (axum::http::StatusCode, String) {
record_jemalloc_stats();

match prometheus::TextEncoder::new().encode_to_string(&prometheus::default_registry().gather())
{
Err(e) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
Ok(result) => (axum::http::StatusCode::OK, result),
}
(axum::http::StatusCode::OK, prom_handle.render())
}

0 comments on commit 063f286

Please sign in to comment.