Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend CAS in Raft #800

Merged
merged 2 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Extend CAS in Raft
  • Loading branch information
heng committed Aug 19, 2019
commit 92b7821431d87f879c8b3857af7739fbde9f5b11
6 changes: 6 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/Base.h"
#include <rocksdb/merge_operator.h>
#include <rocksdb/compaction_filter.h>
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/Common.h"
#include "kvstore/KVIterator.h"
#include "kvstore/PartManager.h"
Expand Down Expand Up @@ -136,6 +137,11 @@ class KVStore {
const std::string& prefix,
KVCallback cb) = 0;

virtual void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
KVCallback cb) = 0;

virtual ResultCode ingest(GraphSpaceID spaceId) = 0;

virtual ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,19 @@ void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId,
part->asyncRemovePrefix(prefix, std::move(cb));
}

void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
part->asyncAtomicOp(std::move(op), std::move(cb));
}

ErrorOr<ResultCode, std::shared_ptr<Part>> NebulaStore::part(GraphSpaceID spaceId,
PartitionID partId) {
folly::RWSpinLock::ReadHolder rh(&lock_);
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class NebulaStore : public KVStore, public Handler {
const std::string& prefix,
KVCallback cb) override;

void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
KVCallback cb) override;

ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
PartitionID partId) override;

Expand Down
13 changes: 6 additions & 7 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ void Part::asyncRemoveRange(folly::StringPiece start,
});
}

void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
atomicOpAsync(std::move(op)).then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
});
}

void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeLearner(learner);
sendCommandAsync(std::move(log))
Expand All @@ -153,13 +159,6 @@ void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term " << term;
}


std::string Part::compareAndSet(const std::string& log) {
UNUSED(log);
LOG(FATAL) << "To be implemented";
}


bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto batch = engine_->startBatchWrite();
LogID lastId = -1;
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Part : public raftex::RaftPart {
folly::StringPiece end,
KVCallback cb);

void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb);

void asyncAddLearner(const HostAddr& learner, KVCallback cb);
/**
* Methods inherited from RaftPart
Expand All @@ -56,8 +58,6 @@ class Part : public raftex::RaftPart {

void onElected(TermID term) override;

std::string compareAndSet(const std::string& log) override;

bool commitLogs(std::unique_ptr<LogIterator> iter) override;

bool preProcessLog(LogID logId,
Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ class HBaseStore : public KVStore {
const std::string& prefix,
KVCallback cb) override;

void asyncAtomicOp(GraphSpaceID,
PartitionID,
raftex::AtomicOp,
KVCallback) override {
LOG(FATAL) << "Not supportted yet!";
}

ResultCode ingest(GraphSpaceID spaceId) override;

ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID,
Expand Down
96 changes: 49 additions & 47 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ class AppendLogsIterator final : public LogIterator {
AppendLogsIterator(LogID firstLogId,
TermID termId,
RaftPart::LogCache logs,
std::function<std::string(const std::string&)> casCB)
folly::Function<std::string(AtomicOp op)> opCB)
: firstLogId_(firstLogId)
, termId_(termId)
, logId_(firstLogId)
, logs_(std::move(logs))
, casCB_(std::move(casCB)) {
leadByCAS_ = processCAS();
, opCB_(std::move(opCB)) {
leadByAtomicOp_ = processAtomicOp();
valid_ = idx_ < logs_.size();
hasNonCASLogs_ = !leadByCAS_ && valid_;
hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_;
if (valid_) {
currLogType_ = lastLogType_ = logType();
}
Expand All @@ -66,36 +66,36 @@ class AppendLogsIterator final : public LogIterator {
AppendLogsIterator& operator=(const AppendLogsIterator&) = delete;
AppendLogsIterator& operator=(AppendLogsIterator&&) = default;

bool leadByCAS() const {
return leadByCAS_;
bool leadByAtomicOp() const {
return leadByAtomicOp_;
}

bool hasNonCASLogs() const {
return hasNonCASLogs_;
bool hasNonAtomicOpLogs() const {
return hasNonAtomicOpLogs_;
}

LogID firstLogId() const {
return firstLogId_;
}

// Return true if the current log is a CAS, otherwise return false
bool processCAS() {
// Return true if the current log is a AtomicOp, otherwise return false
bool processAtomicOp() {
while (idx_ < logs_.size()) {
auto& tup = logs_.at(idx_);
auto logType = std::get<1>(tup);
if (logType != LogType::CAS) {
// Not a CAS
if (logType != LogType::ATOMIC_OP) {
// Not a AtomicOp
return false;
}

// Process CAS log
CHECK(!!casCB_);
casResult_ = casCB_(std::get<2>(tup));
if (casResult_.size() > 0) {
// CAS Succeeded
// Process AtomicOp log
CHECK(!!opCB_);
opResult_ = opCB_(std::move(std::get<3>(tup)));
if (opResult_.size() > 0) {
// AtomicOp Succeeded
return true;
} else {
// CAS failed, move to the next log, but do not increment the logId_
// AtomicOp failed, move to the next log, but do not increment the logId_
++idx_;
}
}
Expand All @@ -109,9 +109,9 @@ class AppendLogsIterator final : public LogIterator {
++logId_;
if (idx_ < logs_.size()) {
currLogType_ = logType();
valid_ = currLogType_ != LogType::CAS;
valid_ = currLogType_ != LogType::ATOMIC_OP;
if (valid_) {
hasNonCASLogs_ = true;
hasNonAtomicOpLogs_ = true;
}
valid_ = valid_ && lastLogType_ != LogType::COMMAND;
lastLogType_ = currLogType_;
Expand All @@ -122,7 +122,7 @@ class AppendLogsIterator final : public LogIterator {
}

// The iterator becomes invalid when exhausting the logs
// **OR** running into a CAS log
// **OR** running into a AtomicOp log
bool valid() const override {
return valid_;
}
Expand All @@ -143,8 +143,8 @@ class AppendLogsIterator final : public LogIterator {

folly::StringPiece logMsg() const override {
DCHECK(valid());
if (currLogType_ == LogType::CAS) {
return casResult_;
if (currLogType_ == LogType::ATOMIC_OP) {
return opResult_;
} else {
return std::get<2>(logs_.at(idx_));
}
Expand All @@ -159,9 +159,9 @@ class AppendLogsIterator final : public LogIterator {
void resume() {
CHECK(!valid_);
if (!empty()) {
leadByCAS_ = processCAS();
leadByAtomicOp_ = processAtomicOp();
valid_ = idx_ < logs_.size();
hasNonCASLogs_ = !leadByCAS_ && valid_;
hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_;
if (valid_) {
currLogType_ = lastLogType_ = logType();
}
Expand All @@ -174,17 +174,17 @@ class AppendLogsIterator final : public LogIterator {

private:
size_t idx_{0};
bool leadByCAS_{false};
bool hasNonCASLogs_{false};
bool leadByAtomicOp_{false};
bool hasNonAtomicOpLogs_{false};
bool valid_{true};
LogType lastLogType_{LogType::NORMAL};
LogType currLogType_{LogType::NORMAL};
std::string casResult_;
std::string opResult_;
LogID firstLogId_;
TermID termId_;
LogID logId_;
RaftPart::LogCache logs_;
std::function<std::string(const std::string&)> casCB_;
folly::Function<std::string(AtomicOp op)> opCB_;
};


Expand Down Expand Up @@ -379,8 +379,8 @@ folly::Future<AppendLogResult> RaftPart::appendAsync(ClusterID source,
}


folly::Future<AppendLogResult> RaftPart::casAsync(std::string log) {
return appendLogAsync(clusterId_, LogType::CAS, std::move(log));
folly::Future<AppendLogResult> RaftPart::atomicOpAsync(AtomicOp op) {
return appendLogAsync(clusterId_, LogType::ATOMIC_OP, "", std::move(op));
}

folly::Future<AppendLogResult> RaftPart::sendCommandAsync(std::string log) {
Expand All @@ -389,7 +389,8 @@ folly::Future<AppendLogResult> RaftPart::sendCommandAsync(std::string log) {

folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
LogType logType,
std::string log) {
std::string log,
AtomicOp op) {
LogCache swappedOutLogs;
auto retFuture = folly::Future<AppendLogResult>::makeEmpty();

Expand Down Expand Up @@ -419,9 +420,9 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,

// Append new logs to the buffer
DCHECK_GE(source, 0);
logs_.emplace_back(source, logType, std::move(log));
logs_.emplace_back(source, logType, std::move(log), std::move(op));
switch (logType) {
case LogType::CAS:
case LogType::ATOMIC_OP:
retFuture = cachingPromise_.getSingleFuture();
break;
case LogType::COMMAND:
Expand Down Expand Up @@ -474,13 +475,14 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
firstId,
termId,
std::move(swappedOutLogs),
[this] (const std::string& msg) -> std::string {
auto casRet = compareAndSet(msg);
if (casRet.empty()) {
[this] (AtomicOp opCB) -> std::string {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (opRet.empty()) {
// Failed
sendingPromise_.setOneSingleValue(AppendLogResult::E_CAS_FAILURE);
sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE);
}
return casRet;
return opRet;
});
appendLogsInternal(std::move(it), termId);

Expand All @@ -498,7 +500,7 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
<< iter.logId() << " (Current term is "
<< currTerm << ")";
} else {
LOG(ERROR) << idStr_ << "Only happend when CAS failed";
LOG(ERROR) << idStr_ << "Only happend when Atomic op failed";
replicatingLogs_ = false;
return;
}
Expand Down Expand Up @@ -715,10 +717,10 @@ void RaftPart::processAppendLogResponses(
return;
}
// Step 4: Fulfill the promise
if (iter.hasNonCASLogs()) {
if (iter.hasNonAtomicOpLogs()) {
sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED);
}
if (iter.leadByCAS()) {
if (iter.leadByAtomicOp()) {
sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED);
}
// Step 5: Check whether need to continue
Expand All @@ -737,14 +739,14 @@ void RaftPart::processAppendLogResponses(
firstLogId,
currTerm,
std::move(logs_),
[this] (const std::string& log) -> std::string {
auto casRet = compareAndSet(log);
if (casRet.empty()) {
[this] (AtomicOp op) -> std::string {
auto opRet = op();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to my understanding, in AtomicOp, user just define whatever they want to do atomically, and return the result as a string.
Why don't we just pass the AtomicOp instead of folly::Function<std::string(AtomicOp op)> opCB, it seems a little bit weird.

Copy link
Contributor Author

@dangleptr dangleptr Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we will set promise value, if we implement the logic directly in Iterator, you have to pass the whole part into it. This is not what we want.

if (opRet.empty()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_CAS_FAILURE);
AppendLogResult::E_ATOMIC_OP_FAILURE);
}
return casRet;
return opRet;
});
logs_.clear();
bufferOverFlow_ = false;
Expand Down
Loading