Skip to content

Commit

Permalink
apacheGH-41493: [C++][S3] Add a new option to check existence before …
Browse files Browse the repository at this point in the history
…CreateDir (apache#41822)

### Rationale for this change
I have a use case that thousands of jobs are writing hive partitioned parquet files daily to the same bucket via S3FS filesystem. The gist here is a lot of keys are being created at the same time hense jobs hits `AWS Error SLOW_DOWN. during Put Object operation: The object exceeded the rate limit for object mutation operations(create, update, and delete). Please reduce your rate request error.` frequently throughout the day since the code is creating directories pessimistically.

### What changes are included in this PR?
Add a new S3Option to check the existence of the directory before creation in `CreateDir`. It's disabled by default.

When it's enabled, the CreateDir function will check the existence of the directory first before creation. It ensures that the create operation is only acted when necessary. Though there are more I/O calls, but it avoids hitting the cloud vendor put object limit.

### Are these changes tested?
Add test cases when the flag is set to true. Right on top of the mind i donno how to ensure it's working in these tests. But in our production environment, we have very similar code and it worked well.

### Are there any user-facing changes?

* GitHub Issue: apache#41493

Lead-authored-by: Haocheng Liu <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
HaochengLIU and pitrou authored Jun 4, 2024
1 parent d02a91b commit a44b537
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
43 changes: 37 additions & 6 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2860,17 +2860,41 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
return impl_->CreateBucket(path.bucket);
}

FileInfo file_info;
// Create object
if (recursive) {
// Ensure bucket exists
ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
if (!bucket_exists) {
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
}

auto key_i = path.key_parts.begin();
std::string parent_key{};
if (options().check_directory_existence_before_creation) {
// Walk up the directory first to find the first existing parent
for (const auto& part : path.key_parts) {
parent_key += part;
parent_key += kSep;
}
for (key_i = path.key_parts.end(); key_i-- != path.key_parts.begin();) {
ARROW_ASSIGN_OR_RAISE(file_info,
this->GetFileInfo(path.bucket + kSep + parent_key));
if (file_info.type() != FileType::NotFound) {
// Found!
break;
} else {
// remove the kSep and the part
parent_key.pop_back();
parent_key.erase(parent_key.end() - key_i->size(), parent_key.end());
}
}
key_i++; // Above for loop moves one extra iterator at the end
}
// Ensure that all parents exist, then the directory itself
std::string parent_key;
for (const auto& part : path.key_parts) {
parent_key += part;
// Create all missing directories
for (; key_i < path.key_parts.end(); ++key_i) {
parent_key += *key_i;
parent_key += kSep;
RETURN_NOT_OK(impl_->CreateEmptyDir(path.bucket, parent_key));
}
Expand All @@ -2888,11 +2912,18 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
"': parent directory does not exist");
}
}
}

// XXX Should we check that no non-directory entry exists?
// Minio does it for us, not sure about other S3 implementations.
return impl_->CreateEmptyDir(path.bucket, path.key);
// Check if the directory exists already
if (options().check_directory_existence_before_creation) {
ARROW_ASSIGN_OR_RAISE(file_info, this->GetFileInfo(path.full_path));
if (file_info.type() != FileType::NotFound) {
return Status::OK();
}
}
// XXX Should we check that no non-directory entry exists?
// Minio does it for us, not sure about other S3 implementations.
return impl_->CreateEmptyDir(path.bucket, path.key);
}

Status S3FileSystem::DeleteDir(const std::string& s) {
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ struct ARROW_EXPORT S3Options {
/// Whether to allow deletion of buckets
bool allow_bucket_deletion = false;

/// Whether to allow pessimistic directory creation in CreateDir function
///
/// By default, CreateDir function will try to create the directory without checking its
/// existence. It's an optimization to try directory creation and catch the error,
/// rather than issue two dependent I/O calls.
/// Though for key/value storage like Google Cloud Storage, too many creation calls will
/// breach the rate limit for object mutation operations and cause serious consequences.
/// It's also possible you don't have creation access for the parent directory. Set it
/// to be true to address these scenarios.
bool check_directory_existence_before_creation = false;

/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
Expand Down
31 changes: 30 additions & 1 deletion cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -922,9 +922,13 @@ TEST_F(TestS3FS, CreateDir) {

// New "directory"
AssertFileInfo(fs_.get(), "bucket/newdir", FileType::NotFound);
ASSERT_OK(fs_->CreateDir("bucket/newdir"));
ASSERT_OK(fs_->CreateDir("bucket/newdir", /*recursive=*/false));
AssertFileInfo(fs_.get(), "bucket/newdir", FileType::Directory);

// By default CreateDir uses recursvie mode, make it explictly to be false
ASSERT_RAISES(IOError,
fs_->CreateDir("bucket/newdir/newsub/newsubsub", /*recursive=*/false));

// New "directory", recursive
ASSERT_OK(fs_->CreateDir("bucket/newdir/newsub/newsubsub", /*recursive=*/true));
AssertFileInfo(fs_.get(), "bucket/newdir/newsub", FileType::Directory);
Expand All @@ -939,6 +943,31 @@ TEST_F(TestS3FS, CreateDir) {
// Extraneous slashes
ASSERT_RAISES(Invalid, fs_->CreateDir("bucket//somedir"));
ASSERT_RAISES(Invalid, fs_->CreateDir("bucket/somedir//newdir"));

// check existing before creation
options_.check_directory_existence_before_creation = true;
MakeFileSystem();
// New "directory" again
AssertFileInfo(fs_.get(), "bucket/checknewdir", FileType::NotFound);
ASSERT_OK(fs_->CreateDir("bucket/checknewdir"));
AssertFileInfo(fs_.get(), "bucket/checknewdir", FileType::Directory);

ASSERT_RAISES(IOError, fs_->CreateDir("bucket/checknewdir/newsub/newsubsub/newsubsub/",
/*recursive=*/false));

// New "directory" again, recursive
ASSERT_OK(fs_->CreateDir("bucket/checknewdir/newsub/newsubsub", /*recursive=*/true));
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub", FileType::Directory);
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub/newsubsub", FileType::Directory);
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub/newsubsub/newsubsub",
FileType::NotFound);
// Try creation with the same name
ASSERT_OK(fs_->CreateDir("bucket/checknewdir/newsub/newsubsub/newsubsub/",
/*recursive=*/true));
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub", FileType::Directory);
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub/newsubsub", FileType::Directory);
AssertFileInfo(fs_.get(), "bucket/checknewdir/newsub/newsubsub/newsubsub",
FileType::Directory);
}

TEST_F(TestS3FS, DeleteFile) {
Expand Down

0 comments on commit a44b537

Please sign in to comment.