
Commit 10b8f58

lease read, check has committed logs in this term, modify collectN eval (vesoft-inc#1756)

Some changes made during Jepsen testing:
 * Add a leader lease check: if the leader has not successfully sent a message within its lease, reject read requests.
 * Collect the result even when eval is false in collectN, so the responses can be handled in processElectionResponses and processAppendLogResponses.
critical27 authored Mar 6, 2020
1 parent 9f8fad2 commit 10b8f58
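To make the lease idea concrete before the diff: a leader that recently got a message accepted by a majority knows no rival can have been elected in that window, so it can serve reads locally. Below is a minimal sketch of that reasoning with hypothetical names; it is not the nebula API (the real check is RaftPart::leaseValid() further down).

#include <chrono>

// Minimal lease sketch. After a majority accepts a message at time T,
// taking `cost` of network round trip, no rival leader can be elected
// before roughly T + heartbeatInterval, so local reads are safe until then.
class LeaseSketch {
public:
    void onMajorityAccepted(std::chrono::milliseconds cost) {
        // Discount the round-trip cost, as the real code does with
        // lastMsgAcceptedCostMs_.
        acceptedAt_ = std::chrono::steady_clock::now() - cost;
    }

    bool leaseValid(std::chrono::milliseconds heartbeatInterval) const {
        return std::chrono::steady_clock::now() - acceptedAt_ < heartbeatInterval;
    }

private:
    std::chrono::steady_clock::time_point acceptedAt_{};  // default epoch => lease invalid
};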
Showing 6 changed files with 70 additions and 25 deletions.
9 changes: 6 additions & 3 deletions src/common/base/CollectNSucceeded.inl
@@ -28,6 +28,7 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
     ResultEval eval;
     Result results;
     std::atomic<size_t> numCompleted = {0};
+    std::atomic<size_t> nSucceeded = {0};
     folly::Promise<Result> promise;
     size_t nTotal;
 };
@@ -51,11 +52,13 @@ folly::Future<SucceededResultList<FutureIter>> collectNSucceeded(
         first->setCallback_([n, ctx, index] (
                 folly::Try<FutureReturnType<FutureIter>>&& t) {
             if (!ctx->promise.isFulfilled()) {
-                if (!t.hasException() && ctx->eval(index, t.value())) {
+                if (!t.hasException()) {
+                    if (ctx->eval(index, t.value())) {
+                        ++ctx->nSucceeded;
+                    }
                     ctx->results.emplace_back(index, std::move(t.value()));
                 }
-                if ((++ctx->numCompleted) == ctx->nTotal ||
-                        ctx->results.size() == n) {
+                if ((++ctx->numCompleted) == ctx->nTotal || ctx->nSucceeded == n) {
                     // Done
                     VLOG(2) << "Set Value [completed="
                             << ctx->numCompleted
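In isolation, the change above means collectNSucceeded now fulfills its promise once n responses pass eval (or all futures complete), while still collecting responses that fail eval so callers can inspect them afterwards. A simplified synchronous analogue of the new counting logic (a hypothetical helper, not the folly-based template above):

#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Returns collected (index, value) pairs. "Done" once `n` inputs pass
// `eval` or all inputs are consumed; inputs that fail eval are still
// collected, mirroring the change to collectNSucceeded.
template <typename T>
std::vector<std::pair<std::size_t, T>> collectNSucceededSketch(
        std::vector<T> inputs,
        std::size_t n,
        const std::function<bool(std::size_t, const T&)>& eval) {
    std::vector<std::pair<std::size_t, T>> results;
    std::size_t nSucceeded = 0;
    for (std::size_t i = 0; i < inputs.size(); ++i) {
        if (eval(i, inputs[i])) {
            ++nSucceeded;
        }
        // Key change: collect the result even when eval() is false, so the
        // caller can still inspect rejected responses.
        results.emplace_back(i, std::move(inputs[i]));
        if (nSucceeded == n) {
            break;  // enough successes; stop early, like fulfilling the promise
        }
    }
    return results;
}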
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
@@ -821,7 +821,7 @@ int32_t NebulaStore::allLeader(std::unordered_map<GraphSpaceID,
 }
 
 bool NebulaStore::checkLeader(std::shared_ptr<Part> part) const {
-    return !FLAGS_check_leader || part->isLeader();
+    return !FLAGS_check_leader || (part->isLeader() && part->leaseValid());
 }
 
 
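The effect of this one-line change: a read is rejected not only when the part is not the leader, but also when it is nominally the leader yet its lease has lapsed (e.g. it has lost contact with a majority), which is presumably the stale-read window the Jepsen run exercised.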
5 changes: 3 additions & 2 deletions src/kvstore/raftex/Host.cpp
@@ -67,7 +67,8 @@ cpp2::ErrorCode Host::checkStatus() const {
 
 
 folly::Future<cpp2::AskForVoteResponse> Host::askForVote(
-        const cpp2::AskForVoteRequest& req) {
+        const cpp2::AskForVoteRequest& req,
+        folly::EventBase* eb) {
     {
         std::lock_guard<std::mutex> g(lock_);
         auto res = checkStatus();
@@ -79,7 +80,7 @@ folly::Future<cpp2::AskForVoteResponse> Host::askForVote(
             return resp;
         }
     }
-    auto client = tcManager().client(addr_);
+    auto client = tcManager().client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms);
     return client->future_askForVote(req);
 }
 
3 changes: 2 additions & 1 deletion src/kvstore/raftex/Host.h
@@ -64,7 +64,8 @@ class Host final : public std::enable_shared_from_this<Host> {
     }
 
     folly::Future<cpp2::AskForVoteResponse> askForVote(
-        const cpp2::AskForVoteRequest& req);
+        const cpp2::AskForVoteRequest& req,
+        folly::EventBase* eb);
 
     // When logId == lastLogIdSent, it is a heartbeat
     folly::Future<cpp2::AppendLogResponse> appendLogs(
64 changes: 49 additions & 15 deletions src/kvstore/raftex/RaftPart.cpp
@@ -34,6 +34,9 @@ DEFINE_int32(wal_buffer_size, 8 * 1024 * 1024, "Default wal buffer size");
 DEFINE_int32(wal_buffer_num, 2, "Default wal buffer number");
 DEFINE_bool(trace_raft, false, "Enable trace one raft request");
 
+DEFINE_bool(has_leader_lease, true, "If set to true, the leader can only serve reads "
+            "while its lease is valid. If set to false, the lease is always valid");
+
 
 namespace nebula {
 namespace raftex {
@@ -764,6 +767,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
 
     VLOG(2) << idStr_ << "About to replicate logs to all peer hosts";
 
+    lastMsgSentDur_.reset();
     SlowOpTracker tracker;
     collectNSucceeded(
         gen::from(hosts)
@@ -872,8 +876,6 @@ void RaftPart::processAppendLogResponses(
         lastLogId_ = lastLogId;
         lastLogTerm_ = currTerm;
 
-        lastMsgSentDur_.reset();
-
         auto walIt = wal_->iterator(committedId + 1, lastLogId);
         SlowOpTracker tracker;
         // Step 3: Commit the batch
@@ -889,6 +891,9 @@
         }
         VLOG(2) << idStr_ << "Leader succeeded in committing the logs "
                 << committedId + 1 << " to " << lastLogId;
+
+        lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec();
+        lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec();
     } while (false);
 
     if (!checkAppendLogResult(res)) {
Expand Down Expand Up @@ -963,7 +968,8 @@ bool RaftPart::needToSendHeartbeat() {
std::lock_guard<std::mutex> g(raftLock_);
return status_ == Status::RUNNING &&
role_ == Role::LEADER &&
lastMsgSentDur_.elapsedInSec() >= FLAGS_raft_heartbeat_interval_secs * 2 / 5;
time::WallClock::fastNowInMilliSec() - lastMsgAcceptedTime_ >=
FLAGS_raft_heartbeat_interval_secs * 1000 * 2 / 5;
}
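A quick worked example of the new threshold, assuming for illustration a heartbeat interval of 5 seconds: the leader sends the next heartbeat once 5 * 1000 * 2 / 5 = 2000 ms have passed since its last message was accepted by a majority, comfortably inside the lease window computed in leaseValid() below.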


@@ -1032,7 +1038,8 @@ bool RaftPart::prepareElectionRequest(
 
 
 typename RaftPart::Role RaftPart::processElectionResponses(
-        const RaftPart::ElectionResponses& results) {
+        const RaftPart::ElectionResponses& results,
+        std::vector<std::shared_ptr<Host>> hosts) {
     std::lock_guard<std::mutex> g(raftLock_);
 
     if (UNLIKELY(status_ == Status::STOPPED)) {
@@ -1062,6 +1069,11 @@ typename RaftPart::Role RaftPart::processElectionResponses(
     for (auto& r : results) {
         if (r.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
             ++numSucceeded;
+        } else if (r.second.get_error_code() == cpp2::ErrorCode::E_LOG_STALE) {
+            LOG(INFO) << idStr_ << "My last log id is less than " << hosts[r.first]->address()
+                      << ", double my election interval.";
+            uint64_t curWeight = weight_.load();
+            weight_.store(curWeight * 2);
         }
     }
 
@@ -1124,22 +1136,16 @@ bool RaftPart::leaderElection() {
                                << host->idStr();
                     return via(
                         eb,
-                        [&voteReq, &host] ()
+                        [&voteReq, &host, eb] ()
                                 -> Future<cpp2::AskForVoteResponse> {
-                            return host->askForVote(voteReq);
+                            return host->askForVote(voteReq, eb);
                         });
                 })
             | gen::as<std::vector>(),
         // Number of succeeded required
         quorum_,
         // Result evaluator
-        [hosts, this](size_t idx, cpp2::AskForVoteResponse& resp) {
-            if (resp.get_error_code() == cpp2::ErrorCode::E_LOG_STALE) {
-                LOG(INFO) << idStr_ << "My last log id is less than " << hosts[idx]->address()
-                          << ", double my election interval.";
-                uint64_t curWeight = weight_.load();
-                weight_.store(curWeight * 2);
-            }
+        [hosts] (size_t idx, cpp2::AskForVoteResponse& resp) {
             return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED
                 && !hosts[idx]->isLearner();
         });
@@ -1157,7 +1163,7 @@
     }
 
     // Process the responses
-    switch (processElectionResponses(resps)) {
+    switch (processElectionResponses(resps, std::move(hosts))) {
         case Role::LEADER: {
             // Elected
             LOG(INFO) << idStr_
@@ -1170,6 +1176,7 @@
                         term = voteReq.get_term()] {
                     self->onElected(term);
                 });
+                lastMsgAcceptedTime_ = 0;
             }
         }
         weight_ = 1;
@@ -1306,6 +1313,15 @@ void RaftPart::processAskForVoteRequest(
         return;
     }
 
+    auto candidate = HostAddr(req.get_candidate_ip(), req.get_candidate_port());
+    if (role_ == Role::FOLLOWER && leader_ != std::make_pair(0, 0) && leader_ != candidate &&
+            lastMsgRecvDur_.elapsedInMSec() < FLAGS_raft_heartbeat_interval_secs * 1000) {
+        LOG(INFO) << idStr_ << "I believe the leader exists. "
+                  << "Refuse to vote for " << candidate;
+        resp.set_error_code(cpp2::ErrorCode::E_WRONG_LEADER);
+        return;
+    }
+
     // Check term id
     auto term = role_ == Role::CANDIDATE ? proposedTerm_ : term_;
     if (req.get_term() <= term) {
@@ -1344,7 +1360,6 @@ void RaftPart::processAskForVoteRequest(
         }
     }
 
-    auto candidate = HostAddr(req.get_candidate_ip(), req.get_candidate_port());
     auto hosts = followers();
     auto it = std::find_if(hosts.begin(), hosts.end(), [&candidate] (const auto& h){
         return h->address() == candidate;
@@ -1614,6 +1629,14 @@ cpp2::ErrorCode RaftPart::verifyLeader(
         VLOG(2) << idStr_ << "The candidate leader " << candidate << " is not my peers";
         return cpp2::ErrorCode::E_WRONG_LEADER;
     }
+
+    if (role_ == Role::FOLLOWER && leader_ != std::make_pair(0, 0) && leader_ != candidate &&
+            lastMsgRecvDur_.elapsedInMSec() < FLAGS_raft_heartbeat_interval_secs * 1000) {
+        LOG(INFO) << idStr_ << "I believe the leader " << leader_ << " exists. "
+                  << "Refuse to append logs of " << candidate;
+        return cpp2::ErrorCode::E_WRONG_LEADER;
+    }
+
     VLOG(2) << idStr_ << "The current role is " << roleStr(role_);
     switch (role_) {
         case Role::LEARNER:
@@ -1854,6 +1877,17 @@ void RaftPart::checkAndResetPeers(const std::vector<HostAddr>& peers) {
     }
 }
 
+bool RaftPart::leaseValid() {
+    if (!FLAGS_has_leader_lease) {
+        return true;
+    }
+    // When a majority has accepted a log, the leader obtains a lease that lasts for one
+    // heartbeat interval, minus the network I/O time. The left side of the inequality is
+    // the time elapsed since the leader last sent a log that was accepted by a majority.
+    return time::WallClock::fastNowInMilliSec() - lastMsgAcceptedTime_
+        < FLAGS_raft_heartbeat_interval_secs * 1000 - lastMsgAcceptedCostMs_;
+}
+
 } // namespace raftex
 } // namespace nebula
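A worked example of the lease inequality, with hypothetical numbers: if FLAGS_raft_heartbeat_interval_secs is 5 and the last majority-accepted append cost lastMsgAcceptedCostMs_ = 120 ms of network time, reads are allowed while fastNowInMilliSec() - lastMsgAcceptedTime_ < 5000 - 120 = 4880 ms. Subtracting the round-trip cost keeps the lease conservative, and the lease stays sound because the processAskForVoteRequest change above makes a follower refuse to vote for a new candidate while it has heard from its leader within that same heartbeat interval.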

12 changes: 9 additions & 3 deletions src/kvstore/raftex/RaftPart.h
@@ -209,6 +209,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
             const cpp2::SendSnapshotRequest& req,
             cpp2::SendSnapshotResponse& resp);
 
+    bool leaseValid();
+
 protected:
     // Protected constructor to prevent from instantiating directly
     RaftPart(ClusterID clusterId,
@@ -344,7 +346,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
             std::vector<std::shared_ptr<Host>>& hosts);
 
     // The method returns the partition's role after the election
-    Role processElectionResponses(const ElectionResponses& results);
+    Role processElectionResponses(const ElectionResponses& results,
+                                  std::vector<std::shared_ptr<Host>> hosts);
 
     // Check whether new logs can be appended
     // Pre-condition: The caller needs to hold the raftLock_
@@ -507,9 +510,12 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
     // To record how long ago when the last leader message received
     time::Duration lastMsgRecvDur_;
-    // To record how long ago when the last log message or heartbeat
-    // was sent
+    // To record how long ago when the last log message or heartbeat was sent
     time::Duration lastMsgSentDur_;
+    // To record when the last message was accepted by majority peers
+    uint64_t lastMsgAcceptedTime_{0};
+    // How long between when the last message was sent and when it was accepted by majority peers
+    uint64_t lastMsgAcceptedCostMs_{0};
 
     // Write-ahead Log
     std::shared_ptr<wal::FileBasedWal> wal_;
