Skip to content

Commit

Permalink
MetaClient always send to leader of meta server (vesoft-inc#2403)
Browse files Browse the repository at this point in the history
* always send to meta leader

* remove check leader flag

* update
  • Loading branch information
critical27 authored Dec 7, 2020
1 parent 53f56b6 commit 674a75f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 37 deletions.
3 changes: 0 additions & 3 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ DEFINE_int32(num_worker_threads, 32, "Number of workers");

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");
DECLARE_bool(check_leader);

static std::unique_ptr<apache::thrift::ThriftServer> gServer;
static std::unique_ptr<nebula::kvstore::KVStore> gKVStore;
Expand Down Expand Up @@ -75,8 +74,6 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
FLAGS_num_worker_threads, true /*stats*/));
threadManager->setNamePrefix("executor");
threadManager->start();
// On metad, we are allowed to read on follower
FLAGS_check_leader = false;
nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
Expand Down
3 changes: 1 addition & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600,
"interval to trigger custom compaction, < 0 means always do default minor compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_bool(check_leader, true, "Check leader or not");
DEFINE_int32(clean_wal_interval_secs, 600, "inerval to trigger clean expired wal");
DEFINE_bool(auto_remove_invalid_space, false, "whether remove data of invalid space when restart");

Expand Down Expand Up @@ -875,7 +874,7 @@ int32_t NebulaStore::allLeader(std::unordered_map<GraphSpaceID,
}

bool NebulaStore::checkLeader(std::shared_ptr<Part> part) const {
return !FLAGS_check_leader || (part->isLeader() && part->leaseValid());
return part->isLeader() && part->leaseValid();
}

void NebulaStore::cleanWAL() {
Expand Down
62 changes: 31 additions & 31 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ folly::Future<StatusOr<GraphSpaceID>> MetaClient::createSpace(SpaceDesc spaceDes
return client->future_createSpace(request);
}, [] (cpp2::ExecResp&& resp) -> GraphSpaceID {
return resp.get_id().get_space_id();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -764,7 +764,7 @@ MetaClient::submitJob(cpp2::AdminJobOp op, std::vector<std::string> paras) {
return client->future_runAdminJob(request);
}, [] (cpp2::AdminJobResp&& resp) -> decltype(auto) {
return resp.get_result();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -792,7 +792,7 @@ folly::Future<StatusOr<bool>> MetaClient::dropSpace(std::string name, const bool
return client->future_dropSpace(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -969,7 +969,7 @@ MetaClient::multiPut(std::string segment,
return client->future_multiPut(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1050,7 +1050,7 @@ MetaClient::remove(std::string segment, std::string key) {
return client->future_remove(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1071,7 +1071,7 @@ MetaClient::removeRange(std::string segment, std::string start, std::string end)
return client->future_removeRange(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1166,7 +1166,7 @@ folly::Future<StatusOr<TagID>> MetaClient::createTagSchema(GraphSpaceID spaceId,
return client->future_createTag(request);
}, [] (cpp2::ExecResp&& resp) -> TagID {
return resp.get_id().get_tag_id();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1186,7 +1186,7 @@ MetaClient::alterTagSchema(GraphSpaceID spaceId,
return client->future_alterTag(request);
}, [] (cpp2::ExecResp&& resp) -> TagID {
return resp.get_id().get_tag_id();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1218,7 +1218,7 @@ MetaClient::dropTagSchema(int32_t spaceId, std::string tagName, const bool ifExi
return client->future_dropTag(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1276,7 +1276,7 @@ MetaClient::alterEdgeSchema(GraphSpaceID spaceId,
return client->future_alterEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1324,7 +1324,7 @@ MetaClient::dropEdgeSchema(GraphSpaceID spaceId, std::string name, const bool if
return client->future_dropEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1347,7 +1347,7 @@ MetaClient::createTagIndex(GraphSpaceID spaceID,
return client->future_createTagIndex(request);
}, [] (cpp2::ExecResp&& resp) -> IndexID {
return resp.get_id().get_index_id();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1366,7 +1366,7 @@ MetaClient::dropTagIndex(GraphSpaceID spaceID,
return client->future_dropTagIndex(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1416,7 +1416,7 @@ MetaClient::rebuildTagIndex(GraphSpaceID spaceID,
return client->future_rebuildTagIndex(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1455,7 +1455,7 @@ MetaClient::createEdgeIndex(GraphSpaceID spaceID,
return client->future_createEdgeIndex(request);
}, [] (cpp2::ExecResp&& resp) -> IndexID {
return resp.get_id().get_index_id();
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1474,7 +1474,7 @@ MetaClient::dropEdgeIndex(GraphSpaceID spaceID,
return client->future_dropEdgeIndex(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1524,7 +1524,7 @@ MetaClient::rebuildEdgeIndex(GraphSpaceID spaceID,
return client->future_rebuildEdgeIndex(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1853,7 +1853,7 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
metadLastUpdateTime_ = resp.get_last_update_time_in_ms();
VLOG(1) << "Metad last update time: " << metadLastUpdateTime_;
return true; // resp.code == cpp2::ErrorCode::SUCCEEDED
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1869,7 +1869,7 @@ MetaClient::createUser(std::string account, std::string password, bool ifNotExis
return client->future_createUser(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1884,7 +1884,7 @@ MetaClient::dropUser(std::string account, bool ifExists) {
return client->future_dropUser(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1899,7 +1899,7 @@ MetaClient::alterUser(std::string account, std::string password) {
return client->future_alterUser(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1913,7 +1913,7 @@ MetaClient::grantToUser(nebula::cpp2::RoleItem roleItem) {
return client->future_grantRole(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -1927,7 +1927,7 @@ MetaClient::revokeFromUser(nebula::cpp2::RoleItem roleItem) {
return client->future_revokeRole(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -1972,7 +1972,7 @@ MetaClient::changePassword(std::string account,
return client->future_changePassword(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -2015,7 +2015,7 @@ folly::Future<StatusOr<int64_t>> MetaClient::balance(std::vector<HostAddr> hostD
return client->future_balance(request);
}, [] (cpp2::BalanceResp&& resp) -> int64_t {
return resp.id;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -2029,7 +2029,7 @@ MetaClient::showBalance(int64_t balanceId) {
return client->future_balance(request);
}, [] (cpp2::BalanceResp&& resp) -> std::vector<cpp2::BalanceTask> {
return resp.tasks;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -2041,7 +2041,7 @@ folly::Future<StatusOr<bool>> MetaClient::balanceLeader() {
return client->future_leaderBalance(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -2099,7 +2099,7 @@ MetaClient::regConfig(const std::vector<cpp2::ConfigItem>& items) {
return client->future_regConfig(request);
}, [] (cpp2::ExecResp&& resp) -> decltype(auto) {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down Expand Up @@ -2137,7 +2137,7 @@ MetaClient::setConfig(const cpp2::ConfigModule& module, const std::string& name,
return client->future_setConfig(request);
}, [] (cpp2::ExecResp&& resp) -> decltype(auto) {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -2163,7 +2163,7 @@ folly::Future<StatusOr<bool>> MetaClient::createSnapshot() {
return client->future_createSnapshot(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand All @@ -2176,7 +2176,7 @@ folly::Future<StatusOr<bool>> MetaClient::dropSnapshot(const std::string& name)
return client->future_dropSnapshot(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, std::move(promise), true);
}, std::move(promise));
return future;
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ class MetaClient {
RemoteFunc remoteFunc,
RespGenerator respGen,
folly::Promise<StatusOr<Response>> pro,
bool toLeader = false,
bool toLeader = true,
int32_t retry = 0,
int32_t retryLimit = FLAGS_meta_client_retry_times);

Expand Down

0 comments on commit 674a75f

Please sign in to comment.