Skip to content

Commit

Permalink
[Issue opencurve#2025] local cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Tangruilin committed Nov 14, 2022
1 parent 09566f5 commit e1ae703
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ __not_found__
thirdparties/rocksdb/lib/
thirdparties/rocksdb/include/
thirdparties/rocksdb/rocksdb/
thirdparties/rocksdb/*log
thirdparties/rocksdb/*.tar.gz
thirdparties/aws/*.tar.gz
thirdparties/etcdclient/tmp/
thirdparties/etcdclient/*.h

/external
/bazel-*
Expand Down
14 changes: 10 additions & 4 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2250,10 +2250,14 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
};

std::vector<std::shared_ptr<PutObjectAsyncContext>> uploadTasks;
bool useDiskCache =
bool useReadWriteCache =
s3ClientAdaptor_->IsReadWriteCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;
bool useReadCacheOnly =
s3ClientAdaptor_->IsReadCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;
while (tmpLen > 0) {
if (blockPos + tmpLen > blockSize) {
n = blockSize - blockPos;
Expand Down Expand Up @@ -2288,10 +2292,12 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
++iter) {
VLOG(9) << "upload start: " << (*iter)->key
<< " len : " << (*iter)->bufferSize;
if (!useDiskCache) {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
} else {
if (useReadWriteCache) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter);
} else if (useReadCacheOnly) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter, true);
} else {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
}
}
cond.Wait();
Expand Down
34 changes: 33 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,45 @@ int DiskCacheManagerImpl::Init(const S3ClientAdaptorOption option) {
}

void DiskCacheManagerImpl::Enqueue(
std::shared_ptr<PutObjectAsyncContext> context) {
std::shared_ptr<PutObjectAsyncContext> context, bool isReadCacheOnly) {
if ( isReadCacheOnly ) {
auto task = [this, context]() {
this->WriteReadDirectClosure(context);
};
taskPool_.Enqueue(task);
return;
}
auto task = [this, context]() {
this->WriteClosure(context);
};
taskPool_.Enqueue(task);
}



int DiskCacheManagerImpl::WriteReadDirectClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteReadClosure start, name: " << context->key;
// Upload to s3
int s3Ret = client_->Upload(context->key,
context->buffer, context->bufferSize);
if (s3Ret < 0) {
LOG(ERROR) << "Upload to s3 error";
context->retCode = s3Ret;
context->cb(context);
return s3Ret;
}

// Write to read cache
int ret = WriteReadDirect(context->key,
context->buffer, context->bufferSize);
context->retCode = ret;
context->cb(context);
return ret;
VLOG(9) << "WriteReadClosure end, name: " << context->key;
return ret;
}

int DiskCacheManagerImpl::WriteClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteClosure start, name: " << context->key;
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class DiskCacheManagerImpl {

virtual int ClearReadCache(const std::list<std::string> &files);

void Enqueue(std::shared_ptr<PutObjectAsyncContext> context);
void Enqueue(std::shared_ptr<PutObjectAsyncContext> context,
bool isReadCacheOnly = false);

private:
int WriteDiskFile(const std::string name, const char *buf, uint64_t length);
Expand All @@ -128,6 +129,8 @@ class DiskCacheManagerImpl {
std::shared_ptr<S3Client> client_;

int WriteClosure(std::shared_ptr<PutObjectAsyncContext> context);

int WriteReadDirectClosure(std::shared_ptr<PutObjectAsyncContext> context);
// threads for disk cache
uint32_t threads_;
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/common/utils.h"
#include "curvefs/src/client/s3/client_s3.h"
#include "curvefs/src/client/s3/disk_cache_write.h"
#include "curvefs/src/client/s3/disk_cache_read.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/s3/disk_cache_base.h"
Expand Down Expand Up @@ -135,6 +134,7 @@ class DiskCacheWrite : public DiskCacheBase {
}

private:
using DiskCacheBase::Init;
int AsyncUploadFunc();
void UploadFile(const std::list<std::string> &toUpload,
std::shared_ptr<SynchronizationTask> syncTask = nullptr);
Expand Down
29 changes: 29 additions & 0 deletions curvefs/test/client/test_disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,35 @@ TEST_F(TestDiskCacheManagerImpl, WriteClosure) {
sleep(5);
}

TEST_F(TestDiskCacheManagerImpl, WriteReadClosure) {
PutObjectAsyncCallBack cb =
[&](const std::shared_ptr<PutObjectAsyncContext> &context) {
};
auto context = std::make_shared<PutObjectAsyncContext>();
context->key = "objectName";
char data[5] = "gggg";
context->buffer = data + 0;
context->bufferSize = 2;
context->cb = cb;
context->startTime = butil::cpuwide_time_us();

S3ClientAdaptorOption s3AdaptorOption;
s3AdaptorOption.diskCacheOpt.threads = 5;
EXPECT_CALL(*diskCacheManager_, Init(_, _)).WillOnce(Return(0));
diskCacheManagerImpl_->Init(s3AdaptorOption);
std::string fileName = "test";
std::string buf = "test";

// If the mode is read cache, will call WriteReadDirect
EXPECT_CALL(*diskCacheManager_, IsDiskUsedInited()).WillOnce(Return(true));
EXPECT_CALL(*diskCacheManager_, IsDiskCacheFull()).WillOnce(Return(false));
EXPECT_CALL(*client_, Upload(_, _, _)).WillOnce(Return(0));
EXPECT_CALL(*diskCacheManager_, WriteReadDirect(_, _, _))
.WillOnce(Return(context->bufferSize));
diskCacheManagerImpl_->Enqueue(context, true);
sleep(5);
}

TEST_F(TestDiskCacheManagerImpl, Write) {
std::string fileName = "test";
std::string buf = "test";
Expand Down

0 comments on commit e1ae703

Please sign in to comment.