Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix]curvefs: enableSumInDir in multi mount situation #2491

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ build --cxxopt -Wno-error=format-security
build:gcc7-later --cxxopt -faligned-new
build --incompatible_blacklisted_protos_requires_proto_info=false
build --copt=-fdiagnostics-color=always
run --copt=-fdiagnostics-color=always
run --copt=-fdiagnostics-color=always
1 change: 1 addition & 0 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ message RefreshSessionRequest {
message RefreshSessionResponse {
required FSStatusCode statusCode = 1;
repeated topology.PartitionTxId latestTxIdList = 2;
optional bool enableSumInDir = 3;
}

message DLockValue {
Expand Down
29 changes: 16 additions & 13 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
auto channelManager = std::make_shared<ChannelManager<MetaserverID>>();

leaseExecutor_ = absl::make_unique<LeaseExecutor>(option.leaseOpt,
metaCache, mdsClient_);
metaCache, mdsClient_,
&enableSumInDir_);

xattrManager_ = std::make_shared<XattrManager>(inodeManager_,
dentryManager_, option_.listDentryLimit, option_.listDentryThreads);
Expand Down Expand Up @@ -360,7 +361,7 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
inodeWrapper->MarkDirty();
}

if (enableSumInDir_ && length != 0) {
if (enableSumInDir_.load() && length != 0) {
// update parent summary info
const Inode *inode = inodeWrapper->GetInodeLocked();
XAttr xattr;
Expand Down Expand Up @@ -495,7 +496,7 @@ CURVEFS_ERROR FuseClient::MakeNode(fuse_req_t req, fuse_ino_t parent,
<< ", parent = " << parent << ", name = " << name
<< ", mode = " << mode;

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -568,7 +569,7 @@ CURVEFS_ERROR FuseClient::DeleteNode(uint64_t ino, fuse_ino_t parent,
<< ", parent = " << parent << ", name = " << name;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -668,7 +669,7 @@ CURVEFS_ERROR FuseClient::CreateManageNode(fuse_req_t req, uint64_t parent,
<< ", parent = " << parent << ", name = " << name
<< ", mode = " << mode;

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1025,7 +1026,7 @@ CURVEFS_ERROR FuseClient::FuseOpRename(fuse_req_t req, fuse_ino_t parent,
renameOp.UpdateInodeCtime();
renameOp.UpdateCache();

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
xattrManager_->UpdateParentXattrAfterRename(
parent, newparent, newname, &renameOp);
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
inodeWrapper->GetInodeAttrLocked(&inodeAttr);
InodeAttr2ParamAttr(inodeAttr, attrOut);

if (enableSumInDir_ && changeSize != 0) {
if (enableSumInDir_.load() && changeSize != 0) {
// update parent summary info
const Inode* inode = inodeWrapper->GetInodeLocked();
XAttr xattr;
Expand Down Expand Up @@ -1164,7 +1165,8 @@ CURVEFS_ERROR FuseClient::FuseOpGetXattr(fuse_req_t req, fuse_ino_t ino,
return ret;
}

ret = xattrManager_->GetXattr(name, value, &inodeAttr, enableSumInDir_);
ret = xattrManager_->GetXattr(name, value,
&inodeAttr, enableSumInDir_.load());
if (CURVEFS_ERROR::OK != ret) {
LOG(ERROR) << "xattrManager get xattr failed, name = " << name;
return ret;
Expand Down Expand Up @@ -1334,7 +1336,7 @@ CURVEFS_ERROR FuseClient::FuseOpSymlink(fuse_req_t req, const char *link,
return ret;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1405,7 +1407,7 @@ CURVEFS_ERROR FuseClient::FuseOpLink(fuse_req_t req, fuse_ino_t ino,
return ret;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1488,14 +1490,15 @@ FuseClient::SetMountStatus(const struct MountOption *mountOption) {
}
inodeManager_->SetFsId(fsInfo_->fsid());
dentryManager_->SetFsId(fsInfo_->fsid());
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
enableSumInDir_.store(fsInfo_->enablesumindir());
if (fsInfo_->has_recycletimehour()) {
enableSumInDir_ = enableSumInDir_ && (fsInfo_->recycletimehour() == 0);
enableSumInDir_.store(enableSumInDir_.load() &&
(fsInfo_->recycletimehour() == 0));
}

LOG(INFO) << "Mount " << fsName << " on " << mountpoint_.ShortDebugString()
<< " success!"
<< " enableSumInDir = " << enableSumInDir_;
<< " enableSumInDir = " << enableSumInDir_.load();

fsMetric_ = std::make_shared<FSMetric>(fsName);

Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <list>
#include <utility>
#include <vector>
#include <atomic>

#include "curvefs/proto/common.pb.h"
#include "curvefs/proto/mds.pb.h"
Expand Down Expand Up @@ -394,7 +395,7 @@ class FuseClient {
bool init_;

// enable record summary info in dir inode xattr
bool enableSumInDir_;
std::atomic<bool> enableSumInDir_;

std::shared_ptr<FSMetric> fsMetric_;

Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/client/lease/lease_excutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ LeaseExecutor::~LeaseExecutor() {
bool LeaseExecutor::Start() {
if (opt_.leaseTimeUs <= 0 || opt_.refreshTimesPerLease <= 0) {
LOG(ERROR) << "LeaseExecutor start fail. Invalid param in leaseopt, "
"leasTimesUs = "
"leaseTimeUs = "
<< opt_.leaseTimeUs
<< ", refreshTimePerLease = " << opt_.refreshTimesPerLease;
return false;
Expand Down Expand Up @@ -78,7 +78,8 @@ bool LeaseExecutor::RefreshLease() {
// refresh from mds
std::vector<PartitionTxId> latestTxIdList;
FSStatusCode ret = mdsCli_->RefreshSession(txIds, &latestTxIdList,
fsName_, mountpoint_);
fsName_, mountpoint_,
enableSumInDir_);
if (ret != FSStatusCode::OK) {
LOG(ERROR) << "LeaseExecutor refresh session fail, ret = " << ret
<< ", errorName = " << FSStatusCode_Name(ret);
Expand Down
8 changes: 6 additions & 2 deletions curvefs/src/client/lease/lease_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <memory>
#include <string>
#include <atomic>

#include "curvefs/src/client/rpcclient/metacache.h"
#include "curvefs/src/client/rpcclient/mds_client.h"
Expand All @@ -44,8 +45,10 @@ namespace client {
class LeaseExecutor : public LeaseExecutorBase {
public:
LeaseExecutor(const LeaseOpt &opt, std::shared_ptr<MetaCache> metaCache,
std::shared_ptr<MdsClient> mdsCli)
: opt_(opt), metaCache_(metaCache), mdsCli_(mdsCli) {}
std::shared_ptr<MdsClient> mdsCli,
std::atomic<bool>* enableSumInDir)
: opt_(opt), metaCache_(metaCache), mdsCli_(mdsCli),
enableSumInDir_(enableSumInDir) {}

~LeaseExecutor();

Expand Down Expand Up @@ -73,6 +76,7 @@ class LeaseExecutor : public LeaseExecutorBase {
std::unique_ptr<RefreshSessionTask> task_;
std::string fsName_;
Mountpoint mountpoint_;
std::atomic<bool>* enableSumInDir_;
};

} // namespace client
Expand Down
8 changes: 7 additions & 1 deletion curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ FSStatusCode
MdsClientImpl::RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) {
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
Expand Down Expand Up @@ -532,6 +533,11 @@ MdsClientImpl::RefreshSession(const std::vector<PartitionTxId> &txIds,
LOG(INFO) << "RefreshSession need update partition txid list: "
<< response.DebugString();
}
if (enableSumInDir->load() && !response.enablesumindir()) {
enableSumInDir->store(response.enablesumindir());
LOG(INFO) << "update enableSumInDir to "
<< response.enablesumindir();
}

return ret;
};
Expand Down
7 changes: 5 additions & 2 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <string>
#include <vector>
#include <atomic>

#include "curvefs/proto/mds.pb.h"
#include "curvefs/proto/topology.pb.h"
Expand Down Expand Up @@ -115,7 +116,8 @@ class MdsClient {
RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) = 0;
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) = 0;

virtual FSStatusCode GetLatestTxId(uint32_t fsId,
std::vector<PartitionTxId>* txIds) = 0;
Expand Down Expand Up @@ -202,7 +204,8 @@ class MdsClientImpl : public MdsClient {
FSStatusCode RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) override;
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) override;

FSStatusCode GetLatestTxId(uint32_t fsId,
std::vector<PartitionTxId>* txIds) override;
Expand Down
22 changes: 8 additions & 14 deletions curvefs/src/client/xattr_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,14 @@ CURVEFS_ERROR XattrManager::CalOneLayerSumInfo(InodeAttr *attr) {
summaryInfo.entries++;
summaryInfo.fbytes += it.length();
}
if (!(AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRFILES)->second),
summaryInfo.files, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRSUBDIRS)->second),
summaryInfo.subdirs, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRENTRIES)->second),
summaryInfo.entries, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRFBYTES)->second),
summaryInfo.fbytes + attr->length(), true))) {
ret = CURVEFS_ERROR::INTERNAL;
}
(*attr->mutable_xattr())[XATTRFILES] =
std::to_string(summaryInfo.files);
(*attr->mutable_xattr())[XATTRSUBDIRS] =
std::to_string(summaryInfo.subdirs);
(*attr->mutable_xattr())[XATTRENTRIES] =
std::to_string(summaryInfo.entries);
(*attr->mutable_xattr())[XATTRFBYTES] =
std::to_string(summaryInfo.fbytes + attr->length());
}
return ret;
}
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/mds/fs_info_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ void FsInfoWrapper::AddMountPoint(const Mountpoint& mp) {
*p = mp;

fsInfo_.set_mountnum(fsInfo_.mountnum() + 1);

if (fsInfo_.enablesumindir() && fsInfo_.mountnum() > 1) {
fsInfo_.set_enablesumindir(false);
}
}

FSStatusCode FsInfoWrapper::DeleteMountPoint(const Mountpoint& mp) {
Expand Down
10 changes: 10 additions & 0 deletions curvefs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,16 @@ void FsManager::RefreshSession(const RefreshSessionRequest* request,

// update this client's alive time
UpdateClientAliveTime(request->mountpoint(), request->fsname());
FsInfoWrapper wrapper;
FSStatusCode ret = fsStorage_->Get(request->fsname(), &wrapper);
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "GetFsInfo fail, get fs fail, fsName = "
<< request->fsname()
<< ", errCode = " << FSStatusCode_Name(ret);
return;
}

response->set_enablesumindir(wrapper.ProtoFsInfo().enablesumindir());
}

FSStatusCode FsManager::ReloadMountedFsVolumeSpace() {
Expand Down
14 changes: 10 additions & 4 deletions curvefs/test/client/lease/lease_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ TEST_F(LeaseExecutorTest, test_start) {
{
LOG(INFO) << "### case1: invalid lease time ###";
opt_.leaseTimeUs = 0;
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_FALSE(exec.Start());
}

{
LOG(INFO) << "### case2: invalid refresh times per lease ###";
opt_.refreshTimesPerLease = 0;
opt_.leaseTimeUs = 20;
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_FALSE(exec.Start());
}
}
Expand All @@ -85,15 +89,17 @@ TEST_F(LeaseExecutorTest, test_start_stop) {
EXPECT_CALL(*metaCache_, GetAllTxIds(_))
.WillOnce(SetArgPointee<0>(std::vector<PartitionTxId>{}))
.WillRepeatedly(SetArgPointee<0>(txIds));
EXPECT_CALL(*mdsCli_, RefreshSession(_, _, _, _))
EXPECT_CALL(*mdsCli_, RefreshSession(_, _, _, _, _))
.WillOnce(Return(FSStatusCode::UNKNOWN_ERROR))
.WillRepeatedly(
DoAll(SetArgPointee<1>(txIds), Return(FSStatusCode::OK)));
EXPECT_CALL(*metaCache_, SetTxId(1, 2))
.Times(AtLeast(1));

// lease executor start
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_TRUE(exec.Start());

std::this_thread::sleep_for(std::chrono::milliseconds(200));
Expand Down
10 changes: 6 additions & 4 deletions curvefs/test/client/rpcclient/mds_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <atomic>

#include "curvefs/src/client/rpcclient/mds_client.h"
#include "curvefs/test/client/rpcclient/mock_mds_base_client.h"
Expand Down Expand Up @@ -837,7 +838,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {

// out
std::vector<PartitionTxId> out;

std::atomic<bool>* enableSumInDir = new std::atomic<bool> (true);
RefreshSessionResponse response;

{
Expand All @@ -846,7 +847,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
ASSERT_FALSE(mdsclient_.RefreshSession(txIds, &out,
fsName, mountpoint));
fsName, mountpoint, enableSumInDir));
ASSERT_TRUE(out.empty());
}

Expand All @@ -857,7 +858,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
ASSERT_FALSE(mdsclient_.RefreshSession(txIds, &out,
fsName, mountpoint));
fsName, mountpoint, enableSumInDir));
ASSERT_EQ(1, out.size());
ASSERT_TRUE(
google::protobuf::util::MessageDifferencer::Equals(out[0], tmp))
Expand All @@ -873,7 +874,8 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillRepeatedly(Invoke(RefreshSessionRpcFailed));
ASSERT_EQ(FSStatusCode::RPC_ERROR,
mdsclient_.RefreshSession(txIds, &out, fsName, mountpoint));
mdsclient_.RefreshSession(txIds, &out, fsName, mountpoint,
enableSumInDir));
}
}

Expand Down
Loading