diff --git a/src/common/thrift/ThriftClientManager.h b/src/common/thrift/ThriftClientManager.h index 8ecafe246cf..75c3bd8e6ef 100644 --- a/src/common/thrift/ThriftClientManager.h +++ b/src/common/thrift/ThriftClientManager.h @@ -18,7 +18,8 @@ class ThriftClientManager final { public: std::shared_ptr client(const HostAddr& host, folly::EventBase* evb = nullptr, - bool compatibility = false); + bool compatibility = false, + uint32_t timeout = 0); ~ThriftClientManager() { VLOG(3) << "~ThriftClientManager"; diff --git a/src/common/thrift/ThriftClientManager.inl b/src/common/thrift/ThriftClientManager.inl index aa48110a7f2..69615af1b8e 100644 --- a/src/common/thrift/ThriftClientManager.inl +++ b/src/common/thrift/ThriftClientManager.inl @@ -17,7 +17,7 @@ namespace thrift { template std::shared_ptr ThriftClientManager::client( - const HostAddr& host, folly::EventBase* evb, bool compatibility) { + const HostAddr& host, folly::EventBase* evb, bool compatibility, uint32_t timeout) { VLOG(2) << "Getting a client to " << network::NetworkUtils::intToIPv4(host.first) << ":" << host.second; @@ -38,7 +38,7 @@ std::shared_ptr ThriftClientManager::client( << ipAddr << ":" << port << ", trying to create one"; auto channel = apache::thrift::ReconnectingRequestChannel::newChannel( - *evb, [compatibility, ipAddr, port] (folly::EventBase& eb) mutable { + *evb, [compatibility, ipAddr, port, timeout] (folly::EventBase& eb) mutable { static thread_local int connectionCount = 0; VLOG(2) << "Connecting to " << ipAddr << ":" << port << " for " << ++connectionCount << " times"; @@ -49,6 +49,9 @@ std::shared_ptr ThriftClientManager::client( &eb, ipAddr, port, FLAGS_conn_timeout_ms); }); auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); + if (timeout > 0) { + headerClientChannel->setTimeout(timeout); + } if (compatibility) { headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL); headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED); diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index c9107fe2326..756751a58a6 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -34,6 +34,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma," "If empty, it means replica is 1"); DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP"); DEFINE_int32(num_io_threads, 16, "Number of IO threads"); +DEFINE_int32(num_worker_threads, 32, "Number of workers"); DECLARE_string(part_man_type); DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id"); @@ -104,38 +105,41 @@ int main(int argc, char *argv[]) { // folly IOThreadPoolExecutor auto ioPool = std::make_shared(FLAGS_num_io_threads); - + std::shared_ptr threadManager( + apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + FLAGS_num_worker_threads, true /*stats*/)); + threadManager->setNamePrefix("executor"); + threadManager->start(); nebula::kvstore::KVOptions options; options.dataPaths_ = {FLAGS_data_path}; options.partMan_ = std::move(partMan); - auto kvstore = std::make_unique(std::move(options), - ioPool, - localhost); + + auto kvstore = std::make_unique( + std::move(options), + ioPool, + localhost, + threadManager); if (!(kvstore->init())) { LOG(ERROR) << "nebula store init failed"; return EXIT_FAILURE; } - - auto *kvstore_ = kvstore.get(); - + auto clusterMan = std::make_unique(FLAGS_peers, ""); - if (!clusterMan->loadOrCreateCluId(kvstore_)) { + if 
(!clusterMan->loadOrCreateCluId(kvstore.get())) { LOG(ERROR) << "clusterId init error!"; return EXIT_FAILURE; } - std::unique_ptr helper = std::make_unique(); - auto *helperPtr = helper.get(); LOG(INFO) << "Starting Meta HTTP Service"; nebula::WebService::registerHandler("/status", [] { return new nebula::meta::MetaHttpStatusHandler(); }); - nebula::WebService::registerHandler("/download-dispatch", [kvstore_, helperPtr] { + nebula::WebService::registerHandler("/download-dispatch", [&] { auto handler = new nebula::meta::MetaHttpDownloadHandler(); - handler->init(kvstore_, helperPtr); + handler->init(kvstore.get(), helper.get()); return handler; }); status = nebula::WebService::start(); @@ -152,20 +156,22 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - auto handler = std::make_shared(kvstore_, + auto handler = std::make_shared(kvstore.get(), clusterMan->getClusterId()); - nebula::meta::ActiveHostsMan::instance(kvstore_); + nebula::meta::ActiveHostsMan::instance(kvstore.get()); + auto gflagsManager = std::make_unique(kvstore.get()); gflagsManager->init(); LOG(INFO) << "The meta deamon start on " << localhost; try { gServer = std::make_unique(); - gServer->setInterface(std::move(handler)); gServer->setPort(FLAGS_port); gServer->setReusePort(FLAGS_reuse_port); gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection gServer->setIOThreadPool(ioPool); + gServer->setThreadManager(threadManager); + gServer->setInterface(std::move(handler)); gServer->serve(); // Will wait until the server shuts down } catch (const std::exception &e) { nebula::WebService::stop(); diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index 56dd98fcc91..db350467eb0 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -24,6 +24,7 @@ #include "storage/CompactionFilter.h" #include "hdfs/HdfsHelper.h" #include "hdfs/HdfsCommandHelper.h" +#include DEFINE_int32(port, 44500, "Storage daemon listening port"); DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option"); @@ -62,6 +63,7 @@ std::unique_ptr getStoreInstance( HostAddr localhost, std::vector paths, std::shared_ptr ioPool, + std::shared_ptr workers, nebula::meta::MetaClient* metaClient, nebula::meta::SchemaManager* schemaMan) { nebula::kvstore::KVOptions options; @@ -74,7 +76,8 @@ std::unique_ptr getStoreInstance( if (FLAGS_store_type == "nebula") { auto nbStore = std::make_unique(std::move(options), ioPool, - localhost); + localhost, + workers); if (!(nbStore->init())) { LOG(ERROR) << "nebula store init failed"; return nullptr; @@ -158,6 +161,11 @@ int main(int argc, char *argv[]) { } auto ioThreadPool = std::make_shared(FLAGS_num_io_threads); + std::shared_ptr threadManager( + apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + FLAGS_num_worker_threads, true /*stats*/)); + threadManager->setNamePrefix("executor"); + threadManager->start(); std::string clusteridFile = folly::stringPrintf("%s/%s", paths[0].c_str(), "/storage.cluster.id"); @@ -187,6 +195,7 @@ int main(int argc, char *argv[]) { std::unique_ptr kvstore = getStoreInstance(localhost, std::move(paths), ioThreadPool, + threadManager, metaClient.get(), schemaMan.get()); @@ -227,13 +236,12 @@ int main(int argc, char *argv[]) { try { LOG(INFO) << "The storage deamon start on " << localhost; gServer = std::make_unique(); - gServer->setInterface(std::move(handler)); gServer->setPort(FLAGS_port); gServer->setReusePort(FLAGS_reuse_port); 
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection gServer->setIOThreadPool(ioThreadPool); - gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads); - gServer->setCPUWorkerThreadName("executor"); + gServer->setThreadManager(threadManager); + gServer->setInterface(std::move(handler)); gServer->serve(); // Will wait until the server shuts down } catch (const std::exception& e) { nebula::WebService::stop(); diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 1afc2681c11..a93020606c6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -39,8 +39,8 @@ namespace kvstore { NebulaStore::~NebulaStore() { LOG(INFO) << "Cut off the relationship with meta client"; options_.partMan_.reset(); - workers_->stop(); - workers_->wait(); + bgWorkers_->stop(); + bgWorkers_->wait(); LOG(INFO) << "Stop the raft service..."; raftService_->stop(); raftService_->waitUntilStop(); @@ -50,9 +50,9 @@ NebulaStore::~NebulaStore() { bool NebulaStore::init() { LOG(INFO) << "Start the raft service..."; - workers_ = std::make_shared(); - workers_->start(FLAGS_num_workers); - raftService_ = raftex::RaftexService::createService(ioPool_, raftAddr_.second); + bgWorkers_ = std::make_shared(); + bgWorkers_->start(FLAGS_num_workers); + raftService_ = raftex::RaftexService::createService(ioPool_, handlersPool_, raftAddr_.second); if (!raftService_->start()) { LOG(ERROR) << "Start the raft service failed"; return false; @@ -221,8 +221,9 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, partId), engine, ioPool_, - workers_, - flusher_.get()); + bgWorkers_, + flusher_.get(), + handlersPool_); auto partMeta = options_.partMan_->partMeta(spaceId, partId); std::vector peers; for (auto& h : partMeta.peers_) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index ca2e5c5bb63..5298077d060 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -40,9 +40,11 @@ class NebulaStore : public KVStore, public Handler { public: NebulaStore(KVOptions options, std::shared_ptr ioPool, - HostAddr serviceAddr) + HostAddr serviceAddr, + std::shared_ptr handlersPool) : ioPool_(ioPool) , storeSvcAddr_(serviceAddr) + , handlersPool_(handlersPool) , raftAddr_(getRaftAddr(serviceAddr)) , options_(std::move(options)) { } @@ -77,7 +79,7 @@ class NebulaStore : public KVStore, public Handler { } std::shared_ptr getWorkers() const { - return workers_; + return bgWorkers_; } // Return the current leader @@ -186,8 +188,9 @@ class NebulaStore : public KVStore, public Handler { std::unordered_map> spaces_; std::shared_ptr ioPool_; - std::shared_ptr workers_; + std::shared_ptr bgWorkers_; HostAddr storeSvcAddr_; + std::shared_ptr handlersPool_; HostAddr raftAddr_; KVOptions options_; diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 693cd262f4a..f36ee5745c4 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -14,7 +14,7 @@ namespace kvstore { using raftex::AppendLogResult; -const char kLastCommittedIdKey[] = "_last_committed_log_id"; +const char* kCommitKeyPrefix = "__system_commit_msg_"; namespace { @@ -39,7 +39,8 @@ Part::Part(GraphSpaceID spaceId, KVEngine* engine, std::shared_ptr ioPool, std::shared_ptr workers, - wal::BufferFlusher* flusher) + wal::BufferFlusher* flusher, + std::shared_ptr handlers) : RaftPart(FLAGS_cluster_id, spaceId, partId, @@ -47,7 +48,8 @@ Part::Part(GraphSpaceID spaceId, walPath, flusher, ioPool, - workers) + workers, + handlers) , spaceId_(spaceId) , partId_(partId) , 
walPath_(walPath) @@ -55,19 +57,21 @@ Part::Part(GraphSpaceID spaceId, } -LogID Part::lastCommittedLogId() { +std::pair Part::lastCommittedLogId() { std::string val; - ResultCode res = engine_->get(kLastCommittedIdKey, &val); + ResultCode res = engine_->get(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), &val); if (res != ResultCode::SUCCEEDED) { LOG(ERROR) << "Cannot fetch the last committed log id from the storage engine"; - return 0; + return std::make_pair(0, 0); } - CHECK_EQ(val.size(), sizeof(LogID)); + CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID)); LogID lastId; memcpy(reinterpret_cast(&lastId), val.data(), sizeof(LogID)); + TermID termId; + memcpy(reinterpret_cast(&termId), val.data() + sizeof(LogID), sizeof(TermID)); - return lastId; + return std::make_pair(lastId, termId); } @@ -159,8 +163,10 @@ std::string Part::compareAndSet(const std::string& log) { bool Part::commitLogs(std::unique_ptr iter) { auto batch = engine_->startBatchWrite(); LogID lastId = -1; + TermID lastTerm = -1; while (iter->valid()) { lastId = iter->logId(); + lastTerm = iter->logTerm(); auto log = iter->logMsg(); if (log.empty()) { VLOG(3) << idStr_ << "Skip the heartbeat!"; @@ -238,8 +244,11 @@ bool Part::commitLogs(std::unique_ptr iter) { } if (lastId >= 0) { - batch->put(kLastCommittedIdKey, - folly::StringPiece(reinterpret_cast(&lastId), sizeof(LogID))); + std::string commitMsg; + commitMsg.reserve(sizeof(LogID) + sizeof(TermID)); + commitMsg.append(reinterpret_cast(&lastId), sizeof(LogID)); + commitMsg.append(reinterpret_cast(&lastTerm), sizeof(TermID)); + batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg); } return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED; @@ -251,8 +260,7 @@ bool Part::preProcessLog(LogID logId, const std::string& log) { VLOG(3) << idStr_ << "logId " << logId << ", termId " << termId - << ", clusterId " << clusterId - << ", log " << log; + << ", clusterId " << clusterId; if (!log.empty()) { switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index c4b4e4d2339..87de9543947 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -24,7 +24,9 @@ class Part : public raftex::RaftPart { KVEngine* engine, std::shared_ptr pool, std::shared_ptr workers, - wal::BufferFlusher* flusher); + wal::BufferFlusher* flusher, + std::shared_ptr handlers); + virtual ~Part() { LOG(INFO) << idStr_ << "~Part()"; @@ -48,7 +50,7 @@ class Part : public raftex::RaftPart { /** * Methods inherited from RaftPart */ - LogID lastCommittedLogId() override; + std::pair lastCommittedLogId() override; void onLostLeadership(TermID term) override; diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 15b727b3a9b..249c85021d6 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -8,13 +8,15 @@ #include "kvstore/raftex/Host.h" #include "kvstore/raftex/RaftPart.h" #include "kvstore/wal/FileBasedWal.h" -#include #include "network/NetworkUtils.h" +#include +#include DEFINE_uint32(max_appendlog_batch_size, 128, "The max number of logs in each appendLog request batch"); DEFINE_uint32(max_outstanding_requests, 1024, "The max number of outstanding appendLog requests"); +DEFINE_int32(raft_rpc_timeout_ms, 500, "rpc timeout for raft client"); namespace nebula { @@ -90,7 +92,7 @@ folly::Future Host::appendLogs( LogID prevLogId) { VLOG(3) << idStr_ << "Entering Host::appendLogs()"; - VLOG(3) << idStr_ + VLOG(2) << idStr_ << "Append logs to the host [term = " << term 
<< ", logId = " << logId << ", committedLogId = " << committedLogId @@ -109,15 +111,20 @@ folly::Future Host::appendLogs( // This is a re-send or a heartbeat. If there is an // ongoing request, we will just return SUCCEEDED if (requestOnGoing_) { - VLOG(2) << idStr_ << "Another request is onging," "ignore the re-send request"; + LOG(INFO) << idStr_ << "Another request is ongoing," " ignore the re-send request"; cpp2::AppendLogResponse r; r.set_error_code(cpp2::ErrorCode::SUCCEEDED); return r; } } else { // Otherwise, logId has to be greater - CHECK_GT(logId, logIdToSend_); + if (logId < logIdToSend_) { + LOG(INFO) << idStr_ << "The log has already been sent"; + cpp2::AppendLogResponse r; + r.set_error_code(cpp2::ErrorCode::SUCCEEDED); + return r; + } } if (requestOnGoing_ && res == cpp2::ErrorCode::SUCCEEDED) { @@ -129,8 +136,8 @@ folly::Future Host::appendLogs( prevLogId); return cachingPromise_.getFuture(); } else { - VLOG(2) << idStr_ - << "Too many requests are waiting, return error"; + LOG(INFO) << idStr_ + << "Too many requests are waiting, return error"; cpp2::AppendLogResponse r; r.set_error_code(cpp2::ErrorCode::E_TOO_MANY_REQUESTS); return r; } @@ -180,10 +187,9 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { requestOnGoing_ = false; } - void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) { - sendAppendLogRequest(eb, std::move(req)).then( + sendAppendLogRequest(eb, std::move(req)).via(eb).then( [eb, self = shared_from_this()] (folly::Try&& t) { VLOG(3) << self->idStr_ << "appendLogs() call got response"; if (t.hasException()) { @@ -239,12 +245,14 @@ void Host::appendLogsInternal(folly::EventBase* eb, VLOG(2) << self->idStr_ << "No request any more!"; self->requestOnGoing_ = false; } else { - VLOG(2) << self->idStr_ - << "Sending the pending request in the queue"; auto& tup = self->pendingReq_; self->logTermToSend_ = std::get<0>(tup); self->logIdToSend_ = std::get<1>(tup); self->committedLogId_ = std::get<2>(tup); + VLOG(2) << self->idStr_ + << "Sending the pending request in the queue" + << ", from " << self->lastLogIdSent_ + 1 + << " to " << self->logIdToSend_; newReq = self->prepareAppendLogRequest(); self->promise_ = std::move(self->cachingPromise_); self->cachingPromise_ @@ -293,7 +301,8 @@ void Host::appendLogsInternal(folly::EventBase* eb, return r; } default: { - LOG(ERROR) << self->idStr_ + PLOG_EVERY_N(ERROR, 100) + << self->idStr_ << "Failed to append logs to the host (Err: " << static_cast(resp.get_error_code()) << ")"; @@ -316,7 +325,7 @@ Host::prepareAppendLogRequest() const { req->set_space(part_->spaceId()); req->set_part(part_->partitionId()); req->set_current_term(logTermToSend_); - // req->set_last_log_id(logIdToSend_); + req->set_last_log_id(logIdToSend_); req->set_leader_ip(part_->address().first); req->set_leader_port(part_->address().second); req->set_committed_log_id(committedLogId_); @@ -324,7 +333,7 @@ Host::prepareAppendLogRequest() const { req->set_last_log_id_sent(lastLogIdSent_); VLOG(2) << idStr_ << "Prepare AppendLogs request from Log " - << lastLogIdSent_ + 1 << " to " << logIdToSend_; + << lastLogIdSent_ + 1 << " to " << logIdToSend_; auto it = part_->wal()->iterator(lastLogIdSent_ + 1, logIdToSend_); if (it->valid()) { VLOG(2) << idStr_ << "Prepare the list of log entries to send"; @@ -345,7 +354,7 @@ Host::prepareAppendLogRequest() const { } req->set_log_str_list(std::move(logs)); } else { - req->set_log_term(0); + LOG(FATAL) << idStr_ << "We do not support snapshot yet"; } return req; @@ -361,8 +370,8 @@ 
folly::Future Host::sendAppendLogRequest( std::lock_guard g(lock_); auto res = checkStatus(); if (res != cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << idStr_ - << "The Host is not in a proper status, do not send"; + LOG(WARNING) << idStr_ + << "The Host is not in a proper status, do not send"; cpp2::AppendLogResponse resp; resp.set_error_code(res); return resp; @@ -377,7 +386,7 @@ folly::Future Host::sendAppendLogRequest( << ", last_log_term_sent" << req->get_last_log_term_sent() << ", last_log_id_sent " << req->get_last_log_id_sent(); // Get client connection - auto client = tcManager().client(addr_, eb); + auto client = tcManager().client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); return client->future_appendLog(*req); } diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 9280004d7c1..7c4e68fb17c 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -195,7 +195,8 @@ RaftPart::RaftPart(ClusterID clusterId, const folly::StringPiece walRoot, BufferFlusher* flusher, std::shared_ptr pool, - std::shared_ptr workers) + std::shared_ptr workers, + std::shared_ptr executor) : idStr_{folly::stringPrintf("[Port: %d, Space: %d, Part: %d] ", localAddr.second, spaceId, partId)} , clusterId_{clusterId} @@ -206,7 +207,8 @@ RaftPart::RaftPart(ClusterID clusterId, , role_{Role::FOLLOWER} , leader_{0, 0} , ioThreadPool_{pool} - , workers_{workers} { + , bgWorkers_{workers} + , executor_(executor) { // TODO Configure the wal policy wal_ = FileBasedWal::getWal(walRoot, FileBasedWalPolicy(), @@ -221,7 +223,9 @@ RaftPart::RaftPart(ClusterID clusterId, log); }); lastLogId_ = wal_->lastLogId(); - term_ = proposedTerm_ = lastLogTerm_ = wal_->lastLogTerm(); + lastLogTerm_ = wal_->lastLogTerm(); + logs_.reserve(FLAGS_max_batch_size); + CHECK(!!executor_) << idStr_ << "Should not be nullptr"; } @@ -263,7 +267,15 @@ void RaftPart::start(std::vector&& peers, bool asLearner) { << " copies. The quorum is " << quorum_ + 1 << ", as learner " << asLearner; - committedLogId_ = lastCommittedLogId(); + auto logIdAndTerm = lastCommittedLogId(); + committedLogId_ = logIdAndTerm.first; + term_ = proposedTerm_ = logIdAndTerm.second; + + if (lastLogId_ < committedLogId_) { + lastLogId_ = committedLogId_; + lastLogTerm_ = term_; + wal_->reset(); + } // Start all peer hosts for (auto& addr : peers) { @@ -278,7 +290,7 @@ void RaftPart::start(std::vector&& peers, bool asLearner) { } // Set up a leader election task size_t delayMS = 100 + folly::Random::rand32(900); - workers_->addDelayTask(delayMS, [self = shared_from_this()] { + bgWorkers_->addDelayTask(delayMS, [self = shared_from_this()] { self->statusPolling(); }); } @@ -372,6 +384,13 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, LogCache swappedOutLogs; auto retFuture = folly::Future::makeEmpty(); + if (bufferOverFlow_) { + PLOG_EVERY_N(WARNING, 30) << idStr_ + << "The appendLog buffer is full." + " Please slow down the log appending rate." + << "replicatingLogs_ :" << replicatingLogs_; + return AppendLogResult::E_BUFFER_OVERFLOW; + } { std::lock_guard lck(logsLock_); @@ -381,7 +400,9 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, // Buffer is full LOG(WARNING) << idStr_ << "The appendLog buffer is full." - " Please slow down the log appending rate"; + " Please slow down the log appending rate." 
+ << "replicatingLogs_ :" << replicatingLogs_; + bufferOverFlow_ = true; return AppendLogResult::E_BUFFER_OVERFLOW; } @@ -409,6 +430,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, sendingPromise_ = std::move(cachingPromise_); cachingPromise_.reset(); std::swap(swappedOutLogs, logs_); + bufferOverFlow_ = false; } else { VLOG(2) << idStr_ << "Another AppendLogs request is ongoing," @@ -419,18 +441,20 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, LogID firstId = 0; TermID termId = 0; + AppendLogResult res; { std::lock_guard g(raftLock_); - auto res = canAppendLogs(); - if (res != AppendLogResult::SUCCEEDED) { - LOG(ERROR) << idStr_ - << "Cannot append logs, clean the buffer"; - sendingPromise_.setValue(std::move(res)); - replicatingLogs_ = false; - return res; + res = canAppendLogs(); + if (res == AppendLogResult::SUCCEEDED) { + firstId = lastLogId_ + 1; + termId = term_; } - firstId = lastLogId_ + 1; - termId = term_; + } + + if (!checkAppendLogResult(res)) { + LOG(ERROR) << idStr_ + << "Cannot append logs, clean the buffer"; + return res; } // Replicate buffered logs to all followers // Replication will happen on a separate thread and will block @@ -442,19 +466,18 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, termId, std::move(swappedOutLogs), [this] (const std::string& msg) -> std::string { - auto res = compareAndSet(msg); - if (res.empty()) { + auto casRet = compareAndSet(msg); + if (casRet.empty()) { // Failed sendingPromise_.setOneSingleValue(AppendLogResult::E_CAS_FAILURE); } - return res; + return casRet; }); appendLogsInternal(std::move(it), termId); return retFuture; } - void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { TermID currTerm = 0; LogID prevLogId = 0; @@ -466,31 +489,31 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { << iter.logId() << " (Current term is " << currTerm << ")"; } else { - VLOG(2) << idStr_ << "Ready to send a heartbeat"; + LOG(ERROR) << idStr_ << "Only happend when CAS failed"; + replicatingLogs_ = false; + return; } - - { + AppendLogResult res = AppendLogResult::SUCCEEDED; + do { std::lock_guard g(raftLock_); if (status_ != Status::RUNNING) { // The partition is not running VLOG(2) << idStr_ << "The partition is stopped"; - sendingPromise_.setValue(AppendLogResult::E_STOPPED); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_STOPPED; + break; } if (role_ != Role::LEADER) { // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; - sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_NOT_A_LEADER; + break; } if (term_ != termId) { - VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_; - sendingPromise_.setValue(AppendLogResult::E_TERM_OUT_OF_DATE); - replicatingLogs_ = false; - return; + VLOG(2) << idStr_ << "Term has been updated, origin " + << termId << ", new " << term_; + res = AppendLogResult::E_TERM_OUT_OF_DATE; + break; } currTerm = term_; prevLogId = lastLogId_; @@ -499,17 +522,20 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { // Step 1: Write WAL if (!wal_->appendLogs(iter)) { LOG(ERROR) << idStr_ << "Failed to write into WAL"; - sendingPromise_.setValue(AppendLogResult::E_WAL_FAILURE); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_WAL_FAILURE; + break; } lastId = wal_->lastLogId(); VLOG(2) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << 
lastId << "] to WAL"; - } + } while (false); + if (!checkAppendLogResult(res)) { + LOG(ERROR) << idStr_ << "Failed append logs"; + return; + } // Step 2: Replicate to followers - auto eb = ioThreadPool_->getEventBase(); + auto* eb = ioThreadPool_->getEventBase(); replicateLogs(eb, std::move(iter), currTerm, @@ -531,26 +557,30 @@ void RaftPart::replicateLogs(folly::EventBase* eb, using namespace folly; // NOLINT since the fancy overload of | operator decltype(hosts_) hosts; - { + AppendLogResult res = AppendLogResult::SUCCEEDED; + do { std::lock_guard g(raftLock_); if (status_ != Status::RUNNING) { // The partition is not running VLOG(2) << idStr_ << "The partition is stopped"; - sendingPromise_.setValue(AppendLogResult::E_STOPPED); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_STOPPED; + break; } if (role_ != Role::LEADER) { // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; - sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_NOT_A_LEADER; + break; } hosts = hosts_; + } while (false); + + if (!checkAppendLogResult(res)) { + LOG(ERROR) << idStr_ << "Replicate logs failed"; + return; } VLOG(2) << idStr_ << "About to replicate logs to all peer hosts"; @@ -567,16 +597,14 @@ void RaftPart::replicateLogs(folly::EventBase* eb, VLOG(2) << self->idStr_ << "Appending logs to " << hostPtr->idStr(); - return via( - eb, - [=] () -> Future { - return hostPtr->appendLogs(eb, - currTerm, - lastLogId, - committedId, - prevLogTerm, - prevLogId); - }); + return via(eb, [=] () -> Future { + return hostPtr->appendLogs(eb, + currTerm, + lastLogId, + committedId, + prevLogTerm, + prevLogId); + }); }) | gen::as(), // Number of succeeded required @@ -586,7 +614,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb, return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED && !hosts[index]->isLearner(); }) - .then(eb, [self = shared_from_this(), + .then(executor_.get(), [self = shared_from_this(), eb, it = std::move(iter), currTerm, @@ -638,27 +666,23 @@ void RaftPart::processAppendLogResponses( << " hosts have accepted the logs"; LogID firstLogId = 0; - { + AppendLogResult res = AppendLogResult::SUCCEEDED; + do { std::lock_guard g(raftLock_); if (status_ != Status::RUNNING) { - // The partition is not running - VLOG(2) << idStr_ << "The partition is stopped"; - sendingPromise_.setValue(AppendLogResult::E_STOPPED); - replicatingLogs_ = false; - return; + LOG(INFO) << idStr_ << "The partition is stopped"; + res = AppendLogResult::E_STOPPED; + break; } if (role_ != Role::LEADER) { - // Is not a leader any more - VLOG(2) << idStr_ << "The leader has changed"; - sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - replicatingLogs_ = false; - return; + LOG(INFO) << idStr_ << "The leader has changed"; + res = AppendLogResult::E_NOT_A_LEADER; + break; } if (currTerm != term_) { LOG(INFO) << idStr_ << "The leader has changed, ABA problem."; - sendingPromise_.setValue(AppendLogResult::E_TERM_OUT_OF_DATE); - replicatingLogs_ = false; - return; + res = AppendLogResult::E_TERM_OUT_OF_DATE; + break; } lastLogId_ = lastLogId; lastLogTerm_ = currTerm; @@ -675,6 +699,11 @@ void RaftPart::processAppendLogResponses( } VLOG(2) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to " << lastLogId; + } while (false); + + if (!checkAppendLogResult(res)) { + LOG(ERROR) << idStr_ << "processAppendLogResponses failed!"; + return; } // Step 4: Fulfill the promise if (iter.hasNonCASLogs()) 
{ @@ -690,6 +719,7 @@ void RaftPart::processAppendLogResponses( iter.resume(); if (iter.empty()) { std::lock_guard lck(logsLock_); + VLOG(2) << idStr_ << "logs size " << logs_.size(); if (logs_.size() > 0) { // continue to replicate the logs sendingPromise_ = std::move(cachingPromise_); @@ -699,22 +729,23 @@ void RaftPart::processAppendLogResponses( currTerm, std::move(logs_), [this] (const std::string& log) -> std::string { - auto res = compareAndSet(log); - if (res.empty()) { + auto casRet = compareAndSet(log); + if (casRet.empty()) { // Failed sendingPromise_.setOneSingleValue( AppendLogResult::E_CAS_FAILURE); } - return res; + return casRet; }); logs_.clear(); + bufferOverFlow_ = false; } else { replicatingLogs_ = false; VLOG(2) << idStr_ << "No more log to be replicated"; } } if (!iter.empty()) { - appendLogsInternal(std::move(iter), currTerm); + this->appendLogsInternal(std::move(iter), currTerm); } } else { // Not enough hosts accepted the log, re-try @@ -832,16 +863,16 @@ bool RaftPart::leaderElection() { } // Send out the AskForVoteRequest - VLOG(2) << idStr_ << "Sending out an election request " - << "(space = " << voteReq.get_space() - << ", part = " << voteReq.get_part() - << ", term = " << voteReq.get_term() - << ", lastLogId = " << voteReq.get_last_log_id() - << ", lastLogTerm = " << voteReq.get_last_log_term() - << ", candidateIP = " - << NetworkUtils::intToIPv4(voteReq.get_candidate_ip()) - << ", candidatePort = " << voteReq.get_candidate_port() - << ")"; + LOG(INFO) << idStr_ << "Sending out an election request " + << "(space = " << voteReq.get_space() + << ", part = " << voteReq.get_part() + << ", term = " << voteReq.get_term() + << ", lastLogId = " << voteReq.get_last_log_id() + << ", lastLogTerm = " << voteReq.get_last_log_term() + << ", candidateIP = " + << NetworkUtils::intToIPv4(voteReq.get_candidate_ip()) + << ", candidatePort = " << voteReq.get_candidate_port() + << ")"; auto resps = ElectionResponses(); if (hosts.empty()) { @@ -892,7 +923,7 @@ bool RaftPart::leaderElection() { std::lock_guard g(raftLock_); if (status_ == Status::RUNNING) { leader_ = addr_; - workers_->addTask([self = shared_from_this(), + bgWorkers_->addTask([self = shared_from_this(), term = voteReq.get_term()] { self->onElected(term); }); @@ -903,13 +934,13 @@ bool RaftPart::leaderElection() { } case Role::FOLLOWER: { // Someone was elected - VLOG(2) << idStr_ << "Someone else was elected"; + LOG(INFO) << idStr_ << "Someone else was elected"; return true; } case Role::CANDIDATE: { // No one has been elected - VLOG(2) << idStr_ - << "No one is elected, continue the election"; + LOG(INFO) << idStr_ + << "No one is elected, continue the election"; return false; } case Role::LEARNER: { @@ -926,7 +957,7 @@ bool RaftPart::leaderElection() { void RaftPart::statusPolling() { size_t delay = FLAGS_heartbeat_interval * 1000 / 3; if (needToStartElection()) { - VLOG(2) << idStr_ << "Need to start leader election"; + LOG(INFO) << idStr_ << "Need to start leader election"; if (leaderElection()) { VLOG(2) << idStr_ << "Stop the election"; } else { @@ -940,11 +971,11 @@ void RaftPart::statusPolling() { sendHeartbeat(); } + PLOG_EVERY_N(INFO, 30) << idStr_ << "statusPolling"; { std::lock_guard g(raftLock_); if (status_ == Status::RUNNING) { - VLOG(2) << idStr_ << "Schedule next polling"; - workers_->addDelayTask( + bgWorkers_->addDelayTask( delay, [self = shared_from_this()] { self->statusPolling(); @@ -957,16 +988,16 @@ void RaftPart::statusPolling() { void RaftPart::processAskForVoteRequest( const 
cpp2::AskForVoteRequest& req, cpp2::AskForVoteResponse& resp) { - VLOG(2) << idStr_ - << "Recieved a VOTING request" - << ": space = " << req.get_space() - << ", partition = " << req.get_part() - << ", candidateAddr = " - << NetworkUtils::intToIPv4(req.get_candidate_ip()) << ":" - << req.get_candidate_port() - << ", term = " << req.get_term() - << ", lastLogId = " << req.get_last_log_id() - << ", lastLogTerm = " << req.get_last_log_term(); + LOG(INFO) << idStr_ + << "Received a VOTING request" + << ": space = " << req.get_space() + << ", partition = " << req.get_part() + << ", candidateAddr = " + << NetworkUtils::intToIPv4(req.get_candidate_ip()) << ":" + << req.get_candidate_port() + << ", term = " << req.get_term() + << ", lastLogId = " << req.get_last_log_id() + << ", lastLogTerm = " << req.get_last_log_term(); std::lock_guard g(raftLock_); @@ -983,24 +1014,24 @@ void RaftPart::processAskForVoteRequest( // Check term id auto term = role_ == Role::CANDIDATE ? proposedTerm_ : term_; if (req.get_term() <= term) { - VLOG(2) << idStr_ - << (role_ == Role::CANDIDATE + LOG(INFO) << idStr_ + << (role_ == Role::CANDIDATE ? "The partition is currently proposing term " : "The partition currently is on term ") - << term - << ". The term proposed by the candidate is" - " no greater, so it will be rejected"; + << term + << ". The term proposed by the candidate is" + " no greater, so it will be rejected"; resp.set_error_code(cpp2::ErrorCode::E_TERM_OUT_OF_DATE); return; } // Check the last term to receive a log if (req.get_last_log_term() < lastLogTerm_) { - VLOG(2) << idStr_ - << "The partition's last term to receive a log is " - << lastLogTerm_ - << ", which is newer than the candidate's" - ". So the candidate will be rejected"; + LOG(INFO) << idStr_ + << "The partition's last term to receive a log is " + << lastLogTerm_ + << ", which is newer than the candidate's" + ". So the candidate will be rejected"; resp.set_error_code(cpp2::ErrorCode::E_TERM_OUT_OF_DATE); return; } @@ -1008,17 +1039,17 @@ void RaftPart::processAskForVoteRequest( if (req.get_last_log_term() == lastLogTerm_) { // Check last log id if (req.get_last_log_id() < lastLogId_) { - VLOG(2) << idStr_ - << "The partition's last log id is " << lastLogId_ - << ". The candidate's last log id is smaller" - ", so it will be rejected"; + LOG(INFO) << idStr_ + << "The partition's last log id is " << lastLogId_ + << ". 
The candidate's last log id is smaller" + ", so it will be rejected"; resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); return; } } // Ok, no reason to refuse, we will vote for the candidate - VLOG(2) << idStr_ << "The partition will vote for the candidate"; + LOG(INFO) << idStr_ << "The partition will vote for the candidate"; resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); Role oldRole = role_; @@ -1034,8 +1065,8 @@ void RaftPart::processAskForVoteRequest( // If the partition used to be a leader, need to fire the callback if (oldRole == Role::LEADER) { // Need to invoke the onLostLeadership callback - VLOG(2) << idStr_ << "Was a leader, need to do some clean-up"; - workers_->addTask( + LOG(INFO) << idStr_ << "Was a leader, need to do some clean-up"; + bgWorkers_->addTask( [self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); }); @@ -1055,7 +1086,7 @@ void RaftPart::processAppendLogRequest( << ": GraphSpaceId = " << req.get_space() << ", partition = " << req.get_part() << ", current_term = " << req.get_current_term() -// << ", lastLogId = " << req.get_last_log_id() + << ", lastLogId = " << req.get_last_log_id() << ", committedLogId = " << req.get_committed_log_id() << ", leaderIp = " << req.get_leader_ip() << ", leaderPort = " << req.get_leader_port() @@ -1124,20 +1155,26 @@ void RaftPart::processAppendLogRequest( // } // } - // Check the last log - CHECK_GE(req.get_last_log_id_sent(), committedLogId_) << idStr_; + if (req.get_last_log_id_sent() < committedLogId_) { + LOG(INFO) << idStr_ << "The log " << req.get_last_log_id_sent() + << " has already been committed. My committedLogId is " + << committedLogId_; + resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); + return; + } if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { VLOG(2) << idStr_ << "The local last log term is " << lastLogTerm_ << ", which is different from the leader's prevLogTerm " << req.get_last_log_term_sent() << ". So need to rollback to last committedLogId_ " << committedLogId_; - wal_->rollbackToLog(committedLogId_); - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; + if (wal_->rollbackToLog(committedLogId_)) { + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = wal_->lastLogTerm(); + resp.set_last_log_id(lastLogId_); + resp.set_last_log_term(lastLogTerm_); + } + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; } else if (req.get_last_log_id_sent() > lastLogId_) { // There is a gap VLOG(2) << idStr_ << "Local is missing logs from id " @@ -1199,7 +1236,7 @@ void RaftPart::processAppendLogRequest( if (oldRole == Role::LEADER) { // Need to invoke onLostLeadership callback VLOG(2) << idStr_ << "Was a leader, need to do some clean-up"; - workers_->addTask([self = shared_from_this(), oldTerm] { + bgWorkers_->addTask([self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); }); } @@ -1237,8 +1274,10 @@ cpp2::ErrorCode RaftPart::verifyLeader( // Make sure the remote term is greater than local's if (req.get_current_term() < term_) { - LOG(ERROR) << idStr_ << "The local term is " << term_ - << ". The remote term is not newer"; + PLOG_EVERY_N(ERROR, 100) << idStr_ + << "The current role is " << roleStr(role_) + << ". The local term is " << term_ + << ". 
The remote term is not newer"; return cpp2::ErrorCode::E_TERM_OUT_OF_DATE; } if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) { @@ -1250,11 +1289,11 @@ cpp2::ErrorCode RaftPart::verifyLeader( } // Ok, no reason to refuse, just follow the leader - VLOG(2) << idStr_ << "The current role is " << roleStr(role_) - << ". Will follow the new leader " - << network::NetworkUtils::intToIPv4(req.get_leader_ip()) - << ":" << req.get_leader_port() - << " [Term: " << req.get_current_term() << "]"; + LOG(INFO) << idStr_ << "The current role is " << roleStr(role_) + << ". Will follow the new leader " + << network::NetworkUtils::intToIPv4(req.get_leader_ip()) + << ":" << req.get_leader_port() + << " [Term: " << req.get_current_term() << "]"; if (role_ != Role::LEARNER) { role_ = Role::FOLLOWER; @@ -1284,6 +1323,22 @@ std::vector> RaftPart::followers() const { return hosts; } +bool RaftPart::checkAppendLogResult(AppendLogResult res) { + if (res != AppendLogResult::SUCCEEDED) { + { + std::lock_guard lck(logsLock_); + logs_.clear(); + cachingPromise_.setValue(res); + cachingPromise_.reset(); + bufferOverFlow_ = false; + } + sendingPromise_.setValue(res); + replicatingLogs_ = false; + return false;; + } + return true; +} + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 1e47187fb21..7fea92d3a0b 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -175,7 +175,8 @@ class RaftPart : public std::enable_shared_from_this { const folly::StringPiece walRoot, wal::BufferFlusher* flusher, std::shared_ptr pool, - std::shared_ptr workers); + std::shared_ptr workers, + std::shared_ptr executor); const char* idStr() const { return idStr_.c_str(); @@ -185,7 +186,7 @@ class RaftPart : public std::enable_shared_from_this { // // Inherited classes should implement this method to provide the last // committed log id - virtual LogID lastCommittedLogId() = 0; + virtual std::pair lastCommittedLogId() = 0; // This method is called when this partition's leader term // is finished, either by receiving a new leader election @@ -321,6 +322,8 @@ class RaftPart : public std::enable_shared_from_this { std::vector> followers() const; + bool checkAppendLogResult(AppendLogResult res); + protected: template class PromiseSet final { @@ -411,6 +414,7 @@ class RaftPart : public std::enable_shared_from_this { // The lock is used to protect logs_ and cachingPromise_ mutable std::mutex logsLock_; std::atomic_bool replicatingLogs_{false}; + std::atomic_bool bufferOverFlow_{false}; PromiseSet cachingPromise_; LogCache logs_; @@ -454,7 +458,9 @@ class RaftPart : public std::enable_shared_from_this { // IO Thread pool std::shared_ptr ioThreadPool_; // Shared worker thread pool - std::shared_ptr workers_; + std::shared_ptr bgWorkers_; + // Workers pool + std::shared_ptr executor_; }; } // namespace raftex diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index ff21cae4807..fd08c22e76e 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -19,6 +19,7 @@ namespace raftex { ******************************************************/ std::shared_ptr RaftexService::createService( std::shared_ptr pool, + std::shared_ptr workers, uint16_t port) { auto svc = std::shared_ptr(new RaftexService()); CHECK(svc != nullptr) << "Failed to create a raft service"; @@ -27,7 +28,7 @@ std::shared_ptr RaftexService::createService( CHECK(svc->server_ != nullptr) << "Failed to create a thrift 
server"; svc->server_->setInterface(svc); - svc->initThriftServer(pool, port); + svc->initThriftServer(pool, workers, port); return svc; } @@ -59,15 +60,23 @@ void RaftexService::waitUntilReady() { void RaftexService::initThriftServer(std::shared_ptr pool, + std::shared_ptr workers, uint16_t port) { LOG(INFO) << "Init thrift server for raft service."; server_->setPort(port); + server_->setIdleTimeout(std::chrono::seconds(0)); if (pool != nullptr) { server_->setIOThreadPool(pool); } + if (workers != nullptr) { + server_->setThreadManager( + std::dynamic_pointer_cast< + apache::thrift::concurrency::ThreadManager>(workers)); + } } + bool RaftexService::setup() { try { server_->setup(); @@ -110,6 +119,9 @@ RaftexService::getIOThreadPool() const { return server_->getIOThreadPool(); } +std::shared_ptr RaftexService::getThreadManager() { + return server_->getThreadManager(); +} void RaftexService::stop() { if (status_.load() != STATUS_RUNNING) { diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h index b7dd4f592fa..684ed2357c1 100644 --- a/src/kvstore/raftex/RaftexService.h +++ b/src/kvstore/raftex/RaftexService.h @@ -24,6 +24,7 @@ class RaftexService : public cpp2::RaftexServiceSvIf { public: static std::shared_ptr createService( std::shared_ptr pool, + std::shared_ptr workers, uint16_t port = 0); virtual ~RaftexService(); @@ -33,6 +34,8 @@ class RaftexService : public cpp2::RaftexServiceSvIf { std::shared_ptr getIOThreadPool() const; + std::shared_ptr getThreadManager(); + bool start(); void stop(); void waitUntilStop(); @@ -47,7 +50,9 @@ class RaftexService : public cpp2::RaftexServiceSvIf { void removePartition(std::shared_ptr part); private: - void initThriftServer(std::shared_ptr pool, uint16_t port = 0); + void initThriftServer(std::shared_ptr pool, + std::shared_ptr workers, + uint16_t port = 0); bool setup(); void serve(); diff --git a/src/kvstore/raftex/test/LeaderElectionTest.cpp b/src/kvstore/raftex/test/LeaderElectionTest.cpp index 26bdc093c09..fca3ce44cf1 100644 --- a/src/kvstore/raftex/test/LeaderElectionTest.cpp +++ b/src/kvstore/raftex/test/LeaderElectionTest.cpp @@ -98,6 +98,7 @@ TEST(LeaderElection, LeaderCrash) { flusher.get(), services[idx]->getIOThreadPool(), workers, + services[idx]->getThreadManager(), std::bind(&onLeadershipLost, std::ref(copies), std::ref(leader), diff --git a/src/kvstore/raftex/test/LogAppendTest.cpp b/src/kvstore/raftex/test/LogAppendTest.cpp index d180cd28fd0..a6bf5a06619 100644 --- a/src/kvstore/raftex/test/LogAppendTest.cpp +++ b/src/kvstore/raftex/test/LogAppendTest.cpp @@ -17,7 +17,7 @@ #include "kvstore/raftex/test/TestShard.h" DECLARE_uint32(heartbeat_interval); - +DECLARE_uint32(max_batch_size); namespace nebula { namespace raftex { @@ -84,6 +84,7 @@ TEST(LogAppend, MultiThreadAppend) { LOG(INFO) << "=====> Start multi-thread appending logs"; const int numThreads = 4; const int numLogs = 100; + FLAGS_max_batch_size = numThreads * numLogs + 1; std::vector threads; for (int i = 0; i < numThreads; ++i) { threads.emplace_back(std::thread([i, numLogs, leader] { @@ -93,9 +94,7 @@ TEST(LogAppend, MultiThreadAppend) { 0, folly::stringPrintf("Log %03d for t%d", j, i)); if (fut.isReady() && fut.value() == AppendLogResult::E_BUFFER_OVERFLOW) { - // Buffer overflow, while a little - usleep(5000); - continue; + LOG(FATAL) << "Should not reach here"; } else if (j == numLogs) { // Only wait on the last log messaage ASSERT_EQ(AppendLogResult::SUCCEEDED, std::move(fut).get()); diff --git 
a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index 5aa451a08b7..099517003a6 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -173,7 +173,7 @@ void setupRaft( // Set up services for (int i = 0; i < numCopies; ++i) { - services.emplace_back(RaftexService::createService(nullptr)); + services.emplace_back(RaftexService::createService(nullptr, nullptr)); if (!services.back()->start()) return; uint16_t port = services.back()->getServerPort(); @@ -194,6 +194,7 @@ void setupRaft( flusher.get(), services[i]->getIOThreadPool(), workers, + services[i]->getThreadManager(), std::bind(&onLeadershipLost, std::ref(copies), std::ref(leader), diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index 2d963c7c92c..dcd40c31efc 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -38,6 +38,7 @@ TestShard::TestShard(size_t idx, wal::BufferFlusher* flusher, std::shared_ptr ioPool, std::shared_ptr workers, + std::shared_ptr handlersPool, std::function leadershipLostCB, std::function @@ -49,7 +50,8 @@ TestShard::TestShard(size_t idx, walRoot, flusher, ioPool, - workers) + workers, + handlersPool) , idx_(idx) , service_(svc) , leadershipLostCB_(leadershipLostCB) diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index a95cc2b21e6..da0305d955a 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -36,13 +36,14 @@ class TestShard : public RaftPart { wal::BufferFlusher* flusher, std::shared_ptr ioPool, std::shared_ptr workers, + std::shared_ptr handlersPool, std::function leadershipLostCB, std::function becomeLeaderCB); - LogID lastCommittedLogId() override { - return lastCommittedLogId_; + std::pair lastCommittedLogId() override { + return std::make_pair(committedLogId_, term_); } std::shared_ptr getService() const { diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 29efeb40ab1..e134cdefc9c 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -13,6 +13,7 @@ #include "kvstore/PartManager.h" #include "kvstore/RocksEngine.h" #include "network/NetworkUtils.h" +#include DECLARE_uint32(heartbeat_interval); @@ -28,6 +29,15 @@ void dump(const std::vector& v) { VLOG(1) << ss.str(); } +std::shared_ptr +getHandlers() { + auto handlersPool + = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + 1, true /*stats*/); + handlersPool->setNamePrefix("executor"); + handlersPool->start(); + return handlersPool; +} TEST(NebulaStoreTest, SimpleTest) { auto partMan = std::make_unique(); @@ -56,7 +66,8 @@ TEST(NebulaStoreTest, SimpleTest) { HostAddr local = {0, 0}; auto store = std::make_unique(std::move(options), ioThreadPool, - local); + local, + getHandlers()); store->init(); sleep(1); EXPECT_EQ(2, store->spaces_.size()); @@ -156,7 +167,8 @@ TEST(NebulaStoreTest, PartsTest) { HostAddr local = {0, 0}; auto store = std::make_unique(std::move(options), ioThreadPool, - local); + local, + getHandlers()); store->init(); auto check = [&](GraphSpaceID spaceId) { for (auto i = 0; i < 2; i++) { @@ -259,7 +271,8 @@ TEST(NebulaStoreTest, ThreeCopiesTest) { HostAddr local = peers[index]; return std::make_unique(std::move(options), sIoThreadPool, - local); + local, + getHandlers()); }; int32_t replicas = 3; IPv4 ip; @@ -380,6 +393,5 @@ int main(int argc, char** argv) { 
testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, true); google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); } diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 24bde72e0ae..e80a8ebe76f 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -49,6 +49,7 @@ FileBasedWal::FileBasedWal(const folly::StringPiece dir, scanAllWalFiles(); if (!walFiles_.empty()) { auto& info = walFiles_.rbegin()->second; + firstLogId_ = walFiles_.begin()->second->firstId(); lastLogId_ = info->lastId(); lastLogTerm_ = info->lastTerm(); currFd_ = open(info->path(), O_WRONLY | O_APPEND); @@ -427,7 +428,6 @@ BufferPtr FileBasedWal::createNewBuffer( return buffers_.back(); } - bool FileBasedWal::appendLogInternal(BufferPtr& buffer, LogID id, TermID term, @@ -438,12 +438,13 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer, return false; } - if (id != lastLogId_ + 1) { + if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) { LOG(ERROR) << "There is a gap in the log id. The last log id is " << lastLogId_ << ", and the id being appended is " << id; return false; } + if (!preProcessor_(id, term, cluster, msg)) { LOG(ERROR) << "Pre process failed for log " << id; return false; @@ -474,11 +475,13 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer, buffer->push(term, cluster, std::move(msg)); lastLogId_ = id; lastLogTerm_ = term; + if (firstLogId_ == 0) { + firstLogId_ = id; + } return true; } - bool FileBasedWal::appendLog(LogID id, TermID term, ClusterID cluster, @@ -495,11 +498,9 @@ bool FileBasedWal::appendLog(LogID id, LOG(ERROR) << "Failed to append log for logId " << id; return false; } - return true; } - bool FileBasedWal::appendLogs(LogIterator& iter) { BufferPtr buffer; { @@ -537,8 +538,7 @@ std::unique_ptr FileBasedWal::iterator(LogID firstLogId, bool FileBasedWal::rollbackToLog(LogID id) { std::lock_guard flushGuard(flushMutex_); bool foundTarget{false}; - - if (id < firstLogId_ || id > lastLogId_) { + if (id < firstLogId_ - 1 || id > lastLogId_) { LOG(ERROR) << "Rollback target id " << id << " is not in the range of [" << firstLogId_ << "," @@ -602,9 +602,6 @@ bool FileBasedWal::rollbackToLog(LogID id) { } auto it = walFiles_.upper_bound(id); - // TODO: If we roll back to a log in last wal, and crash before any new log is appended. - // The lastLogId in wal would be wrong when reboot, because we don't actually delete the - // outdated log in a wal. 
// We need to remove wal files whose entire log range // are rolled back @@ -673,6 +670,27 @@ bool FileBasedWal::rollbackToLog(LogID id) { return true; } +bool FileBasedWal::reset() { + std::lock_guard flushGuard(flushMutex_); + closeCurrFile(); + { + std::lock_guard g(buffersMutex_); + buffers_.clear(); + } + { + std::lock_guard g(walFilesMutex_); + walFiles_.clear(); + } + std::vector files = + FileUtils::listAllFilesInDir(dir_.c_str(), false, "*.wal"); + for (auto& fn : files) { + auto absFn = FileUtils::joinPath(dir_, fn); + LOG(INFO) << "Removing " << absFn; + unlink(absFn.c_str()); + } + lastLogId_ = firstLogId_ = 0; + return true; +} size_t FileBasedWal::accessAllWalInfo(std::function fn) const { std::lock_guard g(walFilesMutex_); diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index 9408ee90262..ef03dd2074b 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -95,6 +95,8 @@ class FileBasedWal final // appending logs bool rollbackToLog(LogID id) override; + bool reset() override; + // Scan [firstLogId, lastLogId] // This method IS thread-safe std::unique_ptr iterator(LogID firstLogId, @@ -173,7 +175,7 @@ class FileBasedWal final const FileBasedWalPolicy policy_; const size_t maxFileSize_; const size_t maxBufferSize_; - const LogID firstLogId_{0}; + LogID firstLogId_{0}; LogID lastLogId_{0}; TermID lastLogTerm_{0}; diff --git a/src/kvstore/wal/FileBasedWalIterator.cpp b/src/kvstore/wal/FileBasedWalIterator.cpp index 0085260737c..7ebbcc67b88 100644 --- a/src/kvstore/wal/FileBasedWalIterator.cpp +++ b/src/kvstore/wal/FileBasedWalIterator.cpp @@ -36,6 +36,10 @@ FileBasedWalIterator::FileBasedWalIterator( } else { // Pick all buffers that match the range [currId_, lastId_] wal_->accessAllBuffers([this] (BufferPtr buffer) { + if (buffer->empty()) { + // Skip th empty one. 
+ return true; + } if (lastId_ >= buffer->firstLogId()) { buffers_.push_front(buffer); firstIdInBuffer_ = buffer->firstLogId(); diff --git a/src/kvstore/wal/Wal.h b/src/kvstore/wal/Wal.h index 91a416f501e..652a1971501 100644 --- a/src/kvstore/wal/Wal.h +++ b/src/kvstore/wal/Wal.h @@ -41,6 +41,9 @@ class Wal { // Rollback to the given id, all logs after the id will be discarded virtual bool rollbackToLog(LogID id) = 0; + // Clean all wal files + virtual bool reset() = 0; + // Scan [firstLogId, lastLogId] virtual std::unique_ptr iterator(LogID firstLogId, LogID lastLogId) = 0; diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index ad97779261d..9187d8d1399 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -21,6 +21,7 @@ #include "interface/gen-cpp2/common_types.h" #include "time/WallClock.h" #include "meta/ActiveHostsMan.h" +#include DECLARE_string(part_man_type); @@ -34,6 +35,10 @@ class TestUtils { static std::unique_ptr initKV(const char* rootPath) { auto ioPool = std::make_shared(4); auto partMan = std::make_unique(); + auto workers = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + 1, true /*stats*/); + workers->setNamePrefix("executor"); + workers->start(); // GraphSpaceID => {PartitionIDs} // 0 => {0} @@ -50,7 +55,8 @@ class TestUtils { auto store = std::make_unique(std::move(options), ioPool, - localhost); + localhost, + workers); store->init(); sleep(1); return std::move(store); diff --git a/src/storage/client/StorageClient.h b/src/storage/client/StorageClient.h index dfd824507cd..75385d0806e 100644 --- a/src/storage/client/StorageClient.h +++ b/src/storage/client/StorageClient.h @@ -144,6 +144,7 @@ class StorageClient { } void updateLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader) { + LOG(INFO) << "Update leader for " << spaceId << ", " << partId << " to " << leader; folly::RWSpinLock::WriteHolder wh(leadersLock_); leaders_[std::make_pair(spaceId, partId)] = leader; } diff --git a/src/storage/client/StorageClient.inl b/src/storage/client/StorageClient.inl index 227b2ba7087..ad499875fb7 100644 --- a/src/storage/client/StorageClient.inl +++ b/src/storage/client/StorageClient.inl @@ -87,12 +87,13 @@ folly::SemiFuture> StorageClient::collectResponse( for (auto& req : requests) { auto& host = req.first; auto spaceId = req.second.get_space_id(); - auto client = clientsMan_->client(host, evb); - // Result is a pair of auto res = context->insertRequest(host, std::move(req.second)); DCHECK(res.second); // Invoke the remote method - context->serverMethod(client.get(), *res.first) + folly::via(evb, [this, evb, context, host, spaceId, res] () mutable { + auto client = clientsMan_->client(host, evb); + // Result is a pair of + context->serverMethod(client.get(), *res.first) // Future process code will be executed on the IO thread // Since all requests are sent using the same eventbase, all then-callback // will be executed on the same IO thread @@ -147,7 +148,8 @@ folly::SemiFuture> StorageClient::collectResponse( context->promise.setValue(std::move(context->resp)); } }); - } + }); // via + } // for if (context->finishSending()) { // Received all responses, most likely, all rpc failed context->promise.setValue(std::move(context->resp)); diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 935a33b67d5..699fa8fe9d7 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -276,6 +276,8 @@ TEST(StorageClientTest, 
VerticesInterfacesTest) { LOG(INFO) << "Stop data server..."; sc.reset(); LOG(INFO) << "Stop data client..."; + threadPool->stop(); + threadPool->join(); client.reset(); LOG(INFO) << "Stop meta server..."; metaServerContext.reset(); diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index d8da23a5cad..6c0ef6554d8 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -21,6 +21,7 @@ #include "meta/SchemaManager.h" #include #include +#include namespace nebula { @@ -35,6 +36,11 @@ class TestUtils { bool useMetaServer = false, std::shared_ptr cfFactory = nullptr) { auto ioPool = std::make_shared(4); + auto workers = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + 1, true /*stats*/); + workers->setNamePrefix("executor"); + workers->start(); + kvstore::KVOptions options; if (useMetaServer) { @@ -62,7 +68,8 @@ class TestUtils { options.cfFactory_ = std::move(cfFactory); auto store = std::make_unique(std::move(options), ioPool, - localhost); + localhost, + workers); store->init(); sleep(1); return store;
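
Note on the new commit record introduced in the Part.cpp hunks above: the last committed LogID and its TermID are concatenated into a 16-byte value and stored under the per-partition key "__system_commit_msg_<partId>". The standalone sketch below (not part of the patch; it assumes LogID/TermID are int64_t as in nebula's ThriftTypes.h, and the helper names are illustrative) shows the encode/decode round trip that Part::commitLogs() and Part::lastCommittedLogId() perform against the storage engine.

#include <cstdint>
#include <cstring>
#include <cstdio>
#include <string>
#include <utility>

using LogID = int64_t;
using TermID = int64_t;
using PartitionID = int32_t;

// Same prefix as kCommitKeyPrefix in Part.cpp.
static const char* kCommitKeyPrefix = "__system_commit_msg_";

// Key under which a partition stores its commit record.
std::string commitKey(PartitionID partId) {
    return std::string(kCommitKeyPrefix) + std::to_string(partId);
}

// Mirrors what commitLogs() appends to the write batch:
// 8 bytes of LogID followed by 8 bytes of TermID.
std::string encodeCommitRecord(LogID lastId, TermID lastTerm) {
    std::string val;
    val.reserve(sizeof(LogID) + sizeof(TermID));
    val.append(reinterpret_cast<const char*>(&lastId), sizeof(LogID));
    val.append(reinterpret_cast<const char*>(&lastTerm), sizeof(TermID));
    return val;
}

// Mirrors what lastCommittedLogId() reads back from the engine.
std::pair<LogID, TermID> decodeCommitRecord(const std::string& val) {
    LogID lastId;
    TermID lastTerm;
    std::memcpy(&lastId, val.data(), sizeof(LogID));
    std::memcpy(&lastTerm, val.data() + sizeof(LogID), sizeof(TermID));
    return {lastId, lastTerm};
}

int main() {
    auto val = encodeCommitRecord(42, 7);
    auto decoded = decodeCommitRecord(val);
    std::printf("key=%s, logId=%lld, term=%lld\n",
                commitKey(3).c_str(),
                static_cast<long long>(decoded.first),
                static_cast<long long>(decoded.second));
    return 0;
}

Persisting the term alongside the log id is what lets RaftPart::start() restore term_/proposedTerm_ from the engine instead of from the WAL, and reset the WAL when it lags behind the committed state.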