Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Meta handles leader changes and other errorcodes uniformly
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Apr 9, 2021
1 parent 774dc84 commit 0987a28
Show file tree
Hide file tree
Showing 103 changed files with 2,442 additions and 1,559 deletions.
4 changes: 0 additions & 4 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ DEFINE_string(meta_server_addrs,
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread");
DEFINE_int32(num_worker_threads, 32, "Number of workers");

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");
DECLARE_bool(check_leader);

static std::unique_ptr<apache::thrift::ThriftServer> gServer;
static std::unique_ptr<nebula::kvstore::KVStore> gKVStore;
Expand Down Expand Up @@ -81,8 +79,6 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
FLAGS_num_worker_threads, true /*stats*/));
threadManager->setNamePrefix("executor");
threadManager->start();
// On metad, we are allowed to read on follower
FLAGS_check_leader = false;
nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
Expand Down
3 changes: 1 addition & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600,
"interval to trigger custom compaction, < 0 means always do default minor compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_bool(check_leader, true, "Check leader or not");
DEFINE_int32(clean_wal_interval_secs, 600, "inerval to trigger clean expired wal");
DEFINE_bool(auto_remove_invalid_space, false, "whether remove data of invalid space when restart");

Expand Down Expand Up @@ -1027,7 +1026,7 @@ int32_t NebulaStore::allLeader(std::unordered_map<GraphSpaceID,
}

bool NebulaStore::checkLeader(std::shared_ptr<Part> part, bool canReadFromFollower) const {
return !FLAGS_check_leader || canReadFromFollower || (part->isLeader() && part->leaseValid());
return canReadFromFollower || (part->isLeader() && part->leaseValid());
}

void NebulaStore::cleanWAL() {
Expand Down
118 changes: 75 additions & 43 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "meta/ActiveHostsMan.h"
#include "meta/processors/Common.h"
#include "meta/common/MetaCommon.h"
#include "utils/Utils.h"

DECLARE_int32(heartbeat_interval_secs);
Expand All @@ -14,10 +15,10 @@ DECLARE_uint32(expired_time_factor);
namespace nebula {
namespace meta {

kvstore::ResultCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const LeaderParts* leaderParts) {
cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const LeaderParts* leaderParts) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::hostKey(hostAddr.host, hostAddr.port),
Expand All @@ -35,20 +36,22 @@ kvstore::ResultCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
baton.post();
});
baton.wait();
return ret;
return MetaCommon::to(ret);
}

std::vector<HostAddr> ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv,
int32_t expiredTTL,
cpp2::HostRole role) {
std::vector<HostAddr> hosts;
ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv, int32_t expiredTTL, cpp2::HostRole role) {
const auto& prefix = MetaServiceUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
FLOG_ERROR("getActiveHosts failed(%d)", static_cast<int>(ret));
return hosts;
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Failed to getActiveHosts, error "
<< static_cast<int32_t>(retCode);
return retCode;
}

std::vector<HostAddr> hosts;
int64_t threshold = (expiredTTL == 0 ?
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor :
expiredTTL) * 1000;
Expand All @@ -67,16 +70,19 @@ std::vector<HostAddr> ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv,
return hosts;
}

std::vector<HostAddr> ActiveHostsMan::getActiveHostsInZone(kvstore::KVStore* kv,
const std::string& zoneName,
int32_t expiredTTL) {
ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
ActiveHostsMan::getActiveHostsInZone(kvstore::KVStore* kv,
const std::string& zoneName,
int32_t expiredTTL) {
std::vector<HostAddr> activeHosts;
std::string zoneValue;
auto zoneKey = MetaServiceUtils::zoneKey(zoneName);
auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Get zone " << zoneName << " failed";
return activeHosts;
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Get zone " << zoneName << " failed, error: "
<< static_cast<int32_t>(retCode);
return retCode;
}

auto hosts = MetaServiceUtils::parseZoneHosts(std::move(zoneValue));
Expand All @@ -86,76 +92,98 @@ std::vector<HostAddr> ActiveHostsMan::getActiveHostsInZone(kvstore::KVStore* kv,
expiredTTL) * 1000;
for (auto& host : hosts) {
auto infoRet = getHostInfo(kv, host);
if (!infoRet.ok()) {
activeHosts.clear();
return activeHosts;
if (!nebula::ok(infoRet)) {
return nebula::error(infoRet);
}

auto info = infoRet.value();
auto info = nebula::value(infoRet);
if (now - info.lastHBTimeInMilliSec_ < threshold) {
activeHosts.emplace_back(host.host, host.port);
}
}
return activeHosts;
}

std::vector<HostAddr> ActiveHostsMan::getActiveHostsWithGroup(kvstore::KVStore* kv,
GraphSpaceID spaceId,
int32_t expiredTTL) {
ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
ActiveHostsMan::getActiveHostsWithGroup(kvstore::KVStore* kv,
GraphSpaceID spaceId,
int32_t expiredTTL) {
std::string spaceValue;
std::vector<HostAddr> activeHosts;
auto spaceKey = MetaServiceUtils::spaceKey(spaceId);
auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceValue);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Space " << spaceId << " not exist";
return activeHosts;
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Get space failed, error: "
<< static_cast<int32_t>(retCode);
return retCode;
}

std::string groupValue;
auto space = MetaServiceUtils::parseSpace(std::move(spaceValue));
auto groupKey = MetaServiceUtils::groupKey(*space.group_name_ref());
ret = kv->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Get group " << *space.group_name_ref() << " failed";
return activeHosts;
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Get group " << *space.group_name_ref() << " failed, error: "
<< static_cast<int32_t>(retCode);
return retCode;
}

auto zoneNames = MetaServiceUtils::parseZoneNames(std::move(groupValue));
for (const auto& zoneName : zoneNames) {
auto hosts = getActiveHostsInZone(kv, zoneName, expiredTTL);
auto hostsRet = getActiveHostsInZone(kv, zoneName, expiredTTL);
if (!nebula::ok(hostsRet)) {
return nebula::error(hostsRet);
}
auto hosts = nebula::value(hostsRet);
activeHosts.insert(activeHosts.end(), hosts.begin(), hosts.end());
}
return activeHosts;
}

std::vector<HostAddr> ActiveHostsMan::getActiveAdminHosts(kvstore::KVStore* kv,
int32_t expiredTTL,
cpp2::HostRole role) {
auto hosts = getActiveHosts(kv, expiredTTL, role);
ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
ActiveHostsMan::getActiveAdminHosts(kvstore::KVStore* kv,
int32_t expiredTTL,
cpp2::HostRole role) {
auto hostsRet = getActiveHosts(kv, expiredTTL, role);
if (!nebula::ok(hostsRet)) {
return nebula::error(hostsRet);
}
auto hosts = nebula::value(hostsRet);

std::vector<HostAddr> adminHosts(hosts.size());
std::transform(hosts.begin(), hosts.end(), adminHosts.begin(), [](const auto& h) {
return Utils::getAdminAddrFromStoreAddr(h);
});
return adminHosts;
}

bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) {
auto activeHosts = getActiveHosts(kv);
ErrorOr<cpp2::ErrorCode, bool> ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) {
auto activeHostsRet = getActiveHosts(kv);
if (!nebula::ok(activeHostsRet)) {
return nebula::error(activeHostsRet);
}
auto activeHosts = nebula::value(activeHostsRet);

return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end();
}

StatusOr<HostInfo> ActiveHostsMan::getHostInfo(kvstore::KVStore* kv, const HostAddr& host) {
ErrorOr<cpp2::ErrorCode, HostInfo>
ActiveHostsMan::getHostInfo(kvstore::KVStore* kv, const HostAddr& host) {
auto hostKey = MetaServiceUtils::hostKey(host.host, host.port);
std::string hostValue;
auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, hostKey, &hostValue);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Get host info " << host << " failed";
return Status::Error("Get host info failed");
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Get host info " << host << " failed, error: "
<< static_cast<int32_t>(retCode);
return retCode;
}
return HostInfo::decode(hostValue);
}

kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_t timeInMilliSec) {
cpp2::ErrorCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_t timeInMilliSec) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(),
Expand All @@ -170,18 +198,22 @@ kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_
baton.post();
});
baton.wait();
return kv->sync(kDefaultSpaceId, kDefaultPartId);
ret = kv->sync(kDefaultSpaceId, kDefaultPartId);
return MetaCommon::to(ret);
}

int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) {
ErrorOr<cpp2::ErrorCode, int64_t> LastUpdateTimeMan::get(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto key = MetaServiceUtils::lastUpdateTimeKey();
std::string val;
auto ret = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return *reinterpret_cast<const int64_t*>(val.data());
if (ret != kvstore::ResultCode::SUCCEEDED) {
auto retCode = MetaCommon::to(ret);
LOG(ERROR) << "Get last update time failed, error: "
<< static_cast<int32_t>(retCode);
return retCode;
}
return 0;
return *reinterpret_cast<const int64_t*>(val.data());
}

} // namespace meta
Expand Down
46 changes: 26 additions & 20 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,42 +108,48 @@ class ActiveHostsMan final {
public:
~ActiveHostsMan() = default;

static kvstore::ResultCode updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const LeaderParts* leaderParts = nullptr);
static cpp2::ErrorCode updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const LeaderParts* leaderParts = nullptr);

static std::vector<HostAddr> getActiveHosts(kvstore::KVStore* kv,
int32_t expiredTTL = 0,
cpp2::HostRole role = cpp2::HostRole::STORAGE);
static ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
getActiveHosts(kvstore::KVStore* kv,
int32_t expiredTTL = 0,
cpp2::HostRole role = cpp2::HostRole::STORAGE);

static std::vector<HostAddr> getActiveHostsInZone(kvstore::KVStore* kv,
const std::string& zoneName,
int32_t expiredTTL = 0);
static ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
getActiveHostsInZone(kvstore::KVStore* kv,
const std::string& zoneName,
int32_t expiredTTL = 0);

static std::vector<HostAddr> getActiveHostsWithGroup(kvstore::KVStore* kv,
GraphSpaceID spaceId,
int32_t expiredTTL = 0);
static ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
getActiveHostsWithGroup(kvstore::KVStore* kv,
GraphSpaceID spaceId,
int32_t expiredTTL = 0);

static std::vector<HostAddr> getActiveAdminHosts(kvstore::KVStore* kv,
int32_t expiredTTL = 0,
cpp2::HostRole role = cpp2::HostRole::STORAGE);
static ErrorOr<cpp2::ErrorCode, std::vector<HostAddr>>
getActiveAdminHosts(kvstore::KVStore* kv,
int32_t expiredTTL = 0,
cpp2::HostRole role = cpp2::HostRole::STORAGE);

static bool isLived(kvstore::KVStore* kv, const HostAddr& host);
static ErrorOr<cpp2::ErrorCode, bool> isLived(kvstore::KVStore* kv, const HostAddr& host);

static StatusOr<HostInfo> getHostInfo(kvstore::KVStore* kv, const HostAddr& host);
static ErrorOr<cpp2::ErrorCode, HostInfo>
getHostInfo(kvstore::KVStore* kv, const HostAddr& host);

protected:
ActiveHostsMan() = default;
};


class LastUpdateTimeMan final {
public:
~LastUpdateTimeMan() = default;

static kvstore::ResultCode update(kvstore::KVStore* kv, const int64_t timeInMilliSec);
static cpp2::ErrorCode update(kvstore::KVStore* kv, const int64_t timeInMilliSec);

static int64_t get(kvstore::KVStore* kv);
static ErrorOr<cpp2::ErrorCode, int64_t> get(kvstore::KVStore* kv);

protected:
LastUpdateTimeMan() = default;
Expand Down
Loading

0 comments on commit 0987a28

Please sign in to comment.