Skip to content

Commit

Permalink
curvefs/client: optimize DiskCacheManager::UmountDiskCache()
Browse files Browse the repository at this point in the history
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
  • Loading branch information
Ziy1-Tan committed Mar 1, 2023
1 parent e64010b commit 0580a41
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
9 changes: 4 additions & 5 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,13 @@ bool DiskCacheManager::IsCached(const std::string name) {

int DiskCacheManager::UmountDiskCache() {
LOG(INFO) << "umount disk cache.";
int ret;
diskInitThread_.join();
ret = cacheWrite_->UploadAllCacheWriteFile();
if (ret < 0) {
LOG(ERROR) << "umount disk cache error.";
}
TrimStop();
cacheWrite_->AsyncUploadStop();
std::set<std::string> objs;
if (cacheWrite_->LoadAllCacheFile(&objs) == 0 && !objs.empty()) {
LOG(ERROR) << "umount disk cache error.";
}
LOG(INFO) << "umount disk cache end.";
return 0;
}
Expand Down
17 changes: 12 additions & 5 deletions curvefs/src/client/s3/disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void DiskCacheWrite::Init(std::shared_ptr<S3Client> client,
}

void DiskCacheWrite::AsyncUploadEnqueue(const std::string objName) {
std::lock_guard<bthread::Mutex> lk(mtx_);
std::lock_guard<std::mutex> lock(mtx_);
waitUpload_.push_back(objName);
}

Expand Down Expand Up @@ -139,7 +139,7 @@ int DiskCacheWrite::UploadFile(const std::string &name,
[&, buffer, syncTask, name]
(const std::shared_ptr<PutObjectAsyncContext> &context) {
if (context->retCode == 0) {
if (metric_.get() != nullptr) {
if (metric_ != nullptr) {
metric_->writeS3.bps.count << context->bufferSize;
metric_->writeS3.qps.count << 1;
metric_->writeS3.latency
Expand Down Expand Up @@ -186,7 +186,7 @@ bool DiskCacheWrite::WriteCacheValid() {

int DiskCacheWrite::GetUploadFile(const std::string &inode,
std::list<std::string> *toUpload) {
std::unique_lock<bthread::Mutex> lk(mtx_);
std::unique_lock<std::mutex> lk(mtx_);
if (waitUpload_.empty()) {
return 0;
}
Expand Down Expand Up @@ -283,6 +283,8 @@ int DiskCacheWrite::AsyncUploadFunc() {
}
toUpload.clear();
if (GetUploadFile("", &toUpload) <= 0) {
std::unique_lock<std::mutex> lock(mtx_);
cond_.notify_all();
continue;
}
VLOG(6) << "async upload file size = " << toUpload.size();
Expand All @@ -303,15 +305,20 @@ int DiskCacheWrite::AsyncUploadRun() {
}

int DiskCacheWrite::AsyncUploadStop() {
if (isRunning_.load()) {
std::unique_lock<std::mutex> lock(mtx_);
while (!waitUpload_.empty()) {
cond_.wait_for(lock, std::chrono::milliseconds(asyncLoadPeriodMs_));
}
}
if (isRunning_.exchange(false)) {
LOG(INFO) << "stop AsyncUpload thread...";
sleeper_.interrupt();
backEndThread_.join();
LOG(INFO) << "stop AsyncUpload thread ok.";
return -1;
} else {
LOG(INFO) << "AsyncUpload thread not running.";
}
LOG(INFO) << "AsyncUpload thread not running.";
return 0;
}

Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ class DiskCacheWrite : public DiskCacheBase {
curve::common::Thread backEndThread_;
curve::common::Atomic<bool> isRunning_;
std::list<std::string> waitUpload_;
bthread::Mutex mtx_;
std::mutex mtx_;
std::condition_variable cond_;
InterruptibleSleeper sleeper_;
uint64_t asyncLoadPeriodMs_;
std::shared_ptr<S3Client> client_;
Expand Down
12 changes: 0 additions & 12 deletions curvefs/test/client/test_disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ TEST_F(TestDiskCacheManager, TrimRun_1) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull()))
Expand Down Expand Up @@ -312,8 +310,6 @@ TEST_F(TestDiskCacheManager, TrimCache_2) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -349,8 +345,6 @@ TEST_F(TestDiskCacheManager, TrimCache_4) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -387,8 +381,6 @@ TEST_F(TestDiskCacheManager, TrimCache_5) {
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
Expand Down Expand Up @@ -428,8 +420,6 @@ TEST_F(TestDiskCacheManager, TrimCache_noexceed) {
.WillOnce(Return(0))
.WillOnce(Return(-1))
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
int ret = diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
Expand Down Expand Up @@ -473,8 +463,6 @@ TEST_F(TestDiskCacheManager, TrimCache_exceed) {
.Times(2)
.WillOnce(Return(-1))
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
Expand Down
11 changes: 8 additions & 3 deletions curvefs/test/client/test_disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,15 @@ TEST_F(TestDiskCacheWrite, AsyncUploadRun) {
sleep(1);
diskCacheWrite_->AsyncUploadEnqueue("test");
std::string t1 = "test";
curve::common::Thread backEndThread =
std::thread(&DiskCacheWrite::AsyncUploadEnqueue, diskCacheWrite_, t1);
std::vector<curve::common::Thread> threads;
for (int i = 0; i < 5; i++) {
threads.emplace_back(&DiskCacheWrite::AsyncUploadEnqueue,
diskCacheWrite_, t1);
}
diskCacheWrite_->AsyncUploadStop();
backEndThread.join();
for (auto &t : threads) {
t.join();
}
}

TEST_F(TestDiskCacheWrite, UploadFileByInode) {
Expand Down

0 comments on commit 0580a41

Please sign in to comment.