Skip to content

Commit

Permalink
[Issue opencurve#2025] local cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Tangruilin committed Nov 12, 2022
1 parent 09566f5 commit 17133ec
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ __not_found__
thirdparties/rocksdb/lib/
thirdparties/rocksdb/include/
thirdparties/rocksdb/rocksdb/
thirdparties/rocksdb/*log
thirdparties/rocksdb/*.tar.gz
thirdparties/aws/*.tar.gz
thirdparties/etcdclient/tmp/
thirdparties/etcdclient/*.h

/external
/bazel-*
Expand Down
12 changes: 9 additions & 3 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,10 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
s3ClientAdaptor_->IsReadWriteCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;
bool useReadCache =
s3ClientAdaptor_->IsReadCache() &&
!s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() &&
!toS3;
while (tmpLen > 0) {
if (blockPos + tmpLen > blockSize) {
n = blockSize - blockPos;
Expand Down Expand Up @@ -2288,10 +2292,12 @@ CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) {
++iter) {
VLOG(9) << "upload start: " << (*iter)->key
<< " len : " << (*iter)->bufferSize;
if (!useDiskCache) {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
} else {
if (useDiskCache) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter);
} else if (useReadCache) {
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter, true);
} else {
s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter);
}
}
cond.Wait();
Expand Down
21 changes: 20 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,32 @@ int DiskCacheManagerImpl::Init(const S3ClientAdaptorOption option) {
}

void DiskCacheManagerImpl::Enqueue(
std::shared_ptr<PutObjectAsyncContext> context) {
std::shared_ptr<PutObjectAsyncContext> context, bool isReadWrite) {
if (isReadWrite) {
auto task = [this, context]() {
this->WriteReadDirectClosure(context);
};
taskPool_.Enqueue(task);
return;
}
auto task = [this, context]() {
this->WriteClosure(context);
};
taskPool_.Enqueue(task);
}



int DiskCacheManagerImpl::WriteReadDirectClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteReadClosure start, name: " << context->key;
int ret = WriteReadDirect(context->key, context->buffer, context->bufferSize);
context->retCode = ret;
context->cb(context);
VLOG(9) << "WriteReadClosure end, name: " << context->key;
return 0;
}

int DiskCacheManagerImpl::WriteClosure(
std::shared_ptr<PutObjectAsyncContext> context) {
VLOG(9) << "WriteClosure start, name: " << context->key;
Expand Down
4 changes: 3 additions & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class DiskCacheManagerImpl {

virtual int ClearReadCache(const std::list<std::string> &files);

void Enqueue(std::shared_ptr<PutObjectAsyncContext> context);
void Enqueue(std::shared_ptr<PutObjectAsyncContext> context, bool isReadWrite = false);

private:
int WriteDiskFile(const std::string name, const char *buf, uint64_t length);
Expand All @@ -128,6 +128,8 @@ class DiskCacheManagerImpl {
std::shared_ptr<S3Client> client_;

int WriteClosure(std::shared_ptr<PutObjectAsyncContext> context);

int WriteReadDirectClosure(std::shared_ptr<PutObjectAsyncContext> context);
// threads for disk cache
uint32_t threads_;
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include "curvefs/src/common/wrap_posix.h"
#include "curvefs/src/common/utils.h"
#include "curvefs/src/client/s3/client_s3.h"
#include "curvefs/src/client/s3/disk_cache_write.h"
#include "curvefs/src/client/s3/disk_cache_read.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/s3/disk_cache_base.h"
Expand Down Expand Up @@ -135,6 +134,7 @@ class DiskCacheWrite : public DiskCacheBase {
}

private:
using DiskCacheBase::Init;
int AsyncUploadFunc();
void UploadFile(const std::list<std::string> &toUpload,
std::shared_ptr<SynchronizationTask> syncTask = nullptr);
Expand Down

0 comments on commit 17133ec

Please sign in to comment.