diff --git a/.gitignore b/.gitignore index f5d35348bb..7e26997420 100755 --- a/.gitignore +++ b/.gitignore @@ -118,8 +118,11 @@ __not_found__ thirdparties/rocksdb/lib/ thirdparties/rocksdb/include/ thirdparties/rocksdb/rocksdb/ +thirdparties/rocksdb/*log thirdparties/rocksdb/*.tar.gz thirdparties/aws/*.tar.gz +thirdparties/etcdclient/tmp/ +thirdparties/etcdclient/*.h /external /bazel-* diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 509da89cab..7653e0ac2a 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -2197,6 +2197,11 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { curve::common::CountDownEvent cond(1); std::atomic pendingReq(0); FSStatusCode ret; + enum class cachePoily { + NCache, + RCache, + WRCache, + } cachePoily = cachePoily::NCache; VLOG(9) << "DataCache::Flush : now:" << now << ",createTime:" << createTime_ << ",flushIntervalSec:" << flushIntervalSec @@ -2225,8 +2230,22 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { << ", inodeId:" << inodeId << ",Len:" << tmpLen << ",blockPos:" << blockPos << ",blockIndex:" << blockIndex; + + const bool mayCache = s3ClientAdaptor_->HasDiskCache() && + !s3ClientAdaptor_->GetDiskCacheManager() + ->IsDiskCacheFull() && !toS3; + + if (s3ClientAdaptor_->IsReadCache() && mayCache) { + cachePoily = cachePoily::RCache; + } else if (s3ClientAdaptor_->IsReadWriteCache() && mayCache) { + cachePoily = cachePoily::WRCache; + } else { + cachePoily = cachePoily::NCache; + } + PutObjectAsyncCallBack cb = - [&](const std::shared_ptr &context) { + [&, cachePoily] + (const std::shared_ptr &context) { if (context->retCode == 0) { if (s3ClientAdaptor_->s3Metric_.get() != nullptr) { s3ClientAdaptor_->CollectMetrics( @@ -2243,6 +2262,11 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { } VLOG(9) << "PutObjectAsyncCallBack: " << context->key << " pendingReq is: " << pendingReq; + if (cachePoily::RCache == cachePoily) { + VLOG(9) << "Write to read cache, name: " << context->key; + s3ClientAdaptor_->GetDiskCacheManager() + ->Enqueue(context, true); + } return; } LOG(WARNING) << "Put object failed, key: " << context->key; @@ -2250,10 +2274,7 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { }; std::vector> uploadTasks; - bool useDiskCache = - s3ClientAdaptor_->IsReadWriteCache() && - !s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() && - !toS3; + while (tmpLen > 0) { if (blockPos + tmpLen > blockSize) { n = blockSize - blockPos; @@ -2288,10 +2309,10 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { ++iter) { VLOG(9) << "upload start: " << (*iter)->key << " len : " << (*iter)->bufferSize; - if (!useDiskCache) { - s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter); - } else { + if (cachePoily::WRCache == cachePoily) { s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter); + } else { + s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter); } } cond.Wait(); diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.cpp b/curvefs/src/client/s3/disk_cache_manager_impl.cpp index aefdd796de..d1bc3ec82d 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.cpp +++ b/curvefs/src/client/s3/disk_cache_manager_impl.cpp @@ -56,13 +56,32 @@ int DiskCacheManagerImpl::Init(const S3ClientAdaptorOption option) { } void DiskCacheManagerImpl::Enqueue( - std::shared_ptr context) { + std::shared_ptr context, bool isReadCacheOnly) { + if ( isReadCacheOnly ) { + auto task = [this, context]() { + this->WriteReadDirectClosure(context); + }; + taskPool_.Enqueue(task); + return; + } auto task = [this, context]() { this->WriteClosure(context); }; taskPool_.Enqueue(task); } + + +int DiskCacheManagerImpl::WriteReadDirectClosure( + std::shared_ptr context) { + VLOG(9) << "WriteReadClosure start, name: " << context->key; + // Write to read cache, we don't care if the cache wirte success + int ret = WriteReadDirect(context->key, + context->buffer, context->bufferSize); + VLOG(9) << "WriteReadClosure end, name: " << context->key; + return ret; +} + int DiskCacheManagerImpl::WriteClosure( std::shared_ptr context) { VLOG(9) << "WriteClosure start, name: " << context->key; diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.h b/curvefs/src/client/s3/disk_cache_manager_impl.h index 7dc32b6ed8..b7ee405540 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.h +++ b/curvefs/src/client/s3/disk_cache_manager_impl.h @@ -117,7 +117,8 @@ class DiskCacheManagerImpl { virtual int ClearReadCache(const std::list &files); - void Enqueue(std::shared_ptr context); + void Enqueue(std::shared_ptr context, + bool isReadCacheOnly = false); private: int WriteDiskFile(const std::string name, const char *buf, uint64_t length); @@ -128,6 +129,8 @@ class DiskCacheManagerImpl { std::shared_ptr client_; int WriteClosure(std::shared_ptr context); + + int WriteReadDirectClosure(std::shared_ptr context); // threads for disk cache uint32_t threads_; TaskThreadPool diff --git a/curvefs/src/client/s3/disk_cache_write.h b/curvefs/src/client/s3/disk_cache_write.h index 421504a419..54e13eeaa4 100644 --- a/curvefs/src/client/s3/disk_cache_write.h +++ b/curvefs/src/client/s3/disk_cache_write.h @@ -39,7 +39,6 @@ #include "curvefs/src/common/wrap_posix.h" #include "curvefs/src/common/utils.h" #include "curvefs/src/client/s3/client_s3.h" -#include "curvefs/src/client/s3/disk_cache_write.h" #include "curvefs/src/client/s3/disk_cache_read.h" #include "curvefs/src/client/common/config.h" #include "curvefs/src/client/s3/disk_cache_base.h" @@ -135,6 +134,7 @@ class DiskCacheWrite : public DiskCacheBase { } private: + using DiskCacheBase::Init; int AsyncUploadFunc(); void UploadFile(const std::list &toUpload, std::shared_ptr syncTask = nullptr); diff --git a/curvefs/test/client/test_disk_cache_manager_impl.cpp b/curvefs/test/client/test_disk_cache_manager_impl.cpp index 52f27fcf2f..2c16c14cca 100644 --- a/curvefs/test/client/test_disk_cache_manager_impl.cpp +++ b/curvefs/test/client/test_disk_cache_manager_impl.cpp @@ -23,6 +23,7 @@ #include #include +#include "curvefs/src/client/common/common.h" #include "curvefs/test/client/mock_disk_cache_write.h" #include "curvefs/test/client/mock_disk_cache_read.h" #include "curvefs/test/client/mock_disk_cache_manager.h" @@ -138,6 +139,35 @@ TEST_F(TestDiskCacheManagerImpl, WriteClosure) { sleep(5); } +TEST_F(TestDiskCacheManagerImpl, WriteReadClosure) { + PutObjectAsyncCallBack cb = + [&](const std::shared_ptr &context) { + }; + auto context = std::make_shared(); + context->key = "objectName"; + char data[5] = "gggg"; + context->buffer = data + 0; + context->bufferSize = 2; + context->cb = cb; + context->startTime = butil::cpuwide_time_us(); + + S3ClientAdaptorOption s3AdaptorOption; + s3AdaptorOption.diskCacheOpt.threads = 5; + s3AdaptorOption.diskCacheOpt.diskCacheType = DiskCacheType::OnlyRead; + EXPECT_CALL(*diskCacheManager_, Init(_, _)).WillOnce(Return(0)); + diskCacheManagerImpl_->Init(s3AdaptorOption); + std::string fileName = "test"; + std::string buf = "test"; + + // If the mode is read cache, will call WriteReadDirect + EXPECT_CALL(*diskCacheManager_, IsDiskUsedInited()).WillOnce(Return(true)); + EXPECT_CALL(*diskCacheManager_, IsDiskCacheFull()).WillOnce(Return(false)); + EXPECT_CALL(*diskCacheManager_, WriteReadDirect(_, _, _)) + .WillOnce(Return(context->bufferSize)); + diskCacheManagerImpl_->Enqueue(context, true); + sleep(5); +} + TEST_F(TestDiskCacheManagerImpl, Write) { std::string fileName = "test"; std::string buf = "test";