Skip to content

Commit

Permalink
Support multi copies for kvstore (vesoft-inc#576)
Browse files Browse the repository at this point in the history
* Support multi copies for kvstore

* Address whitewum's comments
  • Loading branch information
dangleptr authored Jul 9, 2019
1 parent 460d7f9 commit d432361
Show file tree
Hide file tree
Showing 22 changed files with 309 additions and 129 deletions.
6 changes: 0 additions & 6 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"the ips number equals replica number."
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DECLARE_string(part_man_type);

Expand Down Expand Up @@ -115,10 +114,6 @@ int main(int argc, char *argv[]) {
// The meta server has only one space, one part.
partMan->addPart(0, 0, std::move(peersRet.value()));

// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);

Expand All @@ -128,7 +123,6 @@ int main(int argc, char *argv[]) {
std::unique_ptr<nebula::kvstore::KVStore> kvstore =
std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get());
Expand Down
9 changes: 0 additions & 9 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");

using nebula::operator<<;
Expand All @@ -56,7 +55,6 @@ std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
HostAddr localhost,
std::vector<std::string> paths,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<nebula::thread::GenericThreadPool> workers,
nebula::meta::MetaClient* metaClient,
nebula::meta::SchemaManager* schemaMan) {
nebula::kvstore::KVOptions options;
Expand All @@ -69,7 +67,6 @@ std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
if (FLAGS_store_type == "nebula") {
return std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);
} else if (FLAGS_store_type == "hbase") {
LOG(FATAL) << "HBase store has not been implemented";
Expand Down Expand Up @@ -147,11 +144,6 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);

// Meta client
Expand All @@ -168,7 +160,6 @@ int main(int argc, char *argv[]) {
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
ioThreadPool,
workers,
metaClient.get(),
schemaMan.get());

Expand Down
15 changes: 9 additions & 6 deletions src/graph/test/TestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ TestEnv::~TestEnv() {
void TestEnv::SetUp() {
FLAGS_load_data_interval_secs = 1;
// Create metaServer
metaServer_ = nebula::meta::TestUtils::mockMetaServer(0, metaRootPath_.path());
metaServer_ = nebula::meta::TestUtils::mockMetaServer(
network::NetworkUtils::getAvailablePort(),
metaRootPath_.path());
FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", metaServerPort());

// Create storageServer
Expand All @@ -41,11 +43,12 @@ void TestEnv::SetUp() {
mClient_->init();
uint32_t localIp;
nebula::network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);
storageServer_ = nebula::storage::TestUtils::mockStorageServer(mClient_.get(),
storageRootPath_.path(),
localIp,
0,
true);
storageServer_ = nebula::storage::TestUtils::mockStorageServer(
mClient_.get(),
storageRootPath_.path(),
localIp,
network::NetworkUtils::getAvailablePort(),
true);

// Create graphServer
graphServer_ = TestUtils::mockGraphServer(0);
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "kvstore/PartManager.h"
#include "kvstore/CompactionFilter.h"
#include "meta/SchemaManager.h"
#include "base/ErrorOr.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -63,7 +64,7 @@ class KVStore {
// Retrieve the current leader for the given partition. This
// is usually called when ERR_LEADER_CHANGED result code is
// returned
virtual HostAddr partLeader(GraphSpaceID spaceId, PartitionID partID) = 0;
virtual ErrorOr<ResultCode, HostAddr> partLeader(GraphSpaceID spaceId, PartitionID partID) = 0;

virtual PartManager* partManager() const {
return nullptr;
Expand Down
115 changes: 81 additions & 34 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");

/**
* Check spaceId, partId exists or not.
Expand All @@ -32,25 +33,6 @@ DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom
return; \
}

/**
* Check spaceId, partId and return related storage engine.
* */
#define CHECK_AND_RETURN_ENGINE(spaceId, partId) \
KVEngine* engine = nullptr; \
do { \
auto it = spaces_.find(spaceId); \
if (UNLIKELY(it == spaces_.end())) { \
return ResultCode::ERR_SPACE_NOT_FOUND; \
} \
auto& parts = it->second->parts_; \
auto partIt = parts.find(partId); \
if (UNLIKELY(partIt == parts.end())) { \
return ResultCode::ERR_PART_NOT_FOUND; \
} \
engine = partIt->second->engine(); \
CHECK_NOTNULL(engine); \
} while (false)

/**
* Check spaceId is exist and return related partitions.
*/
Expand All @@ -74,7 +56,23 @@ DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom
namespace nebula {
namespace kvstore {

NebulaStore::~NebulaStore() {
// Tear-down order matters: drain the generic worker pool first so no
// background task can touch a Part or engine while we destroy them.
workers_->stop();
workers_->wait();
// Destroy all spaces (and the Parts/engines they own) before stopping the
// raft service — Parts were registered with raftService_ in newPart().
spaces_.clear();
LOG(INFO) << "Stop the raft service...";
raftService_->stop();
// Block until the raft service has fully shut down before returning.
raftService_->waitUntilStop();
LOG(INFO) << "~NebulaStore()";
}

void NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
workers_ = std::make_shared<thread::GenericThreadPool>();
workers_->start(FLAGS_num_workers);
raftService_ = raftex::RaftexService::createService(ioPool_, raftAddr_.second);
raftService_->waitUntilReady();
flusher_ = std::make_unique<wal::BufferFlusher>();
CHECK(!!partMan_);
LOG(INFO) << "Scan the local path, and init the spaces_";
{
Expand Down Expand Up @@ -164,6 +162,19 @@ std::unique_ptr<KVEngine> NebulaStore::newEngine(GraphSpaceID spaceId,
}
}

// Return the current leader address for (spaceId, partId), translated from
// the raft address space back to the storage-service address via
// getStoreAddr(). Yields ERR_SPACE_NOT_FOUND / ERR_PART_NOT_FOUND when the
// requested space or partition is not hosted on this store.
ErrorOr<ResultCode, HostAddr> NebulaStore::partLeader(GraphSpaceID spaceId, PartitionID partId) {
    // Hold the read lock so spaces_ cannot be mutated while we walk it.
    folly::RWSpinLock::ReadHolder holder(&lock_);
    const auto spaceIt = spaces_.find(spaceId);
    if (UNLIKELY(spaceIt == spaces_.end())) {
        return ResultCode::ERR_SPACE_NOT_FOUND;
    }
    const auto& partMap = spaceIt->second->parts_;
    const auto partIt = partMap.find(partId);
    if (UNLIKELY(partIt == partMap.end())) {
        return ResultCode::ERR_PART_NOT_FOUND;
    }
    return getStoreAddr(partIt->second->leader());
}

void NebulaStore::addSpace(GraphSpaceID spaceId) {
folly::RWSpinLock::WriteHolder wh(&lock_);
Expand Down Expand Up @@ -223,8 +234,18 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
partId),
engine,
ioPool_,
workers_);
part->start({});
workers_,
flusher_.get());
auto partMeta = partMan_->partMeta(spaceId, partId);
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
if (h != storeSvcAddr_) {
peers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << peers.back();
}
}
raftService_->addPartition(part);
part->start(std::move(peers));
return part;
}

Expand Down Expand Up @@ -255,6 +276,7 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId) {
CHECK_NOTNULL(e);
// Stop the raft
partIt->second->stop();
raftService_->removePartition(partIt->second);
spaceIt->second->parts_.erase(partId);
e->removePart(partId);
}
Expand All @@ -267,19 +289,25 @@ ResultCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->get(key, value);
auto ret = engine(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto* e = nebula::value(ret);
return e->get(key, value);
}


ResultCode NebulaStore::multiGet(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->multiGet(keys, values);
auto ret = engine(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto* e = nebula::value(ret);
return e->multiGet(keys, values);
}


Expand All @@ -288,22 +316,27 @@ ResultCode NebulaStore::range(GraphSpaceID spaceId,
const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* iter) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->range(start, end, iter);
auto ret = engine(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto* e = nebula::value(ret);
return e->range(start, end, iter);
}


ResultCode NebulaStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) {
folly::RWSpinLock::ReadHolder rh(&lock_);
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->prefix(prefix, iter);
auto ret = engine(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto* e = nebula::value(ret);
return e->prefix(prefix, iter);
}


void NebulaStore::asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
std::vector<KV> keyValues,
Expand Down Expand Up @@ -435,6 +468,20 @@ bool NebulaStore::isLeader(GraphSpaceID spaceId, PartitionID partId) {
return false;
}

// Resolve the KVEngine backing (spaceId, partId). Returns a non-owning
// engine pointer on success, or ERR_SPACE_NOT_FOUND / ERR_PART_NOT_FOUND
// when the space or partition is unknown to this store.
ErrorOr<ResultCode, KVEngine*> NebulaStore::engine(GraphSpaceID spaceId, PartitionID partId) {
    // Read lock keeps the spaces_ map stable for the duration of the lookup.
    folly::RWSpinLock::ReadHolder holder(&lock_);
    const auto spaceIt = spaces_.find(spaceId);
    if (UNLIKELY(spaceIt == spaces_.end())) {
        return ResultCode::ERR_SPACE_NOT_FOUND;
    }
    const auto& partEntries = spaceIt->second->parts_;
    const auto entry = partEntries.find(partId);
    if (UNLIKELY(entry == partEntries.end())) {
        return ResultCode::ERR_PART_NOT_FOUND;
    }
    return entry->second->engine();
}

} // namespace kvstore
} // namespace nebula

Loading

0 comments on commit d432361

Please sign in to comment.