Implement leader change logic (vesoft-inc#675)
dangleptr authored Jul 25, 2019
1 parent 3380a08 commit ae73b70
Showing 5 changed files with 143 additions and 20 deletions.
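
In short, this commit gives StorageClient a per-(space, partition) leader cache guarded by a folly::RWSpinLock: requests are routed to the cached leader (or a random peer when nothing is cached), an E_LEADER_CHANGED failure code writes the reported leader back into the cache, and an outright RPC failure evicts the entry. The sketch below is a minimal, illustrative stand-alone version of that pattern. It uses std::shared_mutex, std::map, and placeholder aliases instead of folly::RWSpinLock, folly hashing, and Nebula's GraphSpaceID/PartitionID/HostAddr types, so treat it as a reading aid rather than the committed code.

```cpp
// Illustrative sketch of the leader cache added in StorageClient.h below.
// SpaceId, PartId, and Host are placeholders, not the Nebula types.
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <utility>

using SpaceId = int32_t;
using PartId  = int32_t;
using Host    = std::pair<uint32_t, uint16_t>;   // (ip, port) stand-in for HostAddr

class LeaderCache {
public:
    // Return the cached leader for a partition, if any; callers fall back to a
    // random peer from the partition's peer list otherwise.
    std::optional<Host> leader(SpaceId space, PartId part) const {
        std::shared_lock<std::shared_mutex> rh(lock_);
        auto it = leaders_.find({space, part});
        if (it != leaders_.end()) {
            return it->second;
        }
        return std::nullopt;
    }

    // Remember the leader reported in an E_LEADER_CHANGED response.
    void update(SpaceId space, PartId part, const Host& leader) {
        std::unique_lock<std::shared_mutex> wh(lock_);
        leaders_[{space, part}] = leader;
    }

    // Drop the entry when an RPC to the cached host fails outright.
    void invalidate(SpaceId space, PartId part) {
        std::unique_lock<std::shared_mutex> wh(lock_);
        leaders_.erase({space, part});
    }

private:
    mutable std::shared_mutex lock_;
    // std::map avoids needing a hash for the pair key; the real code uses an
    // unordered_map with folly's pair hashing.
    std::map<std::pair<SpaceId, PartId>, Host> leaders_;
};
```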
4 changes: 1 addition & 3 deletions src/storage/client/StorageClient.cpp
@@ -17,7 +17,6 @@ StorageClient::StorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> thread
meta::MetaClient *client)
: ioThreadPool_(threadPool)
, client_(client) {
CHECK_NOTNULL(client_);
clientsMan_
= std::make_unique<thrift::ThriftClientManager<storage::cpp2::StorageServiceAsyncClient>>();
}
@@ -237,8 +236,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::EdgePropResponse>> StorageClient::get


PartitionID StorageClient::partId(GraphSpaceID spaceId, int64_t id) const {
CHECK(client_);
auto parts = client_->partsNum(spaceId);
auto parts = partsNum(spaceId);
auto s = ID_HASH(id, parts);
CHECK_GE(s, 0U);
return s;
66 changes: 53 additions & 13 deletions src/storage/client/StorageClient.h
@@ -8,6 +8,7 @@
#define STORAGE_CLIENT_STORAGECLIENT_H_

#include "base/Base.h"
#include <gtest/gtest_prod.h>
#include <folly/futures/Future.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include "gen-cpp2/StorageServiceAsyncClient.h"
@@ -76,10 +77,12 @@ class StorageRpcResponse final {
*
* The class is NOT re-entriable
*/
class StorageClient final {
class StorageClient {
FRIEND_TEST(StorageClientTest, LeaderChangeTest);

public:
explicit StorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
meta::MetaClient *client = nullptr);
StorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
meta::MetaClient *client);
~StorageClient();

folly::SemiFuture<StorageRpcResponse<storage::cpp2::ExecResponse>> addVertices(
@@ -124,16 +127,35 @@ class StorageClient final {
std::vector<storage::cpp2::PropDef> returnCols,
folly::EventBase* evb = nullptr);

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
meta::MetaClient *client_{nullptr};
std::unique_ptr<thrift::ThriftClientManager<
storage::cpp2::StorageServiceAsyncClient>> clientsMan_;

private:
protected:
// Calculate the partition id for the given vertex id
PartitionID partId(GraphSpaceID spaceId, int64_t id) const;

const HostAddr& leader(const PartMeta& partMeta) const {
{
folly::RWSpinLock::ReadHolder rh(leadersLock_);
auto it = leaders_.find(std::make_pair(partMeta.spaceId_, partMeta.partId_));
if (it != leaders_.end()) {
return it->second;
}
}
VLOG(1) << "No leader exists. Choose one random.";
return partMeta.peers_[folly::Random::rand32(partMeta.peers_.size())];
}

void updateLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader) {
folly::RWSpinLock::WriteHolder wh(leadersLock_);
leaders_[std::make_pair(spaceId, partId)] = leader;
}

void invalidLeader(GraphSpaceID spaceId, PartitionID partId) {
folly::RWSpinLock::WriteHolder wh(leadersLock_);
auto it = leaders_.find(std::make_pair(spaceId, partId));
if (it != leaders_.end()) {
leaders_.erase(it);
}
}

template<class Request,
class RemoteFunc,
class Response =
@@ -164,13 +186,31 @@ class StorageClient final {
> clusters;
for (auto& id : ids) {
PartitionID part = partId(spaceId, f(id));
auto partMeta = client_->getPartMetaFromCache(spaceId, part);
auto partMeta = getPartMeta(spaceId, part);
CHECK_GT(partMeta.peers_.size(), 0U);
// TODO We need to use the leader here
clusters[partMeta.peers_.front()][part].emplace_back(std::move(id));
const auto& leader = this->leader(partMeta);
clusters[leader][part].emplace_back(std::move(id));
}
return clusters;
}

virtual int32_t partsNum(GraphSpaceID spaceId) const {
CHECK(client_ != nullptr);
return client_->partsNum(spaceId);
}

virtual PartMeta getPartMeta(GraphSpaceID spaceId, PartitionID partId) const {
CHECK(client_ != nullptr);
return client_->getPartMetaFromCache(spaceId, partId);
}

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
meta::MetaClient *client_{nullptr};
std::unique_ptr<thrift::ThriftClientManager<
storage::cpp2::StorageServiceAsyncClient>> clientsMan_;
mutable folly::RWSpinLock leadersLock_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr> leaders_;
};

} // namespace storage
14 changes: 11 additions & 3 deletions src/storage/client/StorageClient.inl
@@ -86,6 +86,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(

for (auto& req : requests) {
auto& host = req.first;
auto spaceId = req.second.get_space_id();
auto client = clientsMan_->client(host, evb);
// Result is a pair of <Request&, bool>
auto res = context->insertRequest(host, std::move(req.second));
@@ -95,7 +96,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(
// Future process code will be executed on the IO thread
// Since all requests are sent using the same eventbase, all then-callback
// will be executed on the same IO thread
.then(evb, [context, host] (folly::Try<Response>&& val) {
.then(evb, [this, context, host, spaceId] (folly::Try<Response>&& val) {
auto& r = context->findRequest(host);
if (val.hasException()) {
LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what();
@@ -104,6 +105,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(
context->resp.failedParts().emplace(
part.first,
storage::cpp2::ErrorCode::E_RPC_FAILURE);
invalidLeader(spaceId, part.first);
}
context->resp.markFailure();
} else {
@@ -115,8 +117,14 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(
<< ", failed code " << static_cast<int32_t>(code.get_code());
hasFailure = true;
if (code.get_code() == storage::cpp2::ErrorCode::E_LEADER_CHANGED) {
// TODO Need to retry the new leader
LOG(FATAL) << "Not implmented";
auto* leader = code.get_leader();
if (leader != nullptr
&& leader->get_ip() != 0
&& leader->get_port() != 0) {
updateLeader(spaceId,
code.get_part_id(),
HostAddr(leader->get_ip(), leader->get_port()));
}
} else {
// Simply keep the result
context->resp.failedParts().emplace(code.get_part_id(),
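Note that the callback above only harvests the leader hint carried by the E_LEADER_CHANGED failure code; within the code shown here the failed request is not re-sent, so the refreshed cache pays off on subsequent calls. A bounded retry loop on the calling side is the natural way to build on such a cache. The sketch below continues the LeaderCache example from earlier; the Request/Response/ErrorCode types and the injected send callable are placeholders for illustration, not part of this commit.

```cpp
// Illustrative retry loop on top of the LeaderCache sketched earlier.
// Reuses SpaceId, PartId, Host, and LeaderCache from that sketch.
#include <functional>

struct Request {};   // placeholder payload

enum class ErrorCode { SUCCEEDED, E_LEADER_CHANGED, E_RPC_FAILURE };

struct Response {
    ErrorCode code{ErrorCode::SUCCEEDED};
    Host      leader{};   // only meaningful when code == E_LEADER_CHANGED
};

using SendFn = std::function<Response(const Host&, const Request&)>;

Response sendWithLeaderRetry(LeaderCache& cache,
                             SpaceId space, PartId part,
                             const Host& fallbackPeer,
                             const Request& req,
                             const SendFn& send) {
    constexpr int kMaxRetries = 3;
    Host target = cache.leader(space, part).value_or(fallbackPeer);
    Response resp;
    for (int attempt = 0; attempt < kMaxRetries; ++attempt) {
        resp = send(target, req);
        if (resp.code == ErrorCode::E_LEADER_CHANGED) {
            cache.update(space, part, resp.leader);   // follow the hint and retry
            target = resp.leader;
        } else if (resp.code == ErrorCode::E_RPC_FAILURE) {
            cache.invalidate(space, part);            // cached host unreachable
            target = fallbackPeer;
        } else {
            break;                                    // SUCCEEDED
        }
    }
    return resp;
}
```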
77 changes: 77 additions & 0 deletions src/storage/test/StorageClientTest.cpp
@@ -273,6 +273,83 @@ TEST(StorageClientTest, VerticesInterfacesTest) {
threadPool.reset();
}

#define RETURN_LEADER_CHANGED(req, leader) \
UNUSED(req); \
do { \
folly::Promise<storage::cpp2::QueryResponse> pro; \
auto f = pro.getFuture(); \
storage::cpp2::QueryResponse resp; \
storage::cpp2::ResponseCommon rc; \
rc.failed_codes.emplace_back(); \
auto& code = rc.failed_codes.back(); \
code.set_part_id(1); \
code.set_code(storage::cpp2::ErrorCode::E_LEADER_CHANGED); \
code.set_leader(leader); \
resp.set_result(std::move(rc)); \
pro.setValue(std::move(resp)); \
return f; \
} while (false)

class TestStorageServiceRetry : public storage::cpp2::StorageServiceSvIf {
public:
TestStorageServiceRetry(IPv4 ip, Port port) {
leader_.set_ip(ip);
leader_.set_port(port);
}

folly::Future<cpp2::QueryResponse>
future_getOutBound(const cpp2::GetNeighborsRequest& req) override {
RETURN_LEADER_CHANGED(req, leader_);
}

private:
nebula::cpp2::HostAddr leader_;
};

class TestStorageClient : public StorageClient {
public:
explicit TestStorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool)
: StorageClient(ioThreadPool, nullptr) {}

int32_t partsNum(GraphSpaceID) const override {
return parts_.size();
}

PartMeta getPartMeta(GraphSpaceID, PartitionID partId) const override {
auto it = parts_.find(partId);
CHECK(it != parts_.end());
return it->second;
}

std::unordered_map<PartitionID, PartMeta> parts_;
};

TEST(StorageClientTest, LeaderChangeTest) {
IPv4 localIp;
network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);

auto sc = std::make_unique<test::ServerContext>();
auto handler = std::make_shared<TestStorageServiceRetry>(localIp, 10010);
sc->mockCommon("storage", 0, handler);
LOG(INFO) << "Start storage server on " << sc->port_;

auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
TestStorageClient tsc(threadPool);
PartMeta pm;
pm.spaceId_ = 1;
pm.partId_ = 1;
pm.peers_.emplace_back(HostAddr(localIp, sc->port_));
tsc.parts_.emplace(1, std::move(pm));

folly::Baton<true, std::atomic> baton;
tsc.getNeighbors(0, {1, 2, 3}, 0, true, "", {}).via(threadPool.get()).then([&] {
baton.post();
});
baton.wait();
ASSERT_EQ(1, tsc.leaders_.size());
ASSERT_EQ(HostAddr(localIp, 10010), tsc.leaders_[std::make_pair(0, 1)]);
}

} // namespace storage
} // namespace nebula

2 changes: 1 addition & 1 deletion src/tools/storage-perf/StoragePerfTool.cpp
@@ -40,7 +40,7 @@ class Perf {
threads.emplace_back(std::move(t));
}
threadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_io_threads);
client_ = std::make_unique<StorageClient>(threadPool_);
client_ = std::make_unique<StorageClient>(threadPool_, nullptr);
time::Duration duration;
for (auto& t : threads) {
CHECK(t->start("TaskThread"));
