Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky out_of_order_write test #1170

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mountpoint-s3/src/upload/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ where

// Flush buffer to the queue if it is full
if buffer.is_full() {
trace!("push full buffer to append queue");
self.upload_queue.push(self.buffer.take().unwrap()).await?;
}
}
Expand All @@ -98,10 +99,12 @@ where
/// The pipeline cannot be used after this.
pub async fn complete(mut self) -> Result<Option<PutObjectResult>, AppendUploadError<Client::ClientError>> {
if let Some(buffer) = self.buffer.take() {
trace!("push remaining buffer to append queue");
self.upload_queue.push(buffer).await?;
} else if self.offset == 0 {
// If we are not appending, but uploading a new object or entirely replacing an existing one,
// we need to push an empty buffer to ensure a PutObject request is issued.
trace!("push empty buffer to append queue");
let empty_buffer = self.upload_queue.get_buffer(0).await?;
self.upload_queue.push(empty_buffer).await?;
}
Expand Down Expand Up @@ -451,6 +454,7 @@ async fn append<Client: ObjectClient>(
etag: Option<ETag>,
server_side_encryption: ServerSideEncryption,
) -> Result<PutObjectResult, AppendUploadError<Client::ClientError>> {
trace!(key, offset, len = buffer.len(), "preparing PutObject request");
let (data, checksum) = buffer.freeze()?;
let mut request_params = if offset == 0 {
PutObjectSingleParams::new()
Expand Down
24 changes: 14 additions & 10 deletions mountpoint-s3/tests/fuse_tests/write_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,28 +591,32 @@ fn out_of_order_write_test(creator_fn: impl TestSessionCreator, offset: i64, upl

drop(f);

let err = metadata(&path).expect_err("upload shouldn't have succeeded");
assert_eq!(err.raw_os_error(), Some(libc::ENOENT));
// When using incremental upload, data from previous successful writes is uploaded if
// fsync or flush are called. In this test, flush is occasionally invoked before the
// out-of-order write when the test runner process is forked.
if !upload_mode.is_incremental() {
// In atomic mode, the multi-part upload is always aborted on error.
let err = metadata(&path).expect_err("upload shouldn't have succeeded atomic mode");
assert_eq!(err.raw_os_error(), Some(libc::ENOENT));
}
}

const EARLIER_OFFSET: i64 = -1;
const LATER_OFFSET: i64 = 1;

#[cfg(feature = "s3_tests")]
#[test_case(-1; "earlier offset")]
#[test_case(1; "later offset")]
#[test_matrix([EARLIER_OFFSET, LATER_OFFSET])]
fn out_of_order_write_test_s3(offset: i64) {
out_of_order_write_test(fuse::s3_session::new, offset, ATOMIC_UPLOAD);
}

#[cfg(feature = "s3express_tests")]
#[test_case(-1; "earlier offset")]
#[test_case(1; "later offset")]
#[test_matrix([EARLIER_OFFSET, LATER_OFFSET])]
fn out_of_order_write_test_s3_incremental_upload(offset: i64) {
out_of_order_write_test(fuse::s3_session::new, offset, INCREMENTAL_UPLOAD);
}

#[test_case(-1, ATOMIC_UPLOAD; "earlier offset")]
#[test_case(1, ATOMIC_UPLOAD; "later offset")]
#[test_case(-1, INCREMENTAL_UPLOAD; "earlier offset, incremental upload")]
#[test_case(1, INCREMENTAL_UPLOAD; "later offset, incremental upload")]
#[test_matrix([EARLIER_OFFSET, LATER_OFFSET], [ATOMIC_UPLOAD, INCREMENTAL_UPLOAD])]
fn out_of_order_write_test_mock(offset: i64, upload_mode: UploadMode) {
out_of_order_write_test(fuse::mock_session::new, offset, upload_mode);
}
Expand Down
Loading