Skip to content

Commit

Permalink
dekaf: Implement a couple memory optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 4, 2024
1 parent a36eb3c commit f622b78
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
26 changes: 18 additions & 8 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
time::{Duration, SystemTime},
};
use std::{io::BufWriter, pin::Pin, sync::Arc};
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio_rustls::rustls;
use tokio_util::{codec, task::AbortOnDropHandle};
Expand All @@ -32,22 +33,31 @@ type BoxedKafkaConnection = Pin<
>,
>;

static ROOT_CERT_STORE: OnceCell<Arc<RootCertStore>> = OnceCell::const_new();

#[tracing::instrument(skip_all)]
async fn async_connect(broker_url: &str) -> anyhow::Result<BoxedKafkaConnection> {
// Establish a TCP connection to the Kafka broker

let parsed_url = Url::parse(broker_url)?;

// This returns an Err indicating that the default provider is already set
// but without this call rustls crashes with the following error:
// `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point`
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let mut root_cert_store = rustls::RootCertStore::empty();
root_cert_store.add_parsable_certificates(rustls_native_certs::load_native_certs()?);
let root_certs = ROOT_CERT_STORE
.get_or_try_init(|| async {
// This returns an Err indicating that the default provider is already set
// but without this call rustls crashes with the following error:
// `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point`
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let mut certs = rustls::RootCertStore::empty();
certs.add_parsable_certificates(
rustls_native_certs::load_native_certs().expect("failed to load native certs"),
);
Ok::<Arc<RootCertStore>, anyhow::Error>(Arc::new(certs))
})
.await?;

let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_cert_store)
.with_root_certificates(root_certs.to_owned())
.with_no_client_auth();

let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
Expand Down
10 changes: 5 additions & 5 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::instrument;
struct PendingRead {
offset: i64, // Journal offset to be completed by this PendingRead.
last_write_head: i64, // Most-recent observed journal write head.
handle: tokio::task::JoinHandle<anyhow::Result<(Read, bytes::Bytes)>>,
handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, bytes::Bytes)>>,
}

pub struct Session {
Expand Down Expand Up @@ -384,9 +384,9 @@ impl Session {
let pending = PendingRead {
offset: fetch_offset,
last_write_head: fetch_offset,
handle: tokio::spawn(
handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(partition_request.partition_max_bytes as usize),
),
)),
};

tracing::info!(
Expand Down Expand Up @@ -435,9 +435,9 @@ impl Session {
} {
pending.offset = read.offset;
pending.last_write_head = read.last_write_head;
pending.handle = tokio::spawn(
pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
read.next_batch(partition_request.partition_max_bytes as usize),
);
));
batch
} else {
bytes::Bytes::new()
Expand Down

0 comments on commit f622b78

Please sign in to comment.