diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index 65d3cb7593..29ee73a43c 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -17,6 +17,7 @@ use std::{ time::{Duration, SystemTime}, }; use std::{io::BufWriter, pin::Pin, sync::Arc}; +use tokio::sync::OnceCell; use tokio::sync::RwLock; use tokio_rustls::rustls; use tokio_util::{codec, task::AbortOnDropHandle}; @@ -32,22 +33,31 @@ type BoxedKafkaConnection = Pin< >, >; +static ROOT_CERT_STORE: OnceCell> = OnceCell::const_new(); + #[tracing::instrument(skip_all)] async fn async_connect(broker_url: &str) -> anyhow::Result { // Establish a TCP connection to the Kafka broker let parsed_url = Url::parse(broker_url)?; - // This returns an Err indicating that the default provider is already set - // but without this call rustls crashes with the following error: - // `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point` - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - - let mut root_cert_store = rustls::RootCertStore::empty(); - root_cert_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?); + let root_certs = ROOT_CERT_STORE + .get_or_try_init(|| async { + // This returns an Err indicating that the default provider is already set + // but without this call rustls crashes with the following error: + // `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point` + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let mut certs = rustls::RootCertStore::empty(); + certs.add_parsable_certificates( + rustls_native_certs::load_native_certs().expect("failed to load native certs"), + ); + Ok::, anyhow::Error>(Arc::new(certs)) + }) + .await?; let tls_config = rustls::ClientConfig::builder() - .with_root_certificates(root_cert_store) + .with_root_certificates(root_certs.to_owned()) .with_no_client_auth(); let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 8f140b6001..fcd8e9417a 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -26,7 +26,7 @@ use tracing::instrument; struct PendingRead { offset: i64, // Journal offset to be completed by this PendingRead. last_write_head: i64, // Most-recent observed journal write head. - handle: tokio::task::JoinHandle>, + handle: tokio_util::task::AbortOnDropHandle>, } pub struct Session { @@ -384,9 +384,9 @@ impl Session { let pending = PendingRead { offset: fetch_offset, last_write_head: fetch_offset, - handle: tokio::spawn( + handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn( read.next_batch(partition_request.partition_max_bytes as usize), - ), + )), }; tracing::info!( @@ -435,9 +435,9 @@ impl Session { } { pending.offset = read.offset; pending.last_write_head = read.last_write_head; - pending.handle = tokio::spawn( + pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( read.next_batch(partition_request.partition_max_bytes as usize), - ); + )); batch } else { bytes::Bytes::new()