Skip to content

Commit

Permalink
Fix scaling logic and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
monthonk committed Sep 27, 2024
1 parent 2cc1a53 commit 8fad2e5
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 50 deletions.
4 changes: 2 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ impl S3CrtClientInner {
// Buffer pool metrics
let start = Instant::now();
let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64);
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
Expand Down Expand Up @@ -1213,7 +1213,7 @@ impl ObjectClient for S3CrtClient {
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
let start = Instant::now();
let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64);
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
Some(crt_buffer_pool_stats)
}

Expand Down
19 changes: 18 additions & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use sysinfo::{RefreshKind, System};
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -41,6 +42,11 @@ fn main() {
.long("throughput-target-gbps")
.help("Desired throughput in Gbps"),
)
.arg(
Arg::new("max-memory-target")
.long("max-memory-target")
.help("Maximum memory usage target in MiB"),
)
.arg(
Arg::new("part-size")
.long("part-size")
Expand All @@ -65,6 +71,9 @@ fn main() {
let throughput_target_gbps = matches
.get_one::<String>("throughput-target-gbps")
.map(|s| s.parse::<f64>().expect("throughput target must be an f64"));
let max_memory_target = matches
.get_one::<String>("max-memory-target")
.map(|s| s.parse::<u64>().expect("throughput target must be a u64"));
let part_size = matches
.get_one::<String>("part-size")
.map(|s| s.parse::<usize>().expect("part size must be a usize"));
Expand Down Expand Up @@ -93,7 +102,15 @@ fn main() {
config = config.part_size(part_size);
}
let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));

let max_memory_target = if let Some(target) = max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory
let sys = System::new_with_specifics(RefreshKind::everything());
(sys.total_memory() as f64 * 0.95) as u64
};
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));

let head_object_result = block_on(client.head_object(bucket, key)).expect("HeadObject failed");
let size = head_object_result.object.size;
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLiv
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
use crate::logging::{init_logging, LoggingConfig};
use crate::mem_limiter::MINIMUM_MEM_LIMIT;
use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -787,7 +788,6 @@ where
filesystem_config.s3_personality = s3_personality;
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;
let sys = System::new_with_specifics(RefreshKind::everything());
let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64;
filesystem_config.mem_limit = default_mem_target.max(MINIMUM_MEM_LIMIT);
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::inode::{
Inode, InodeError, InodeKind, LookedUp, ReadHandle, ReaddirHandle, Superblock, SuperblockConfig, WriteHandle,
};
use crate::logging;
use crate::mem_limiter::MemoryLimiter;
use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT};
use crate::object::ObjectId;
use crate::prefetch::{Prefetch, PrefetchResult};
use crate::prefix::Prefix;
Expand Down Expand Up @@ -422,7 +422,7 @@ impl Default for S3FilesystemConfig {
s3_personality: S3Personality::default(),
server_side_encryption: Default::default(),
use_upload_checksums: true,
mem_limit: 512 * 1024 * 1024,
mem_limit: MINIMUM_MEM_LIMIT,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions mountpoint-s3/src/mem_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use metrics::atomics::AtomicU64;
use mountpoint_s3_client::ObjectClient;
use tracing::debug;

pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;

/// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted.
/// Currently, there are two metrics we take into account:
/// 1) the memory reserved by prefetcher instances for the data requested or fetched from CRT client.
Expand Down
23 changes: 12 additions & 11 deletions mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub struct PrefetchGetObject<Stream: ObjectPartStream, Client: ObjectClient> {
part_stream: Arc<Stream>,
mem_limiter: Arc<MemoryLimiter<Client>>,
config: PrefetcherConfig,
backpressure_task: Option<RequestTask<Client::ClientError, Client>>,
backpressure_task: Option<RequestTask<Client>>,
// Invariant: the offset of the last byte in this window is always
// self.next_sequential_read_offset - 1.
backward_seek_window: SeekWindow,
Expand Down Expand Up @@ -420,7 +420,7 @@ where
/// We will be using flow-control window to control how much data we want to download into the prefetcher.
fn spawn_read_backpressure_request(
&mut self,
) -> Result<RequestTask<Client::ClientError, Client>, PrefetchReadError<Client::ClientError>> {
) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
let start = self.next_sequential_read_offset;
let object_size = self.size as usize;
let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024);
Expand Down Expand Up @@ -560,6 +560,7 @@ mod tests {
#![allow(clippy::identity_op)]

use crate::data_cache::InMemoryDataCache;
use crate::mem_limiter::MINIMUM_MEM_LIMIT;

use super::caching_stream::CachingPartStream;
use super::*;
Expand Down Expand Up @@ -619,7 +620,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
let etag = object.etag();

Expand Down Expand Up @@ -714,7 +715,7 @@ mod tests {
Stream: ObjectPartStream + Send + Sync + 'static,
{
let client = Arc::new(MockClient::new(client_config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
let read_size = 1 * MB;
let object_size = 8 * MB;
let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
Expand Down Expand Up @@ -821,7 +822,7 @@ mod tests {
HashMap::new(),
HashMap::new(),
));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);

let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
Expand Down Expand Up @@ -946,7 +947,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let etag = object.etag();

Expand Down Expand Up @@ -1130,7 +1131,7 @@ mod tests {
HashMap::new(),
HashMap::new(),
));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);

let prefetcher = Prefetcher::new(default_stream(), Default::default());
let mem_limiter = Arc::new(mem_limiter);
Expand Down Expand Up @@ -1183,7 +1184,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

Expand Down Expand Up @@ -1225,7 +1226,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

Expand Down Expand Up @@ -1287,7 +1288,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

Expand Down Expand Up @@ -1353,7 +1354,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

Expand Down
93 changes: 91 additions & 2 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ impl<Client: ObjectClient> BackpressureController<Client> {
// Scaling up fails silently if there is no enough free memory to perform it.
fn scale_up(&mut self) {
if self.preferred_read_window_size < self.max_read_window_size {
let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier;
let new_read_window_size = (self.preferred_read_window_size * self.read_window_size_multiplier)
.max(self.min_read_window_size)
.min(self.max_read_window_size);
// Only scale up when there is enough memory. We don't have to reserve the memory here
// because only `preferred_read_window_size` is increased but the actual read window will
// be updated later on `DataRead` event (where we do reserve memory).
Expand All @@ -203,7 +205,9 @@ impl<Client: ObjectClient> BackpressureController<Client> {
fn scale_down(&mut self) {
if self.preferred_read_window_size > self.min_read_window_size {
assert!(self.read_window_size_multiplier > 1);
let new_read_window_size = self.preferred_read_window_size / self.read_window_size_multiplier;
let new_read_window_size = (self.preferred_read_window_size / self.read_window_size_multiplier)
.max(self.min_read_window_size)
.min(self.max_read_window_size);
let formatter = make_format(humansize::BINARY);
debug!(
current_size = formatter(self.preferred_read_window_size),
Expand Down Expand Up @@ -267,3 +271,88 @@ impl BackpressureLimiter {
Ok(Some(self.read_window_end_offset))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use test_case::test_case;

use crate::mem_limiter::MemoryLimiter;

use super::{new_backpressure_controller, BackpressureConfig, BackpressureController, BackpressureLimiter};

#[test_case(1024 * 1024 + 128 * 1024, 2)] // real config
#[test_case(3 * 1024 * 1024, 4)]
#[test_case(8 * 1024 * 1024, 8)]
#[test_case(2 * 1024 * 1024 * 1024, 2)]
fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) {
let request_range = 0..(5 * 1024 * 1024 * 1024);
let backpressure_config = BackpressureConfig {
initial_read_window_size,
min_read_window_size: 8 * 1024 * 1024,
max_read_window_size: 2 * 1024 * 1024 * 1024,
read_window_size_multiplier,
request_range,
};

let (mut backpressure_controller, _backpressure_limiter) =
new_backpressure_controller_for_test(backpressure_config);
while backpressure_controller.preferred_read_window_size < backpressure_controller.max_read_window_size {
backpressure_controller.scale_up();
assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size);
assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size);
}
assert_eq!(
backpressure_controller.preferred_read_window_size, backpressure_controller.max_read_window_size,
"should have scaled up to max read window size"
);
}

#[test_case(2 * 1024 * 1024 * 1024, 2)]
#[test_case(15 * 1024 * 1024 * 1024, 2)]
#[test_case(2 * 1024 * 1024 * 1024, 8)]
#[test_case(8 * 1024 * 1024, 8)]
fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) {
let request_range = 0..(5 * 1024 * 1024 * 1024);
let backpressure_config = BackpressureConfig {
initial_read_window_size,
min_read_window_size: 8 * 1024 * 1024,
max_read_window_size: 2 * 1024 * 1024 * 1024,
read_window_size_multiplier,
request_range,
};

let (mut backpressure_controller, _backpressure_limiter) =
new_backpressure_controller_for_test(backpressure_config);
while backpressure_controller.preferred_read_window_size > backpressure_controller.min_read_window_size {
backpressure_controller.scale_down();
assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size);
assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size);
}
assert_eq!(
backpressure_controller.preferred_read_window_size, backpressure_controller.min_read_window_size,
"should have scaled down to min read window size"
);
}

fn new_backpressure_controller_for_test(
backpressure_config: BackpressureConfig,
) -> (BackpressureController<MockClient>, BackpressureLimiter) {
let config = MockClientConfig {
bucket: "test-bucket".to_string(),
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: backpressure_config.initial_read_window_size,
..Default::default()
};

let client = MockClient::new(config);
let mem_limiter = Arc::new(MemoryLimiter::new(
client,
backpressure_config.max_read_window_size as u64,
));
new_backpressure_controller(backpressure_config, mem_limiter.clone())
}
}
18 changes: 9 additions & 9 deletions mountpoint-s3/src/prefetch/caching_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
client: &Client,
config: RequestTaskConfig,
mem_limiter: Arc<MemoryLimiter<Client>>,
) -> RequestTask<<Client as ObjectClient>::ClientError, Client>
) -> RequestTask<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
Expand Down Expand Up @@ -391,7 +391,11 @@ mod tests {
};
use test_case::test_case;

use crate::{data_cache::InMemoryDataCache, mem_limiter::MemoryLimiter, object::ObjectId};
use crate::{
data_cache::InMemoryDataCache,
mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT},
object::ObjectId,
};

use super::*;

Expand Down Expand Up @@ -432,7 +436,7 @@ mod tests {
..Default::default()
};
let mock_client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT));
mock_client.add_object(key, object.clone());

let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
Expand Down Expand Up @@ -513,7 +517,7 @@ mod tests {
..Default::default()
};
let mock_client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT));
mock_client.add_object(key, object.clone());

let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
Expand All @@ -537,11 +541,7 @@ mod tests {
}
}

fn compare_read<E: std::error::Error + Send + Sync, Client: ObjectClient>(
id: &ObjectId,
object: &MockObject,
mut request_task: RequestTask<E, Client>,
) {
fn compare_read<Client: ObjectClient>(id: &ObjectId, object: &MockObject, mut request_task: RequestTask<Client>) {
let mut offset = request_task.start_offset();
let mut remaining = request_task.total_size();
while remaining > 0 {
Expand Down
Loading

0 comments on commit 8fad2e5

Please sign in to comment.