Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed Nov 15, 2022
1 parent fdd2804 commit c6e55af
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 22 deletions.
20 changes: 13 additions & 7 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2225,8 +2225,14 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
<< ", inodeId:" << inodeId
<< ",Len:" << tmpLen << ",blockPos:" << blockPos
<< ",blockIndex:" << blockIndex;

bool useReadCacheOnly =
s3ClientAdaptor_->IsReadCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;

PutObjectAsyncCallBack cb =
[&](const std::shared_ptr<PutObjectAsyncContext> &context) {
[&, useReadCacheOnly](const std::shared_ptr<PutObjectAsyncContext> &context) {
if (context->retCode == 0) {
if (s3ClientAdaptor_->s3Metric_.get() != nullptr) {
s3ClientAdaptor_->CollectMetrics(
Expand All @@ -2243,6 +2249,11 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
}
VLOG(9) << "PutObjectAsyncCallBack: " << context->key
<< " pendingReq is: " << pendingReq;
if (useReadCacheOnly) {
VLOG(9) << "Write to read cache, name: " << context->key;
s3ClientAdaptor_->GetDiskCacheManager()
->Enqueue(context, true);
}
return;
}
LOG(WARNING) << "Put object failed, key: " << context->key;
Expand All @@ -2254,10 +2265,7 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
s3ClientAdaptor_->IsReadWriteCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;
bool useReadCacheOnly =
s3ClientAdaptor_->IsReadCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;

while (tmpLen > 0) {
if (blockPos + tmpLen > blockSize) {
n = blockSize - blockPos;
Expand Down Expand Up @@ -2294,8 +2302,6 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
<< " len : " << (*iter)->bufferSize;
if (useReadWriteCache) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter);
} else if (useReadCacheOnly) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter, true);
} else {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
}
Expand Down
15 changes: 1 addition & 14 deletions curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,9 @@ void DiskCacheManagerImpl::Enqueue(
int DiskCacheManagerImpl::WriteReadDirectClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteReadClosure start, name: " << context->key;
// Upload to s3
int s3Ret = client_->Upload(context->key,
context->buffer, context->bufferSize);
if (s3Ret < 0) {
LOG(ERROR) << "Upload to s3 error";
context->retCode = s3Ret;
context->cb(context);
return s3Ret;
}

// Write to read cache
// Write to read cache, we don't care if the cache wirte success
int ret = WriteReadDirect(context->key,
context->buffer, context->bufferSize);
context->retCode = ret;
context->cb(context);
return ret;
VLOG(9) << "WriteReadClosure end, name: " << context->key;
return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion curvefs/test/client/test_disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include "curvefs/src/client/common/common.h"
#include "curvefs/test/client/mock_disk_cache_write.h"
#include "curvefs/test/client/mock_disk_cache_read.h"
#include "curvefs/test/client/mock_disk_cache_manager.h"
Expand Down Expand Up @@ -152,6 +153,7 @@ TEST_F(TestDiskCacheManagerImpl, WriteReadClosure) {

S3ClientAdaptorOption s3AdaptorOption;
s3AdaptorOption.diskCacheOpt.threads = 5;
s3AdaptorOption.diskCacheOpt.diskCacheType = DiskCacheType::OnlyRead;
EXPECT_CALL(*diskCacheManager_, Init(_, _)).WillOnce(Return(0));
diskCacheManagerImpl_->Init(s3AdaptorOption);
std::string fileName = "test";
Expand All @@ -160,7 +162,6 @@ TEST_F(TestDiskCacheManagerImpl, WriteReadClosure) {
// If the mode is read cache, will call WriteReadDirect
EXPECT_CALL(*diskCacheManager_, IsDiskUsedInited()).WillOnce(Return(true));
EXPECT_CALL(*diskCacheManager_, IsDiskCacheFull()).WillOnce(Return(false));
EXPECT_CALL(*client_, Upload(_, _, _)).WillOnce(Return(0));
EXPECT_CALL(*diskCacheManager_, WriteReadDirect(_, _, _))
.WillOnce(Return(context->bufferSize));
diskCacheManagerImpl_->Enqueue(context, true);
Expand Down

0 comments on commit c6e55af

Please sign in to comment.