From 17133ecec88d754f0b6686ca12a1ab0c65a55cb3 Mon Sep 17 00:00:00 2001 From: tangruilin Date: Sat, 12 Nov 2022 16:46:03 +0800 Subject: [PATCH] [Issue #2025] local cache --- .gitignore | 3 +++ .../src/client/s3/client_s3_cache_manager.cpp | 12 ++++++++--- .../src/client/s3/disk_cache_manager_impl.cpp | 21 ++++++++++++++++++- .../src/client/s3/disk_cache_manager_impl.h | 4 +++- curvefs/src/client/s3/disk_cache_write.h | 2 +- 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index f5d35348bb..7e26997420 100755 --- a/.gitignore +++ b/.gitignore @@ -118,8 +118,11 @@ __not_found__ thirdparties/rocksdb/lib/ thirdparties/rocksdb/include/ thirdparties/rocksdb/rocksdb/ +thirdparties/rocksdb/*log thirdparties/rocksdb/*.tar.gz thirdparties/aws/*.tar.gz +thirdparties/etcdclient/tmp/ +thirdparties/etcdclient/*.h /external /bazel-* diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 509da89cab..04f9b0e097 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -2254,6 +2254,10 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { s3ClientAdaptor_->IsReadWriteCache() && !s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() && !toS3; + bool useReadCache = + s3ClientAdaptor_->IsReadCache() && + !s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() && + !toS3; while (tmpLen > 0) { if (blockPos + tmpLen > blockSize) { n = blockSize - blockPos; @@ -2288,10 +2292,12 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { ++iter) { VLOG(9) << "upload start: " << (*iter)->key << " len : " << (*iter)->bufferSize; - if (!useDiskCache) { - s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter); - } else { + if (useDiskCache) { s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter); + } else if (useReadCache) { + s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter, true); + } else { + s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter); } } cond.Wait(); diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.cpp b/curvefs/src/client/s3/disk_cache_manager_impl.cpp index aefdd796de..159e981df1 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.cpp +++ b/curvefs/src/client/s3/disk_cache_manager_impl.cpp @@ -56,13 +56,32 @@ int DiskCacheManagerImpl::Init(const S3ClientAdaptorOption option) { } void DiskCacheManagerImpl::Enqueue( - std::shared_ptr context) { + std::shared_ptr context, bool isReadWrite) { + if (isReadWrite) { + auto task = [this, context]() { + this->WriteReadDirectClosure(context); + }; + taskPool_.Enqueue(task); + return; + } auto task = [this, context]() { this->WriteClosure(context); }; taskPool_.Enqueue(task); } + + +int DiskCacheManagerImpl::WriteReadDirectClosure( + std::shared_ptr context) { + VLOG(9) << "WriteReadClosure start, name: " << context->key; + int ret = WriteReadDirect(context->key, context->buffer, context->bufferSize); + context->retCode = ret; + context->cb(context); + VLOG(9) << "WriteReadClosure end, name: " << context->key; + return 0; +} + int DiskCacheManagerImpl::WriteClosure( std::shared_ptr context) { VLOG(9) << "WriteClosure start, name: " << context->key; diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.h b/curvefs/src/client/s3/disk_cache_manager_impl.h index 7dc32b6ed8..a5b838ea71 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.h +++ b/curvefs/src/client/s3/disk_cache_manager_impl.h @@ -117,7 +117,7 @@ class DiskCacheManagerImpl { virtual int ClearReadCache(const std::list &files); - void Enqueue(std::shared_ptr context); + void Enqueue(std::shared_ptr context, bool isReadWrite = false); private: int WriteDiskFile(const std::string name, const char *buf, uint64_t length); @@ -128,6 +128,8 @@ class DiskCacheManagerImpl { std::shared_ptr client_; int WriteClosure(std::shared_ptr context); + + int WriteReadDirectClosure(std::shared_ptr context); // threads for disk cache uint32_t threads_; TaskThreadPool diff --git a/curvefs/src/client/s3/disk_cache_write.h b/curvefs/src/client/s3/disk_cache_write.h index 421504a419..54e13eeaa4 100644 --- a/curvefs/src/client/s3/disk_cache_write.h +++ b/curvefs/src/client/s3/disk_cache_write.h @@ -39,7 +39,6 @@ #include "curvefs/src/common/wrap_posix.h" #include "curvefs/src/common/utils.h" #include "curvefs/src/client/s3/client_s3.h" -#include "curvefs/src/client/s3/disk_cache_write.h" #include "curvefs/src/client/s3/disk_cache_read.h" #include "curvefs/src/client/common/config.h" #include "curvefs/src/client/s3/disk_cache_base.h" @@ -135,6 +134,7 @@ class DiskCacheWrite : public DiskCacheBase { } private: + using DiskCacheBase::Init; int AsyncUploadFunc(); void UploadFile(const std::list &toUpload, std::shared_ptr syncTask = nullptr);