fix crash raftLock, enable balance by experimental (vesoft-inc#4088)
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
critical27 and Sophie-Xie authored Mar 30, 2022
1 parent d2da606 commit 28c24fe
Showing 10 changed files with 108 additions and 79 deletions.
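The core of the raftLock crash fix runs through the hunks below: the old code called raftLock_.try_lock() inside addLearner / commitTransLeader / commitRemovePeer and unlocked only if that "succeeded". On a plain std::mutex (which raftLock_ is, given the std::lock_guard usage in the new code), try_lock is undefined behaviour when the calling thread already owns the mutex, and it silently proceeds without the lock when another thread owns it. The commit replaces that with an explicit needLock parameter threaded down from commitLogs. A minimal sketch of the new pattern follows; PartSketch and doCommitOp are hypothetical names, not part of the nebula codebase.

#include <mutex>

class PartSketch {
 public:
  // needLock == true  : the caller does NOT hold raftLock_, acquire it here (leader path).
  // needLock == false : the caller already holds raftLock_, do not lock again (follower path).
  void doCommitOp(bool needLock) {
    auto op = [&] {
      // ... mutate role / membership, state that raftLock_ protects ...
    };
    if (needLock) {
      std::lock_guard<std::mutex> guard(raftLock_);
      op();
    } else {
      op();
    }
  }

 private:
  std::mutex raftLock_;
};

// The replaced pattern looked roughly like:
//   bool acquiredHere = raftLock_.try_lock();
//   ... work ...
//   if (acquiredHere) raftLock_.unlock();
// which is undefined behaviour if the calling thread already owns raftLock_, and runs
// the work without any lock at all if another thread happens to own it.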
4 changes: 3 additions & 1 deletion src/graph/validator/AdminJobValidator.cpp
@@ -13,7 +13,9 @@ namespace graph {
Status AdminJobValidator::validateImpl() {
if (sentence_->getJobType() == meta::cpp2::JobType::DATA_BALANCE ||
sentence_->getJobType() == meta::cpp2::JobType::ZONE_BALANCE) {
return Status::SemanticError("Data balance not support");
if (!FLAGS_enable_experimental_feature) {
return Status::SemanticError("Data balance not support");
}
}

// Note: The last parameter of paras is no longer spacename
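The validator now merely consults FLAGS_enable_experimental_feature; this hunk only shows its use, and the flag itself is defined elsewhere in the repository. A hedged sketch of how such a gflags gate is typically wired, with an illustrative DEFINE_bool whose default and help text may differ from the real definition:

#include <gflags/gflags.h>

// Hypothetical stand-in for the real flag definition, which lives in another translation unit.
DEFINE_bool(enable_experimental_feature, false, "illustrative default only");

// Sketch of the gate: DATA_BALANCE / ZONE_BALANCE jobs are rejected unless the switch is on.
bool isBalanceJobAllowed() {
  return FLAGS_enable_experimental_feature;
}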
2 changes: 1 addition & 1 deletion src/kvstore/Listener.cpp
Expand Up @@ -129,7 +129,7 @@ void Listener::cleanWal() {
}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Listener::commitLogs(
std::unique_ptr<LogIterator> iter, bool) {
std::unique_ptr<LogIterator> iter, bool, bool) {
LogID lastId = kNoCommitLogId;
TermID lastTerm = kNoCommitLogTerm;
while (iter->valid()) {
1 change: 1 addition & 0 deletions src/kvstore/Listener.h
@@ -254,6 +254,7 @@ class Listener : public raftex::RaftPart {
* @return std::tuple<SUCCEED, last log id, last log term>
*/
std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> commitLogs(std::unique_ptr<LogIterator>,
bool,
bool) override;

/**
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
@@ -189,7 +189,7 @@ void NebulaStore::loadPartFromDataPath() {
}

if (raftPeer.status == Peer::Status::kLearner) {
part->addLearner(addr);
part->addLearner(addr, true);
}
}
}
8 changes: 4 additions & 4 deletions src/kvstore/Part.cpp
@@ -211,7 +211,7 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) {
}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
std::unique_ptr<LogIterator> iter, bool wait) {
std::unique_ptr<LogIterator> iter, bool wait, bool needLock) {
SCOPED_TIMER(&execTime_);
auto batch = engine_->startBatchWrite();
LogID lastId = kNoCommitLogId;
@@ -309,12 +309,12 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
commitTransLeader(newLeader);
commitTransLeader(newLeader, needLock);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
commitRemovePeer(peer);
commitRemovePeer(peer, needLock);
break;
}
default: {
@@ -395,7 +395,7 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
case OP_ADD_LEARNER: {
auto learner = decodeHost(OP_ADD_LEARNER, log);
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
addLearner(learner, false);
// persist the part learner info in case of storaged restarting
engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
break;
6 changes: 5 additions & 1 deletion src/kvstore/Part.h
@@ -252,11 +252,15 @@ class Part : public raftex::RaftPart {
*
* @param iter Wal log iterator
* @param wait Whether we should wait until all data is applied to the state machine
* @param needLock Whether we need to acquire raftLock_ before operations. When raftLock_ has
* already been acquired before commitLogs is invoked, needLock is false (e.g. commitLogs by a
* follower). If the lock has not been acquired, needLock is true (e.g. commitLogs by the leader).
* @return std::tuple<nebula::cpp2::ErrorCode, LogID, TermID>
*
*/
std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> commitLogs(std::unique_ptr<LogIterator> iter,
bool wait) override;
bool wait,
bool needLock) override;

/**
* @brief Some special log need to be pre-processed when appending to wal
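The needLock contract documented above boils down to two caller patterns: a follower already holds raftLock_ when it commits, while the leader commits without it and lets the special operations lock on demand. The sketch below is a hypothetical illustration of that contract; RaftSketch, specialOp, commitLogsSketch, leaderPath, and followerPath are made-up names, not the real Part/RaftPart interfaces.

#include <mutex>

struct RaftSketch {
  std::mutex raftLock_;

  // Stands in for commitTransLeader / commitRemovePeer: the only commit-time
  // steps that actually need raftLock_.
  void specialOp(bool needLock) {
    auto work = [&] { /* change role or membership */ };
    if (needLock) {
      std::lock_guard<std::mutex> guard(raftLock_);
      work();
    } else {
      work();  // caller already holds raftLock_
    }
  }

  void commitLogsSketch(bool wait, bool needLock) {
    (void)wait;           // leader waits for the state machine, follower does not
    specialOp(needLock);  // forward the caller's locking state
  }

  void leaderPath() {
    // Leader commits without holding raftLock_, so heartbeats are not blocked.
    commitLogsSketch(/*wait=*/true, /*needLock=*/true);
  }

  void followerPath() {
    std::lock_guard<std::mutex> guard(raftLock_);  // held while handling AppendLog
    commitLogsSketch(/*wait=*/false, /*needLock=*/false);
  }
};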
138 changes: 76 additions & 62 deletions src/kvstore/raftex/RaftPart.cpp
@@ -383,28 +383,27 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void RaftPart::addLearner(const HostAddr& addr) {
bool acquiredHere = raftLock_.try_lock(); // because addLearner may be called in the
// NebulaStore, in which we could not access raftLock_
if (addr == addr_) {
if (acquiredHere) {
raftLock_.unlock();
void RaftPart::addLearner(const HostAddr& addr, bool needLock) {
auto addLearner = [&] {
if (addr == addr_) {
VLOG(1) << idStr_ << "I am learner!";
return;
}
VLOG(1) << idStr_ << "I am learner!";
return;
}
auto it = std::find_if(
hosts_.begin(), hosts_.end(), [&addr](const auto& h) { return h->address() == addr; });
if (it == hosts_.end()) {
hosts_.emplace_back(std::make_shared<Host>(addr, shared_from_this(), true));
VLOG(1) << idStr_ << "Add learner " << addr;
auto it = std::find_if(
hosts_.begin(), hosts_.end(), [&addr](const auto& h) { return h->address() == addr; });
if (it == hosts_.end()) {
hosts_.emplace_back(std::make_shared<Host>(addr, shared_from_this(), true));
VLOG(1) << idStr_ << "Add learner " << addr;
} else {
VLOG(1) << idStr_ << "The host " << addr << " has been existed as "
<< ((*it)->isLearner() ? " learner " : " group member");
}
};
if (needLock) {
std::lock_guard<std::mutex> guard(raftLock_);
addLearner();
} else {
VLOG(1) << idStr_ << "The host " << addr << " has been existed as "
<< ((*it)->isLearner() ? " learner " : " group member");
}

if (acquiredHere) {
raftLock_.unlock();
addLearner();
}
}

@@ -437,40 +436,44 @@ void RaftPart::preProcessTransLeader(const HostAddr& target) {
}
}

void RaftPart::commitTransLeader(const HostAddr& target) {
bool needToUnlock = raftLock_.try_lock();
VLOG(1) << idStr_ << "Commit transfer leader to " << target;
switch (role_) {
case Role::LEADER: {
if (target != addr_ && !hosts_.empty()) {
auto iter = std::find_if(
hosts_.begin(), hosts_.end(), [](const auto& h) { return !h->isLearner(); });
if (iter != hosts_.end()) {
lastMsgRecvDur_.reset();
role_ = Role::FOLLOWER;
leader_ = HostAddr("", 0);
for (auto& host : hosts_) {
host->pause();
void RaftPart::commitTransLeader(const HostAddr& target, bool needLock) {
auto transfer = [&] {
VLOG(1) << idStr_ << "Commit transfer leader to " << target;
switch (role_) {
case Role::LEADER: {
if (target != addr_ && !hosts_.empty()) {
auto iter = std::find_if(
hosts_.begin(), hosts_.end(), [](const auto& h) { return !h->isLearner(); });
if (iter != hosts_.end()) {
lastMsgRecvDur_.reset();
role_ = Role::FOLLOWER;
leader_ = HostAddr("", 0);
for (auto& host : hosts_) {
host->pause();
}
VLOG(1) << idStr_ << "Give up my leadership!";
}
VLOG(1) << idStr_ << "Give up my leadership!";
} else {
VLOG(1) << idStr_ << "I am already the leader!";
}
} else {
VLOG(1) << idStr_ << "I am already the leader!";
break;
}
case Role::FOLLOWER:
case Role::CANDIDATE: {
VLOG(1) << idStr_ << "I am " << roleStr(role_) << ", just wait for the new leader!";
break;
}
case Role::LEARNER: {
VLOG(1) << idStr_ << "I am learner, not in the raft group, skip the log";
break;
}
break;
}
case Role::FOLLOWER:
case Role::CANDIDATE: {
VLOG(1) << idStr_ << "I am " << roleStr(role_) << ", just wait for the new leader!";
break;
}
case Role::LEARNER: {
VLOG(1) << idStr_ << "I am learner, not in the raft group, skip the log";
break;
}
}
if (needToUnlock) {
raftLock_.unlock();
};
if (needLock) {
std::lock_guard<std::mutex> guard(raftLock_);
transfer();
} else {
transfer();
}
}

@@ -596,19 +599,21 @@ void RaftPart::preProcessRemovePeer(const HostAddr& peer) {
removePeer(peer);
}

void RaftPart::commitRemovePeer(const HostAddr& peer) {
bool needToUnlock = raftLock_.try_lock();
SCOPE_EXIT {
if (needToUnlock) {
raftLock_.unlock();
void RaftPart::commitRemovePeer(const HostAddr& peer, bool needLock) {
auto remove = [&] {
if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) {
VLOG(1) << idStr_ << "I am " << roleStr(role_) << ", skip remove peer in commit";
return;
}
CHECK(Role::LEADER == role_);
removePeer(peer);
};
if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) {
VLOG(1) << idStr_ << "I am " << roleStr(role_) << ", skip remove peer in commit";
return;
if (needLock) {
std::lock_guard<std::mutex> guard(raftLock_);
remove();
} else {
remove();
}
CHECK(Role::LEADER == role_);
removePeer(peer);
}

folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendAsync(ClusterID source, std::string log) {
@@ -932,7 +937,14 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
{
auto walIt = wal_->iterator(committedId + 1, lastLogId);
// Step 3: Commit the batch
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true);
/*
As the leader, we don't acquire raftLock_ here because holding it would block heartbeats.
Instead, the partition is protected by logsLock_, which guarantees there are no other
outgoing logs. So the third parameter (needLock) needs to be true: commitLogs will grab the
lock itself for the special operations that require it. Besides, the leader needs to wait
until all logs are applied to the state machine, so the second parameter (wait) needs to be
true as well.
*/
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true, true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
stats::StatsManager::addValue(kCommitLogLatencyUs, execTime_);
std::lock_guard<std::mutex> g(raftLock_);
@@ -1664,7 +1676,9 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
CHECK_LE(lastLogIdCanCommit, wal_->lastLogId());
if (lastLogIdCanCommit > committedLogId_) {
auto walIt = wal_->iterator(committedLogId_ + 1, lastLogIdCanCommit);
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), false);
// The follower does not wait for all logs to be applied to the state machine, so the second
// parameter is false. And raftLock_ has already been acquired, so the third parameter is
// false as well.
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), false, false);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
stats::StatsManager::addValue(kCommitLogLatencyUs, execTime_);
VLOG(4) << idStr_ << "Follower succeeded committing log " << committedLogId_ + 1 << " to "
14 changes: 10 additions & 4 deletions src/kvstore/raftex/RaftPart.h
@@ -176,15 +176,17 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
* @brief Add a raft learner to its peers
*
* @param learner Learner address
* @param needLock Whether need to acquire lock in the function
*/
void addLearner(const HostAddr& learner);
void addLearner(const HostAddr& learner, bool needLock);

/**
* @brief When commit to state machine, old leader will step down as follower
*
* @param target Target new leader
* @param needLock Whether need to acquire lock in the function
*/
void commitTransLeader(const HostAddr& target);
void commitTransLeader(const HostAddr& target, bool needLock);

/**
* @brief Pre-process of transfer leader, target new leader will start election task to background
@@ -207,8 +209,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
* preProcessRemovePeer, leader will remove in commitRemovePeer
*
* @param peer Target peer to remove
* @param needLock Whether need to acquire lock in the function
*/
void commitRemovePeer(const HostAddr& peer);
void commitRemovePeer(const HostAddr& peer, bool needLock);

// All learner and listener are raft learner. The difference between listener and learner is that
// learner could be promoted to follower, but listener could not. (learner are added to hosts_,
@@ -491,13 +494,16 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
*
* @param iter Log iterator of all logs to commit
* @param wait Whether to wait until all logs have been applied to the state machine
* @param needLock Whether we need to acquire raftLock_ before operations. When raftLock_ has
* already been acquired before commitLogs is invoked, needLock is false (e.g. commitLogs by a
* follower). If the lock has not been acquired, needLock is true (e.g. commitLogs by the leader).
* @return std::tuple<nebula::cpp2::ErrorCode, LogID, TermID>
* Return {error code, last commit log id, last commit log term}. When no logs applied to state
* machine or error occurs when calling commitLogs, kNoCommitLogId and kNoCommitLogTerm are
* returned.
*/
virtual std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> commitLogs(
std::unique_ptr<LogIterator> iter, bool wait) = 0;
std::unique_ptr<LogIterator> iter, bool wait, bool needLock) = 0;

/**
* @brief A interface to pre-process wal, mainly for membership change
7 changes: 4 additions & 3 deletions src/kvstore/raftex/test/TestShard.cpp
@@ -166,7 +166,8 @@ void TestShard::onLeaderReady(TermID term) {
}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> TestShard::commitLogs(
std::unique_ptr<LogIterator> iter, bool) {
std::unique_ptr<LogIterator> iter, bool wait, bool needLock) {
UNUSED(wait);
LogID lastId = kNoCommitLogId;
TermID lastTerm = kNoCommitLogTerm;
int32_t commitLogsNum = 0;
@@ -178,12 +179,12 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> TestShard::commitLogs(
switch (static_cast<CommandType>(log[0])) {
case CommandType::TRANSFER_LEADER: {
auto nLeader = decodeTransferLeader(log);
commitTransLeader(nLeader);
commitTransLeader(nLeader, needLock);
break;
}
case CommandType::REMOVE_PEER: {
auto peer = decodeRemovePeer(log);
commitRemovePeer(peer);
commitRemovePeer(peer, needLock);
break;
}
case CommandType::ADD_PEER:
5 changes: 3 additions & 2 deletions src/kvstore/raftex/test/TestShard.h
@@ -79,14 +79,15 @@ class TestShard : public RaftPart {
void onDiscoverNewLeader(HostAddr) override {}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> commitLogs(std::unique_ptr<LogIterator> iter,
bool wait) override;
bool wait,
bool needLock) override;

bool preProcessLog(LogID, TermID, ClusterID, const std::string& log) override {
if (!log.empty()) {
switch (static_cast<CommandType>(log[0])) {
case CommandType::ADD_LEARNER: {
auto learner = decodeLearner(log);
addLearner(learner);
addLearner(learner, false);
LOG(INFO) << idStr_ << "Add learner " << learner;
break;
}
