Skip to content

Commit

Permalink
Fix a bunch of bugs in write test. (vesoft-inc#706)
Browse files Browse the repository at this point in the history
* Fix bugs in RAFT

* Fix failed LogAppend.MultiThreadAppend

* Fix the second bunch of bugs

* Raft shares the same thread pool with the thrift server

* Fix crash problem in LearnerTest
  • Loading branch information
dangleptr authored Aug 6, 2019
1 parent 506375b commit a13ad50
Show file tree
Hide file tree
Showing 28 changed files with 413 additions and 233 deletions.
3 changes: 2 additions & 1 deletion src/common/thrift/ThriftClientManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class ThriftClientManager final {
public:
std::shared_ptr<ClientType> client(const HostAddr& host,
folly::EventBase* evb = nullptr,
bool compatibility = false);
bool compatibility = false,
uint32_t timeout = 0);

~ThriftClientManager() {
VLOG(3) << "~ThriftClientManager";
Expand Down
7 changes: 5 additions & 2 deletions src/common/thrift/ThriftClientManager.inl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace thrift {

template<class ClientType>
std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
const HostAddr& host, folly::EventBase* evb, bool compatibility) {
const HostAddr& host, folly::EventBase* evb, bool compatibility, uint32_t timeout) {
VLOG(2) << "Getting a client to "
<< network::NetworkUtils::intToIPv4(host.first)
<< ":" << host.second;
Expand All @@ -38,7 +38,7 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
<< ipAddr << ":" << port
<< ", trying to create one";
auto channel = apache::thrift::ReconnectingRequestChannel::newChannel(
*evb, [compatibility, ipAddr, port] (folly::EventBase& eb) mutable {
*evb, [compatibility, ipAddr, port, timeout] (folly::EventBase& eb) mutable {
static thread_local int connectionCount = 0;
VLOG(2) << "Connecting to " << ipAddr << ":" << port
<< " for " << ++connectionCount << " times";
Expand All @@ -49,6 +49,9 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
&eb, ipAddr, port, FLAGS_conn_timeout_ms);
});
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (timeout > 0) {
headerClientChannel->setTimeout(timeout);
}
if (compatibility) {
headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
Expand Down
36 changes: 21 additions & 15 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(num_worker_threads, 32, "Number of workers");
DECLARE_string(part_man_type);

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
Expand Down Expand Up @@ -104,38 +105,41 @@ int main(int argc, char *argv[]) {

// folly IOThreadPoolExecutor
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);

std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
FLAGS_num_worker_threads, true /*stats*/));
threadManager->setNamePrefix("executor");
threadManager->start();
nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
auto kvstore = std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
localhost);

auto kvstore = std::make_unique<nebula::kvstore::NebulaStore>(
std::move(options),
ioPool,
localhost,
threadManager);
if (!(kvstore->init())) {
LOG(ERROR) << "nebula store init failed";
return EXIT_FAILURE;
}

auto *kvstore_ = kvstore.get();


auto clusterMan
= std::make_unique<nebula::meta::ClusterManager>(FLAGS_peers, "");
if (!clusterMan->loadOrCreateCluId(kvstore_)) {
if (!clusterMan->loadOrCreateCluId(kvstore.get())) {
LOG(ERROR) << "clusterId init error!";
return EXIT_FAILURE;
}

std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto *helperPtr = helper.get();

LOG(INFO) << "Starting Meta HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::meta::MetaHttpStatusHandler();
});
nebula::WebService::registerHandler("/download-dispatch", [kvstore_, helperPtr] {
nebula::WebService::registerHandler("/download-dispatch", [&] {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore_, helperPtr);
handler->init(kvstore.get(), helper.get());
return handler;
});
status = nebula::WebService::start();
Expand All @@ -152,20 +156,22 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore_,
auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get(),
clusterMan->getClusterId());
nebula::meta::ActiveHostsMan::instance(kvstore_);
nebula::meta::ActiveHostsMan::instance(kvstore.get());

auto gflagsManager = std::make_unique<nebula::meta::KVBasedGflagsManager>(kvstore.get());
gflagsManager->init();

LOG(INFO) << "The meta deamon start on " << localhost;
try {
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioPool);
gServer->setThreadManager(threadManager);
gServer->setInterface(std::move(handler));
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception &e) {
nebula::WebService::stop();
Expand Down
16 changes: 12 additions & 4 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "storage/CompactionFilter.h"
#include "hdfs/HdfsHelper.h"
#include "hdfs/HdfsCommandHelper.h"
#include <thrift/lib/cpp/concurrency/ThreadManager.h>

DEFINE_int32(port, 44500, "Storage daemon listening port");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
Expand Down Expand Up @@ -62,6 +63,7 @@ std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
HostAddr localhost,
std::vector<std::string> paths,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::Executor> workers,
nebula::meta::MetaClient* metaClient,
nebula::meta::SchemaManager* schemaMan) {
nebula::kvstore::KVOptions options;
Expand All @@ -74,7 +76,8 @@ std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
if (FLAGS_store_type == "nebula") {
auto nbStore = std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
localhost);
localhost,
workers);
if (!(nbStore->init())) {
LOG(ERROR) << "nebula store init failed";
return nullptr;
Expand Down Expand Up @@ -158,6 +161,11 @@ int main(int argc, char *argv[]) {
}

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
FLAGS_num_worker_threads, true /*stats*/));
threadManager->setNamePrefix("executor");
threadManager->start();

std::string clusteridFile =
folly::stringPrintf("%s/%s", paths[0].c_str(), "/storage.cluster.id");
Expand Down Expand Up @@ -187,6 +195,7 @@ int main(int argc, char *argv[]) {
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
ioThreadPool,
threadManager,
metaClient.get(),
schemaMan.get());

Expand Down Expand Up @@ -227,13 +236,12 @@ int main(int argc, char *argv[]) {
try {
LOG(INFO) << "The storage deamon start on " << localhost;
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioThreadPool);
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
gServer->setThreadManager(threadManager);
gServer->setInterface(std::move(handler));
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception& e) {
nebula::WebService::stop();
Expand Down
15 changes: 8 additions & 7 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ namespace kvstore {
NebulaStore::~NebulaStore() {
LOG(INFO) << "Cut off the relationship with meta client";
options_.partMan_.reset();
workers_->stop();
workers_->wait();
bgWorkers_->stop();
bgWorkers_->wait();
LOG(INFO) << "Stop the raft service...";
raftService_->stop();
raftService_->waitUntilStop();
Expand All @@ -50,9 +50,9 @@ NebulaStore::~NebulaStore() {

bool NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
workers_ = std::make_shared<thread::GenericThreadPool>();
workers_->start(FLAGS_num_workers);
raftService_ = raftex::RaftexService::createService(ioPool_, raftAddr_.second);
bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
bgWorkers_->start(FLAGS_num_workers);
raftService_ = raftex::RaftexService::createService(ioPool_, handlersPool_, raftAddr_.second);
if (!raftService_->start()) {
LOG(ERROR) << "Start the raft service failed";
return false;
Expand Down Expand Up @@ -221,8 +221,9 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
partId),
engine,
ioPool_,
workers_,
flusher_.get());
bgWorkers_,
flusher_.get(),
handlersPool_);
auto partMeta = options_.partMan_->partMeta(spaceId, partId);
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
Expand Down
9 changes: 6 additions & 3 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ class NebulaStore : public KVStore, public Handler {
public:
NebulaStore(KVOptions options,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
HostAddr serviceAddr)
HostAddr serviceAddr,
std::shared_ptr<folly::Executor> handlersPool)
: ioPool_(ioPool)
, storeSvcAddr_(serviceAddr)
, handlersPool_(handlersPool)
, raftAddr_(getRaftAddr(serviceAddr))
, options_(std::move(options)) {
}
Expand Down Expand Up @@ -77,7 +79,7 @@ class NebulaStore : public KVStore, public Handler {
}

std::shared_ptr<thread::GenericThreadPool> getWorkers() const {
return workers_;
return bgWorkers_;
}

// Return the current leader
Expand Down Expand Up @@ -186,8 +188,9 @@ class NebulaStore : public KVStore, public Handler {
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;

std::shared_ptr<folly::IOThreadPoolExecutor> ioPool_;
std::shared_ptr<thread::GenericThreadPool> workers_;
std::shared_ptr<thread::GenericThreadPool> bgWorkers_;
HostAddr storeSvcAddr_;
std::shared_ptr<folly::Executor> handlersPool_;
HostAddr raftAddr_;
KVOptions options_;

Expand Down
32 changes: 20 additions & 12 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace kvstore {

using raftex::AppendLogResult;

const char kLastCommittedIdKey[] = "_last_committed_log_id";
const char* kCommitKeyPrefix = "__system_commit_msg_";

namespace {

Expand All @@ -39,35 +39,39 @@ Part::Part(GraphSpaceID spaceId,
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
wal::BufferFlusher* flusher)
wal::BufferFlusher* flusher,
std::shared_ptr<folly::Executor> handlers)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
localAddr,
walPath,
flusher,
ioPool,
workers)
workers,
handlers)
, spaceId_(spaceId)
, partId_(partId)
, walPath_(walPath)
, engine_(engine) {
}


LogID Part::lastCommittedLogId() {
std::pair<LogID, TermID> Part::lastCommittedLogId() {
std::string val;
ResultCode res = engine_->get(kLastCommittedIdKey, &val);
ResultCode res = engine_->get(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), &val);
if (res != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Cannot fetch the last committed log id from the storage engine";
return 0;
return std::make_pair(0, 0);
}
CHECK_EQ(val.size(), sizeof(LogID));
CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));

LogID lastId;
memcpy(reinterpret_cast<void*>(&lastId), val.data(), sizeof(LogID));
TermID termId;
memcpy(reinterpret_cast<void*>(&termId), val.data() + sizeof(LogID), sizeof(TermID));

return lastId;
return std::make_pair(lastId, termId);
}


Expand Down Expand Up @@ -159,8 +163,10 @@ std::string Part::compareAndSet(const std::string& log) {
bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto batch = engine_->startBatchWrite();
LogID lastId = -1;
TermID lastTerm = -1;
while (iter->valid()) {
lastId = iter->logId();
lastTerm = iter->logTerm();
auto log = iter->logMsg();
if (log.empty()) {
VLOG(3) << idStr_ << "Skip the heartbeat!";
Expand Down Expand Up @@ -238,8 +244,11 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}

if (lastId >= 0) {
batch->put(kLastCommittedIdKey,
folly::StringPiece(reinterpret_cast<char*>(&lastId), sizeof(LogID)));
std::string commitMsg;
commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
commitMsg.append(reinterpret_cast<char*>(&lastId), sizeof(LogID));
commitMsg.append(reinterpret_cast<char*>(&lastTerm), sizeof(TermID));
batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg);
}

return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
Expand All @@ -251,8 +260,7 @@ bool Part::preProcessLog(LogID logId,
const std::string& log) {
VLOG(3) << idStr_ << "logId " << logId
<< ", termId " << termId
<< ", clusterId " << clusterId
<< ", log " << log;
<< ", clusterId " << clusterId;
if (!log.empty()) {
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ class Part : public raftex::RaftPart {
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<thread::GenericThreadPool> workers,
wal::BufferFlusher* flusher);
wal::BufferFlusher* flusher,
std::shared_ptr<folly::Executor> handlers);


virtual ~Part() {
LOG(INFO) << idStr_ << "~Part()";
Expand All @@ -48,7 +50,7 @@ class Part : public raftex::RaftPart {
/**
* Methods inherited from RaftPart
*/
LogID lastCommittedLogId() override;
std::pair<LogID, TermID> lastCommittedLogId() override;

void onLostLeadership(TermID term) override;

Expand Down
Loading

0 comments on commit a13ad50

Please sign in to comment.