Skip to content

Commit

Permalink
sst download/ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
xuguruogu committed Aug 10, 2020
1 parent 09e74ea commit feb3929
Show file tree
Hide file tree
Showing 32 changed files with 731 additions and 244 deletions.
4 changes: 1 addition & 3 deletions src/common/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# Object library holding the command-line based HDFS helper.
nebula_add_library(
    hdfs_helper_obj OBJECT
    HdfsCommandHelper.cpp
)
77 changes: 65 additions & 12 deletions src/common/hdfs/HdfsCommandHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,83 @@ namespace hdfs {
/**
 * List an HDFS path by shelling out to `hadoop fs -ls`.
 *
 * @param hdfsHost  host part of the hdfs:// URL
 * @param hdfsPort  port part of the hdfs:// URL
 * @param hdfsPath  path appended directly after host:port
 * @return the command output on success; an error Status when the command
 *         could not be run or when its output starts with "ERR".
 */
StatusOr<std::string> HdfsCommandHelper::ls(const std::string& hdfsHost,
                                            int32_t hdfsPort,
                                            const std::string& hdfsPath) {
    auto command = folly::stringPrintf("hadoop fs -ls hdfs://%s:%d%s",
                                       hdfsHost.c_str(), hdfsPort, hdfsPath.c_str());
    LOG(INFO) << "Start Running HDFS Command: [" << command << "]";
    auto resultStatus = ProcessUtils::runCommand(command.c_str());
    if (!resultStatus.ok()) {
        // The command could not be started or its output could not be read.
        LOG(INFO) << "HDFS Command Failed. " << resultStatus.status().toString();
        return resultStatus.status();
    }

    auto result = std::move(resultStatus).value();

    // An output beginning with "ERR" is treated as a failed command.
    if (folly::StringPiece(result).startsWith("ERR")) {
        LOG(INFO) << "HDFS Command Failed. " << result;
        return Status::Error(result);
    }

    LOG(INFO) << "HDFS Command Finished. [" << command << "], output: [" << result << "]";
    return result;
}

/**
 * Copy an HDFS path to the local file system via `hadoop fs -copyToLocal`.
 *
 * @param hdfsHost   host part of the hdfs:// URL
 * @param hdfsPort   port part of the hdfs:// URL
 * @param hdfsPath   source path appended directly after host:port
 * @param localPath  destination passed as the last command argument
 * @return the (empty) command output on success; an error Status when the
 *         command could not be run, when its output starts with "ERR", or
 *         when it produced any other non-empty output.
 */
StatusOr<std::string> HdfsCommandHelper::copyToLocal(const std::string& hdfsHost,
                                                     int32_t hdfsPort,
                                                     const std::string& hdfsPath,
                                                     const std::string& localPath) {
    auto command = folly::stringPrintf("hadoop fs -copyToLocal hdfs://%s:%d%s %s",
                                       hdfsHost.c_str(), hdfsPort, hdfsPath.c_str(),
                                       localPath.c_str());
    LOG(INFO) << "Start Running HDFS Command: [" << command << "]";
    auto resultStatus = ProcessUtils::runCommand(command.c_str());
    if (!resultStatus.ok()) {
        // The command could not be started or its output could not be read.
        LOG(INFO) << "HDFS Command Failed. " << resultStatus.status().toString();
        return resultStatus.status();
    }

    auto result = std::move(resultStatus).value();

    if (folly::StringPiece(result).startsWith("ERR")) {
        LOG(INFO) << "HDFS Command Failed. " << result;
        return Status::Error(result);
    }

    // Only an empty output counts as success; anything else is reported as a failure.
    if (!result.empty()) {
        LOG(INFO) << "HDFS Command Failed. command: ["
                  << command << "], output: [" << result << "]";
        return Status::Error(folly::stringPrintf(
            "HDFS Command Failed. command: [%s], output: [%s]",
            command.c_str(), result.c_str()));
    }

    LOG(INFO) << "HDFS Command Finished. [" << command << "], output: [" << result << "]";
    return result;
}

/**
 * Test whether a path exists on HDFS via `hadoop fs -test -e`.
 *
 * @param hdfsHost  host part of the hdfs:// URL
 * @param hdfsPort  port part of the hdfs:// URL
 * @param hdfsPath  path appended directly after host:port
 * @return true when the command produced no output; false when the output
 *         starts with "ERR: [1]"; an error Status when the command could not
 *         be run or produced any other non-empty output.
 */
StatusOr<bool> HdfsCommandHelper::exist(const std::string& hdfsHost,
                                        int32_t hdfsPort,
                                        const std::string& hdfsPath) {
    auto path = folly::stringPrintf("hdfs://%s:%d%s",
                                    hdfsHost.c_str(), hdfsPort, hdfsPath.c_str());
    auto command = folly::stringPrintf("hadoop fs -test -e %s", path.c_str());
    LOG(INFO) << "Start Running HDFS Command: [" << command << "]";
    auto resultStatus = ProcessUtils::runCommand(command.c_str());
    if (!resultStatus.ok()) {
        // The command could not be started or its output could not be read.
        LOG(INFO) << "HDFS Command Failed. " << resultStatus.status().toString();
        return resultStatus.status();
    }

    auto result = std::move(resultStatus).value();

    // "ERR: [1]" is interpreted as "path does not exist" rather than as an error.
    if (folly::StringPiece(result).startsWith("ERR: [1]")) {
        LOG(INFO) << "HDFS Command Failed. " << result;
        return false;
    }

    // Any other output (including other "ERR: [n]" prefixes) is a real failure.
    if (!result.empty()) {
        LOG(INFO) << "HDFS Command Failed. command: ["
                  << command << "], output: [" << result << "]";
        return Status::Error(folly::stringPrintf(
            "HDFS Command Failed. command: [%s], output: [%s]",
            command.c_str(), result.c_str()));
    }

    LOG(INFO) << "HDFS Command Finished. [" << command << "], output: [" << result << "]";
    return true;
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/hdfs/HdfsCommandHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class HdfsCommandHelper : public HdfsHelper {
const std::string& hdfsPath,
const std::string& localPath) override;

// Overrides HdfsHelper::exist. The implementation in HdfsCommandHelper.cpp
// runs `hadoop fs -test -e` against hdfs://<hdfsHost>:<hdfsPort><hdfsPath>.
StatusOr<bool> exist(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath) override;

bool checkHadoopPath() override;
};

Expand Down
9 changes: 9 additions & 0 deletions src/common/hdfs/HdfsHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ class HdfsHelper {
const std::string& hdfsPath,
const std::string& localPath) = 0;

// Default implementation: ignores all three arguments and always returns true.
// Subclasses override this to perform a real existence check.
// NOTE(review): a helper that does not override this reports every path as
// existing — confirm that is the intended fallback.
virtual StatusOr<bool> exist(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath) {
// Arguments are intentionally unused in the default implementation.
UNUSED(hdfsHost);
UNUSED(hdfsPort);
UNUSED(hdfsPath);
return true;
}

virtual bool checkHadoopPath() = 0;
};

Expand Down
14 changes: 0 additions & 14 deletions src/common/hdfs/test/CMakeLists.txt

This file was deleted.

36 changes: 0 additions & 36 deletions src/common/hdfs/test/HdfsHelperTest.cpp

This file was deleted.

13 changes: 9 additions & 4 deletions src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ namespace http {

/**
 * Perform an HTTP GET by invoking /usr/bin/curl through ProcessUtils::runCommand.
 *
 * @param path     URL to fetch; placed inside double quotes on the command line
 * @param options  extra curl options inserted verbatim before the URL
 * @return the StatusOr produced by runCommand when it is ok; otherwise an
 *         error Status "Http Get Failed: <path>".
 */
StatusOr<std::string> HttpClient::get(const std::string& path, const std::string& options) {
    // NOTE(review): path and options are spliced into a shell command line;
    // confirm that callers never pass untrusted text here.
    auto command = folly::stringPrintf("/usr/bin/curl %s \"%s\"", options.c_str(), path.c_str());
    LOG(INFO) << "HTTP Start Get Command: " << command;
    auto result = nebula::ProcessUtils::runCommand(command.c_str());

    if (!result.ok()) {
        LOG(INFO) << "HTTP Get Failed: " << command << " : " << result.status().toString();
        return Status::Error(folly::stringPrintf("Http Get Failed: %s", path.c_str()));
    }

    LOG(INFO) << "HTTP Get Finished: " << command << " : " << result.value();
    return result;
}
Expand All @@ -25,11 +28,13 @@ StatusOr<std::string> HttpClient::post(const std::string& path, const std::strin
auto command = folly::stringPrintf("/usr/bin/curl -X POST %s \"%s\"",
header.c_str(),
path.c_str());
LOG(INFO) << "HTTP Post Command: " << command;
LOG(INFO) << "HTTP Start Post Command: " << command;
auto result = nebula::ProcessUtils::runCommand(command.c_str());
if (result.ok()) {
return result.value();
LOG(INFO) << "HTTP Post Finished: " << command << " : " << result.value();
return result;
} else {
LOG(INFO) << "HTTP Post Failed: " << command << " : " << result.status().toString();
return Status::Error(folly::stringPrintf("Http Post Failed: %s", path.c_str()));
}
}
Expand Down
15 changes: 12 additions & 3 deletions src/common/process/ProcessUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pid_t ProcessUtils::maxPid() {
StatusOr<std::string> ProcessUtils::runCommand(const char* command) {
FILE* f = popen(command, "re");
if (f == nullptr) {
return Status::Error("Failed to execute the command \"%s\"", command);
return Status::Error("Failed to execute the command [%s]", command);
}

Cord out;
Expand All @@ -160,10 +160,19 @@ StatusOr<std::string> ProcessUtils::runCommand(const char* command) {
if (ferror(f)) {
// Something is wrong
fclose(f);
return Status::Error("Failed to read the output of the command");
return Status::Error("Failed to read the output of the command. [%s]", command);
}

pclose(f);
int st = pclose(f);
if (!WIFEXITED(st)) {
return Status::Error("Cmd Exist unexcepted. command: [%s]", command);
}

if (WEXITSTATUS(st)) {
return folly::stringPrintf(
"ERR: [%d], output: [%s], command: [%s]",
WEXITSTATUS(st), out.str().c_str(), command);
}
return out.str();
}

Expand Down
41 changes: 35 additions & 6 deletions src/graph/DownloadExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,46 @@ void DownloadExecutor::execute() {
return;
}

auto func = [metaHost, hdfsHost, hdfsPort, hdfsPath, spaceId]() {
static const char *tmp = "http://%s:%d/%s?host=%s&port=%d&path=%s&space=%d";
auto url = folly::stringPrintf(tmp, metaHost.c_str(), FLAGS_ws_meta_http_port,
"download-dispatch", hdfsHost->c_str(),
hdfsPort, hdfsPath->c_str(), spaceId);
std::string url;

if (sentence_->edge()) {
auto edgeStatus = ectx()->schemaManager()->toEdgeType(
spaceId, *sentence_->edge());
if (!edgeStatus.ok()) {
doError(Status::Error("edge not found."));
return;
}
auto edgeType = edgeStatus.value();
url = folly::stringPrintf(
"http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d&edge=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port,
hdfsHost->c_str(), hdfsPort, hdfsPath->c_str(), spaceId, edgeType);
} else if (sentence_->tag()) {
auto tagStatus = ectx()->schemaManager()->toTagID(
spaceId, *sentence_->tag());
if (!tagStatus.ok()) {
doError(Status::Error("tag not found."));
return;
}
auto tagType = tagStatus.value();
url = folly::stringPrintf(
"http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d&tag=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port,
hdfsHost->c_str(), hdfsPort, hdfsPath->c_str(), spaceId, tagType);
} else {
url = folly::stringPrintf(
"http://%s:%d/download-dispatch?host=%s&port=%d&path=%s&space=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port,
hdfsHost->c_str(), hdfsPort, hdfsPath->c_str(), spaceId);
}

auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile dispatch successfully") {
LOG(INFO) << "Download Successfully";
return true;
} else {
LOG(ERROR) << "Download Failed ";
LOG(ERROR) << "Download Failed: " << result.value();
return false;
}
};
Expand Down
36 changes: 31 additions & 5 deletions src/graph/IngestExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,37 @@ void IngestExecutor::execute() {
auto metaHost = network::NetworkUtils::intToIPv4(addresses[0].first);
auto spaceId = ectx()->rctx()->session()->space();

auto func = [metaHost, spaceId]() {
static const char *tmp = "http://%s:%d/%s?space=%d";
auto url = folly::stringPrintf(tmp, metaHost.c_str(),
FLAGS_ws_meta_http_port,
"ingest-dispatch", spaceId);
std::string url;

if (sentence_->edge()) {
auto edgeStatus = ectx()->schemaManager()->toEdgeType(
spaceId, *sentence_->edge());
if (!edgeStatus.ok()) {
doError(Status::Error("edge not found."));
return;
}
auto edgeType = edgeStatus.value();
url = folly::stringPrintf(
"http://%s:%d/ingest-dispatch?space=%d&edge=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port, spaceId, edgeType);
} else if (sentence_->tag()) {
auto tagStatus = ectx()->schemaManager()->toTagID(
spaceId, *sentence_->tag());
if (!tagStatus.ok()) {
doError(Status::Error("tag not found."));
return;
}
auto tagType = tagStatus.value();
url = folly::stringPrintf(
"http://%s:%d/ingest-dispatch?space=%d&tag=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port, spaceId, tagType);
} else {
url = folly::stringPrintf(
"http://%s:%d/ingest-dispatch?space=%d",
metaHost.c_str(), FLAGS_ws_meta_http_port, spaceId);
}

auto func = [url] {
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile ingest successfully") {
LOG(INFO) << "Ingest Successfully";
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class KVStore {

virtual ResultCode ingest(GraphSpaceID spaceId) = 0;

// Ingest restricted to one tag of a space.
// NOTE(review): presumably the per-tag counterpart of ingest(spaceId) — confirm against implementations.
virtual ResultCode ingestTag(GraphSpaceID spaceId, TagID tagId) = 0;

// Ingest restricted to one edge type of a space.
// NOTE(review): presumably the per-edge counterpart of ingest(spaceId) — confirm against implementations.
virtual ResultCode ingestEdge(GraphSpaceID spaceId, EdgeType edgeType) = 0;

virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;

Expand Down
Loading

0 comments on commit feb3929

Please sign in to comment.