Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sst download/ingest #2265

Merged
merged 3 commits into from
Aug 11, 2020
Merged

sst download/ingest #2265

merged 3 commits into from
Aug 11, 2020

Conversation

xuguruogu
Copy link
Collaborator

@xuguruogu xuguruogu commented Aug 2, 2020

To work with https://github.com/xuguruogu/nebula-sst.

  1. Use sub dir for every partition for SST files.
  2. Can ingest tag/edge only, support download/ingest concurrently by using sub dir for every tag/edge type.

@@ -26,6 +26,15 @@ class HdfsHelper {
const std::string& hdfsPath,
const std::string& localPath) = 0;

virtual StatusOr<bool> exist(const std::string& hdfsHost,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you don't implement this function?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement it in another class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't define it as a pure virtual function ?

@@ -171,7 +171,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
bbtOpts.no_block_cache = true;
} else {
static std::shared_ptr<rocksdb::Cache> blockCache
= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 8/*shard bits*/);
= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 12/*shard bits*/);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why modify the default value ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2^8 = 256 shards seem a little bit small for modern machines with nearly 100 CPUs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep unchanged.

if (tag()) {
buf += "tag ";
buf += *tag();
buf += " ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can move the space before HDFS

if (tag()) {
buf += "tag ";
buf += *tag();
buf += " ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

auto helper = HdfsCommandHelper();
auto result = helper.ls("127.0.0.1", 9000, "/");
ASSERT_TRUE(result.ok()) << result.status();
// auto helper = HdfsCommandHelper();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why cancel this test case ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should never pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you add some check about the result code Could you remove it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, it should fail if can not connect to hdfs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the code if useless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the comment

}
}

StatusOr<std::string> HdfsCommandHelper::copyToLocal(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
const std::string& localPath) {
auto command = folly::stringPrintf("hdfs dfs -copyToLocal hdfs://%s:%d%s %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are any different between hdfs dfs and hadoop fs ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The later is the recommended usage in the newer version.

@@ -87,7 +106,7 @@ void MetaHttpDownloadHandler::onEOM() noexcept {
}

if (helper_->checkHadoopPath()) {
if (dispatchSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

int32_t hdfsPort,
const std::string& hdfsPath) {
auto path = folly::stringPrintf("hdfs://%s:%d%s", hdfsHost.c_str(), hdfsPort, hdfsPath.c_str());
auto command = folly::stringPrintf("hadoop fs -test -e %s", path.c_str());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should support -d to check it is a directory?

@@ -340,6 +357,12 @@ int32_t RocksEngine::totalPartsNum() {

ResultCode RocksEngine::ingest(const std::vector<std::string>& files) {
rocksdb::IngestExternalFileOptions options;
options.move_files = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you set verify_checksums_before_ingest, would you set verify_checksums_readahead_size to improve the performance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

@xuguruogu xuguruogu force-pushed the sst branch 3 times, most recently from 584aedd to f4a8e5d Compare August 4, 2020 11:00
@jude-zhu jude-zhu requested a review from darionyaphet August 5, 2020 08:42
continue;
}

auto code = engine->ingest(files);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, how many SST files generated for one part?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be configured in spark sst generater side. we use 100 at most, for each sst file 800MB

Copy link
Contributor

@dangleptr dangleptr Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you ensure they are global in order?

auto existStatus = helper_->exist(hdfsHost_, hdfsPort_, hdfsPath_);
if (!existStatus.ok()) {
LOG(ERROR) << "Run Hdfs Test failed. hdfs://" << hdfsHost_ << ":" << hdfsPort_ << hdfsPath_;
err_ = HttpCode::E_ILLEGAL_ARGUMENT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

}
if (headers->hasQueryParam("tag")) {
auto& tag = headers->getQueryParam("tag");
tag_.assign(folly::to<TagID>(tag));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

folly::to maybe throw exceptions, we should catch them, or use tagName directly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the comments

}
if (headers->hasQueryParam("edge")) {
auto& edge = headers->getQueryParam("edge");
edge_.assign(folly::to<EdgeType>(edge));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

std::string downloadRootPathEdge = downloadRootPath + "/edge";
std::string downloadRootPathTag = downloadRootPath + "/tag";
std::string downloadRootPathGeneral = downloadRootPath + "/general";
_clean_subdirs(downloadRootPathEdge);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow our code style for the function name.

const std::string& hdfsPath,
const std::vector<std::string>& parts) {
static std::atomic_flag isRunning = ATOMIC_FLAG_INIT;
if (isRunning.test_and_set()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrently download is needed, with different tag/edge in different sub dirs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.

std::string localPath;
if (edge_.has_value()) {
localPath = folly::stringPrintf(
"%s/download/edge/%d", partDataRoot, edge_.value());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You put all parts data together?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// output dir:
// -- 1/1-1.sst
//     /1-2.sst
//     /1-3.sst
// -- 2/2-1.sst
//     /2-2.sst

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

spaceID_ = headers->getIntQueryParam("space");
if (headers->hasQueryParam("tag")) {
auto& tag = headers->getQueryParam("tag");
tag_.assign(folly::to<TagID>(tag));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the comments

}
if (headers->hasQueryParam("edge")) {
auto& edge = headers->getQueryParam("edge");
edge_.assign(folly::to<EdgeType>(edge));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -142,9 +142,12 @@ pid_t ProcessUtils::maxPid() {


StatusOr<std::string> ProcessUtils::runCommand(const char* command) {
std::string cmd;
cmd += command;
cmd += " 2>&1";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why put it in command by default. I don't think it is really a good idea.

Copy link
Collaborator Author

@xuguruogu xuguruogu Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With checking exit code, it can be removed.

@@ -18,6 +18,8 @@

DEFINE_int32(download_thread_num, 3, "download thread number");

DEFINE_int32(download_keep_day, 1, "download keep days");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose about this flag. Why not delete them directly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SST files are moved to rocksdb. The flag is used to handle some SST files left for some unknown reasons, e.g. OOM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how to handle the cases just partial SST files ingested successfully.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete them after some days when download invoked

auto helper = HdfsCommandHelper();
auto result = helper.ls("127.0.0.1", 9000, "/");
ASSERT_TRUE(result.ok()) << result.status();
// auto helper = HdfsCommandHelper();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the code if useless.

@xuguruogu xuguruogu requested a review from dangleptr August 10, 2020 07:35
@xuguruogu xuguruogu force-pushed the sst branch 2 times, most recently from 5549867 to a60be82 Compare August 10, 2020 08:59
@dangleptr dangleptr added the ready-for-testing PR: ready for the CI test label Aug 10, 2020
@dangleptr
Copy link
Contributor

Please check your code style.

Copy link
Contributor

@dangleptr dangleptr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution.
The pr looks good to me now.

Copy link
Contributor

@darionyaphet darionyaphet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thanks

@dangleptr dangleptr merged commit 0e2aa4a into vesoft-inc:master Aug 11, 2020
tong-hao pushed a commit to tong-hao/nebula that referenced this pull request Jun 1, 2021
Co-authored-by: trippli <trippli@tencent.com>
Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
Co-authored-by: yaphet <darion.wang@vesoft.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ready-for-testing PR: ready for the CI test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants