From f58dbc52fec09394ee60d285bec1c3e082f06391 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Wed, 18 Oct 2023 04:46:49 -0400 Subject: [PATCH] Allow seeking within a prefetch stream (#556) * Allow seeking forwards within the prefetch stream Right now we reset the prefetcher any time it seeks forwards, even if the distance it's seeking could be handled by inflight requests (in the worst case, the bytes are already in our buffers, and we just throw them away). That's expensive and slow! This change allows us to seek forwards a limited distance into the prefetch stream. When we see a seek of an acceptable distance, we fast-forward through the stream to the desired target offset, dropping the skipped bytes on the floor. We enforce a maximum seek distance, which is a trade-off between streaming a lot of unnecessary bytes versus an extra request's latency. I haven't put any careful thought into the number. This commit also sets us up to support backwards seeking, which will come in the future. Signed-off-by: James Bornholt * Allow seeking backwards within a prefetch stream Linux asynchronous readahead confuses our prefetcher by sometimes making the stream appear to go backwards, even though the customer is actually just reading sequentially (#488). The problem is that with parallel FUSE threads, the two asynchronous read operations can arrive to the prefetcher out of order. This change allows us to tolerate a little bit of backwards seeking in a prefetch stream. We keep around a little bit of previously read data and can reload it in the event that a seek goes backwards. We do this by creating a fake new request containing the rewound bytes, so that the existing read logic will pick them up. I chose an arbitrary max for the backwards seek buffer, big enough to handle Linux readahead. This should fix the readahead issue: in my testing, I no longer saw slow sequential reads, and the logs confirmed this seeking logic was being triggered in both directions (forwards and backwards), consistent with the readahead requests sometimes arriving out of order. Signed-off-by: James Bornholt * Fix Shuttle tests with new request size logic The old test was hiding a bug because it used a hard coded part size of 8MB regardless of what the client used. #552 changed that and now this test runs out of memory a lot because it degrades to doing 1 byte requests. I don't think it's worth playing with the logic because it requires a weird config to get there, so just fix the test. Signed-off-by: James Bornholt --------- Signed-off-by: James Bornholt --- mountpoint-s3/CHANGELOG.md | 6 + mountpoint-s3/src/prefetch.rs | 361 ++++++++++++++++-- .../src/prefetch/checksummed_bytes.rs | 2 +- mountpoint-s3/src/prefetch/part.rs | 2 +- mountpoint-s3/src/prefetch/seek_window.rs | 78 ++++ 5 files changed, 421 insertions(+), 28 deletions(-) create mode 100644 mountpoint-s3/src/prefetch/seek_window.rs diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md index f4a6321dc..468fc5267 100644 --- a/mountpoint-s3/CHANGELOG.md +++ b/mountpoint-s3/CHANGELOG.md @@ -1,5 +1,11 @@ ## Unreleased +### Breaking changes +* ... + +### Other changes +* Fixed a bug that cause poor performance for sequential reads in some cases ([#488](https://github.com/awslabs/mountpoint-s3/pull/488)). A workaround we have previously shared for this issue (setting the `--max-threads` argument to `1`) is no longer necessary with this fix. ([#556](https://github.com/awslabs/mountpoint-s3/pull/556)) + ## v1.0.2 (September 22, 2023) ### Breaking changes diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 310487802..b4717bc0b 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -11,6 +11,7 @@ pub mod checksummed_bytes; mod feed; mod part; mod part_queue; +mod seek_window; use std::collections::VecDeque; use std::fmt::Debug; @@ -18,7 +19,7 @@ use std::time::Duration; use futures::future::RemoteHandle; use futures::task::{Spawn, SpawnExt}; -use metrics::counter; +use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; @@ -29,7 +30,8 @@ use crate::prefetch::checksummed_bytes::{ChecksummedBytes, IntegrityError}; use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed}; use crate::prefetch::part::Part; use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue}; -use crate::sync::{Arc, RwLock}; +use crate::prefetch::seek_window::SeekWindow; +use crate::sync::Arc; type TaskError = ObjectClientError::ClientError>; @@ -43,6 +45,12 @@ pub struct PrefetcherConfig { pub sequential_prefetch_multiplier: usize, /// Timeout to wait for a part to become available pub read_timeout: Duration, + /// The maximum distance the prefetcher will seek forwards before resetting and starting a new + /// S3 request + pub max_forward_seek_distance: u64, + /// The maximum distance the prefetcher will seek backwards before resetting and starting a new + /// S3 request. We keep this much data in memory in addition to any inflight requests. + pub max_backward_seek_distance: u64, } impl Default for PrefetcherConfig { @@ -61,6 +69,12 @@ impl Default for PrefetcherConfig { max_request_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), + // We want these large enough to tolerate a single out-of-order Linux readahead, which + // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also + // making a guess about where the optimal cut-off point is before it would be faster to + // just start a new request instead. + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 1 * 1024 * 1024, } } } @@ -111,9 +125,14 @@ where #[derive(Debug)] pub struct PrefetchGetObject { inner: Arc>, + // Invariant: the offset of the first byte in this task's part queue is always + // self.next_sequential_read_offset. current_task: Option>>, // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: Arc>>>>, + future_tasks: VecDeque>>, + // Invariant: the offset of the last byte in this window is always + // self.next_sequential_read_offset - 1. + backward_seek_window: SeekWindow, bucket: String, key: String, // preferred part size in the prefetcher's part queue, not the object part @@ -136,6 +155,7 @@ where inner: inner.clone(), current_task: None, future_tasks: Default::default(), + backward_seek_window: SeekWindow::new(inner.config.max_backward_seek_distance as usize), preferred_part_size: 128 * 1024, next_request_size: inner.config.first_request_size, next_sequential_read_offset: 0, @@ -178,17 +198,22 @@ where } let mut to_read = (length as u64).min(remaining); - // Cancel and reset prefetching if this is an out-of-order read + // Try to seek if this read is not sequential, and if seeking fails, cancel and reset the + // prefetcher. if self.next_sequential_read_offset != offset { - trace!( - expected = self.next_sequential_read_offset, - actual = offset, - "out-of-order read, resetting prefetch" - ); - counter!("prefetch.out_of_order", 1); - self.reset_prefetch_to_offset(offset); + if self.try_seek(offset).await? { + trace!("seek succeeded"); + } else { + trace!( + expected = self.next_sequential_read_offset, + actual = offset, + "out-of-order read, resetting prefetch" + ); + counter!("prefetch.out_of_order", 1); + self.reset_prefetch_to_offset(offset); + } } - debug_assert_eq!(self.next_sequential_read_offset, offset); + assert_eq!(self.next_sequential_read_offset, offset); self.prepare_requests(); @@ -208,6 +233,7 @@ where } Ok(part) => part, }; + self.backward_seek_window.push(part.clone()); let part_bytes = part.into_bytes(&self.key, self.next_sequential_read_offset).unwrap(); self.next_sequential_read_offset += part_bytes.len() as u64; @@ -227,7 +253,7 @@ where Err(e @ IntegrityError::ChecksumMismatch(_, _)) => { // cancel inflight tasks self.current_task = None; - self.future_tasks.write().unwrap().drain(..); + self.future_tasks.drain(..); return Err(e.into()); } } @@ -242,20 +268,23 @@ where let current_task = self.current_task.as_ref(); if current_task.map(|task| task.remaining == 0).unwrap_or(true) { // There's no current task, or the current task is finished. Prepare the next request. - if let Some(next_task) = self.future_tasks.write().unwrap().pop_front() { + if let Some(next_task) = self.future_tasks.pop_front() { self.current_task = Some(next_task); return; } self.current_task = self.spawn_next_request(); } else if current_task - .map(|task| task.remaining <= task.total_size / 2) + .map(|task| { + // Don't trigger prefetch if we're in a fake task created by backward streaming + task.is_streaming() && task.remaining <= task.total_size / 2 + }) .unwrap_or(false) - && self.future_tasks.read().unwrap().is_empty() + && self.future_tasks.is_empty() { // The current task is nearing completion, so pre-spawn the next request in anticipation // of it completing. if let Some(task) = self.spawn_next_request() { - self.future_tasks.write().unwrap().push_back(task); + self.future_tasks.push_back(task); } } } @@ -298,9 +327,10 @@ where let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap(); Some(RequestTask { - _task_handle: task_handle, + task_handle: Some(task_handle), total_size: size as usize, remaining: size as usize, + start_offset: start, part_queue, }) } @@ -308,7 +338,11 @@ where /// Suggest next request size. /// The next request size is the current request size multiplied by sequential prefetch multiplier. fn get_next_request_size(&self) -> usize { - // calculate next request size + // TODO: this logic doesn't work well right now in the case where part_size < + // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly + // shrinking the request size until it reaches 1. But this isn't a configuration we + // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a + // prefetcher with multiplier 1 is not very good). let next_request_size = (self.next_request_size * self.inner.config.sequential_prefetch_multiplier) .min(self.inner.config.max_request_size); self.inner @@ -318,21 +352,123 @@ where /// Reset this prefetch request to a new offset, clearing any existing tasks queued. fn reset_prefetch_to_offset(&mut self, offset: u64) { - // TODO see if we can reuse any inflight requests rather than dropping them immediately self.current_task = None; - self.future_tasks.write().unwrap().drain(..); + self.future_tasks.drain(..); + self.backward_seek_window.clear(); self.next_request_size = self.inner.config.first_request_size; self.next_sequential_read_offset = offset; self.next_request_offset = offset; } + + /// Try to seek within the current inflight requests without restarting them. Returns true if + /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the + /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset. + async fn try_seek(&mut self, offset: u64) -> Result>> { + assert_ne!(offset, self.next_sequential_read_offset); + trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek"); + if offset > self.next_sequential_read_offset { + self.try_seek_forward(offset).await + } else { + self.try_seek_backward(offset) + } + } + + async fn try_seek_forward(&mut self, offset: u64) -> Result>> { + assert!(offset > self.next_sequential_read_offset); + let total_seek_distance = offset - self.next_sequential_read_offset; + let Some(current_task) = self.current_task.as_mut() else { + // Can't seek if there's no requests in flight at all + return Ok(false); + }; + let future_remaining = self.future_tasks.iter().map(|task| task.remaining).sum::() as u64; + if total_seek_distance + >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) + { + // TODO maybe adjust the next_request_size somehow if we were still within + // max_forward_seek_distance, so that strides > first_request_size can still get + // prefetched. + trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data"); + return Ok(false); + } + + // Jump ahead to the right request + if total_seek_distance >= current_task.remaining as u64 { + self.next_sequential_read_offset += current_task.remaining as u64; + self.current_task = None; + while let Some(next_request) = self.future_tasks.pop_front() { + if next_request.end_offset() > offset { + self.current_task = Some(next_request); + break; + } else { + self.next_sequential_read_offset = next_request.end_offset(); + } + } + // We checked there was an inflight task that contained the target offset, so this + // is impossible. + assert!(self.current_task.is_some()); + // We could try harder to preserve the backwards seek buffer if we're near the + // request boundary, but it's probably not worth the trouble. + self.backward_seek_window.clear(); + } + let mut seek_distance = offset - self.next_sequential_read_offset; + + let current_task = self + .current_task + .as_mut() + .expect("a request existed that covered this seek offset"); + while seek_distance > 0 { + let part = current_task.read(seek_distance as usize).await?; + seek_distance -= part.len() as u64; + self.next_sequential_read_offset += part.len() as u64; + self.backward_seek_window.push(part); + } + + histogram!("prefetch.seek_distance", total_seek_distance as f64, "dir" => "forward"); + + Ok(true) + } + + fn try_seek_backward(&mut self, offset: u64) -> Result>> { + assert!(offset < self.next_sequential_read_offset); + let backwards_length_needed = self.next_sequential_read_offset - offset; + let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else { + trace!("seek failed: not enough data in backwards seek window"); + return Ok(false); + }; + // We're going to create a new fake "request" that contains the parts we read out of the + // window. That sounds a bit hacky, but it keeps all the read logic simple rather than + // needing separate paths for backwards seeks vs others. + let (part_queue, part_queue_producer) = unbounded_part_queue(); + for part in parts { + part_queue_producer.push(Ok(part)); + } + let request = RequestTask { + task_handle: None, + remaining: backwards_length_needed as usize, + start_offset: offset, + total_size: backwards_length_needed as usize, + part_queue, + }; + if let Some(current_task) = self.current_task.take() { + self.future_tasks.push_front(current_task); + } + self.current_task = Some(request); + self.next_sequential_read_offset = offset; + + histogram!("prefetch.seek_distance", backwards_length_needed as f64, "dir" => "backward"); + + Ok(true) + } } /// A single GetObject request submitted to the S3 client #[derive(Debug)] struct RequestTask { - /// Handle on the task/future. Future is cancelled when handle is dropped. - _task_handle: RemoteHandle<()>, + /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if + /// the request is fake (created by seeking backwards in the stream) + task_handle: Option>, remaining: usize, + start_offset: u64, total_size: usize, part_queue: PartQueue, } @@ -344,6 +480,16 @@ impl RequestTask { self.remaining -= part.len(); Ok(part) } + + fn end_offset(&self) -> u64 { + self.start_offset + self.total_size as u64 + } + + /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and + /// shouldn't be counted for prefetcher progress. + fn is_streaming(&self) -> bool { + self.task_handle.is_some() + } } #[derive(Debug, Error)] @@ -386,6 +532,10 @@ mod tests { sequential_prefetch_multiplier: usize, #[proptest(strategy = "16usize..2*1024*1024")] client_part_size: usize, + #[proptest(strategy = "1u64..4*1024*1024")] + max_forward_seek_distance: u64, + #[proptest(strategy = "1u64..4*1024*1024")] + max_backward_seek_distance: u64, } fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) { @@ -404,6 +554,8 @@ mod tests { max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, read_timeout: Duration::from_secs(5), + max_forward_seek_distance: test_config.max_forward_seek_distance, + max_backward_seek_distance: test_config.max_backward_seek_distance, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -431,6 +583,8 @@ mod tests { max_request_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config); } @@ -442,6 +596,8 @@ mod tests { max_request_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -453,6 +609,8 @@ mod tests { max_request_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -513,6 +671,8 @@ mod tests { max_request_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; let mut get_failures = HashMap::new(); @@ -554,6 +714,8 @@ mod tests { sequential_prefetch_multiplier: prefetch_multiplier, max_request_size, read_timeout: Duration::from_secs(60), + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -584,6 +746,21 @@ mod tests { } } + #[test] + fn test_sequential_read_regression() { + let object_size = 854966; + let read_size = 161647; + let config = TestConfig { + first_request_size: 484941, + max_request_size: 81509, + sequential_prefetch_multiplier: 1, + client_part_size: 181682, + max_forward_seek_distance: 1, + max_backward_seek_distance: 18668, + }; + run_sequential_read_test(object_size, read_size, config); + } + fn run_random_read_test(object_size: u64, reads: Vec<(u64, usize)>, test_config: TestConfig) { let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -599,6 +776,8 @@ mod tests { first_request_size: test_config.first_request_size, max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, + max_forward_seek_distance: test_config.max_forward_seek_distance, + max_backward_seek_distance: test_config.max_backward_seek_distance, ..Default::default() }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -661,10 +840,132 @@ mod tests { max_request_size: 2147621, sequential_prefetch_multiplier: 4, client_part_size: 516882, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + }; + run_random_read_test(object_size, reads, config); + } + + #[test] + fn test_random_read_regression2() { + let object_size = 755678; + let reads = vec![(0, 278499), (311250, 1)]; + let config = TestConfig { + first_request_size: 556997, + max_request_size: 105938, + sequential_prefetch_multiplier: 7, + client_part_size: 1219731, + max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + }; + run_random_read_test(object_size, reads, config); + } + + #[test] + fn test_random_read_regression3() { + let object_size = 755678; + let reads = vec![(0, 236766), (291204, 1), (280930, 36002)]; + let config = TestConfig { + first_request_size: 556997, + max_request_size: 105938, + sequential_prefetch_multiplier: 7, + client_part_size: 1219731, + max_forward_seek_distance: 2260662, + max_backward_seek_distance: 2369799, }; run_random_read_test(object_size, reads, config); } + #[test] + fn test_random_read_regression4() { + let object_size = 14201; + let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)]; + let config = TestConfig { + first_request_size: 457999, + max_request_size: 863511, + sequential_prefetch_multiplier: 5, + client_part_size: 1972409, + max_forward_seek_distance: 2810651, + max_backward_seek_distance: 3531090, + }; + run_random_read_test(object_size, reads, config); + } + + #[test_case(0, 25; "no first read")] + #[test_case(60, 25; "read beyond first part")] + #[test_case(20, 25; "read in first part")] + #[test_case(125, 110; "read in second request")] + fn test_forward_seek(first_read_size: usize, part_size: usize) { + const OBJECT_SIZE: usize = 200; + const FIRST_REQUEST_SIZE: usize = 100; + + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size, + }; + let client = MockClient::new(config); + let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); + let etag = object.etag(); + + client.add_object("hello", object); + + let test_config = PrefetcherConfig { + first_request_size: FIRST_REQUEST_SIZE, + ..Default::default() + }; + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + + // Try every possible seek from first_read_size + for offset in first_read_size + 1..OBJECT_SIZE { + let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + if first_read_size > 0 { + let _first_read = block_on(request.read(0, first_read_size)).unwrap(); + } + + let byte = block_on(request.read(offset as u64, 1)).unwrap(); + let expected = ramp_bytes(0xaa + offset, 1); + assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + } + } + + #[test_case(60, 25; "read beyond first part")] + #[test_case(20, 25; "read in first part")] + #[test_case(125, 110; "read in second request")] + fn test_backward_seek(first_read_size: usize, part_size: usize) { + const OBJECT_SIZE: usize = 200; + const FIRST_REQUEST_SIZE: usize = 100; + + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size, + }; + let client = MockClient::new(config); + let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); + let etag = object.etag(); + + client.add_object("hello", object); + + let test_config = PrefetcherConfig { + first_request_size: FIRST_REQUEST_SIZE, + ..Default::default() + }; + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + + // Try every possible seek from first_read_size + for offset in 0..first_read_size { + let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + if first_read_size > 0 { + let _first_read = block_on(request.read(0, first_read_size)).unwrap(); + } + + let byte = block_on(request.read(offset as u64, 1)).unwrap(); + let expected = ramp_bytes(0xaa + offset, 1); + assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + } + } + #[cfg(feature = "shuttle")] mod shuttle_tests { use super::*; @@ -686,8 +987,10 @@ mod tests { let object_size = rng.gen_range(1u64..1 * 1024 * 1024); let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024); let max_request_size = rng.gen_range(16usize..1 * 1024 * 1024); - let sequential_prefetch_multiplier = rng.gen_range(1usize..16); - let part_size = rng.gen_range(16usize..2 * 1024 * 1024); + let sequential_prefetch_multiplier = rng.gen_range(2usize..16); + let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); + let max_forward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); + let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -703,6 +1006,8 @@ mod tests { first_request_size, max_request_size, sequential_prefetch_multiplier, + max_forward_seek_distance, + max_backward_seek_distance, ..Default::default() }; @@ -739,8 +1044,10 @@ mod tests { // under Shuttle (lots of concurrent tasks) let max_object_size = first_request_size.min(max_request_size) * 20; let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64); - let sequential_prefetch_multiplier = rng.gen_range(1usize..16); + let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..128 * 1024); + let max_forward_seek_distance = rng.gen_range(16u64..192 * 1024); + let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024); let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -756,6 +1063,8 @@ mod tests { first_request_size, max_request_size, sequential_prefetch_multiplier, + max_forward_seek_distance, + max_backward_seek_distance, ..Default::default() }; diff --git a/mountpoint-s3/src/prefetch/checksummed_bytes.rs b/mountpoint-s3/src/prefetch/checksummed_bytes.rs index 7bf6dea13..4131c04d7 100644 --- a/mountpoint-s3/src/prefetch/checksummed_bytes.rs +++ b/mountpoint-s3/src/prefetch/checksummed_bytes.rs @@ -4,7 +4,7 @@ use thiserror::Error; /// A `ChecksummedBytes` is a bytes buffer that carries its checksum. /// The implementation guarantees that its integrity will be validated when data transformation occurs. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ChecksummedBytes { orig_bytes: Bytes, curr_slice: Bytes, diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs index a2b12215a..8a8af6b25 100644 --- a/mountpoint-s3/src/prefetch/part.rs +++ b/mountpoint-s3/src/prefetch/part.rs @@ -7,7 +7,7 @@ use super::checksummed_bytes::ChecksummedBytes; // TODO this is not very efficient right now -- it forces a lot of copying around of Strings. If // that's a bottleneck, let's think about either carrying &str (hard to make lifetimes work?) or // the etag or some kind of "cookie" (like the hash of the key). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Part { key: String, offset: u64, diff --git a/mountpoint-s3/src/prefetch/seek_window.rs b/mountpoint-s3/src/prefetch/seek_window.rs new file mode 100644 index 000000000..f2b10c6ac --- /dev/null +++ b/mountpoint-s3/src/prefetch/seek_window.rs @@ -0,0 +1,78 @@ +use std::collections::VecDeque; + +use crate::prefetch::part::Part; + +/// A backwards seek window for a single prefetch stream. Parts can be pushed onto the end of the +/// window (== closest to the current offset in the stream) and older parts will be dropped to +/// remain within a maximum size. +#[derive(Debug)] +pub struct SeekWindow { + parts: VecDeque, + max_size: usize, + current_size: usize, +} + +impl SeekWindow { + pub fn new(max_size: usize) -> Self { + assert!(max_size > 0); + SeekWindow { + parts: VecDeque::new(), + max_size, + current_size: 0, + } + } + + /// Add a new part to the front of the window, and drop any parts necessary to fit the new part + /// within the maximum size. + pub fn push(&mut self, part: Part) { + if part.len() > self.max_size { + self.clear(); + return; + } + + while self.max_size - self.current_size < part.len() { + let p = self + .parts + .pop_front() + .expect("window is non-empty if current size is non-zero"); + self.current_size -= p.len(); + } + + self.current_size += part.len(); + self.parts.push_back(part); + } + + /// Read off the back of the window. Returns None if there's not enough data in the window to + /// satisfy the desired length. + pub fn read_back(&mut self, mut length: usize) -> Option> { + if length > self.current_size { + return None; + } + + let mut result = VecDeque::new(); + loop { + if length == 0 { + break; + } + let mut part = self.parts.pop_back().expect("we checked that current_size >= length"); + // If we only need some of this part, split it up and put the rest back onto the window + if part.len() > length { + let back = part.split_off(part.len() - length); + self.parts.push_back(part); + part = back; + } + length -= part.len(); + self.current_size -= part.len(); + // We're walking backwards through the queue, so to keep the result in object offset + // order, push to the front. + result.push_front(part); + } + Some(result.into()) + } + + /// Reset the seek window to an empty state + pub fn clear(&mut self) { + self.parts.drain(..); + self.current_size = 0; + } +}