sst download/ingest #2265
@@ -26,6 +26,15 @@ class HdfsHelper {
const std::string& hdfsPath,
const std::string& localPath) = 0;

virtual StatusOr<bool> exist(const std::string& hdfsHost,
So you don't implement this function?
It is implemented in another class.
Why not define it as a pure virtual function?
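For context on the question above: a pure virtual member has no body in the base class, so every concrete subclass must override it before it can be instantiated. The following is a minimal sketch only. The class names HdfsHelperSketch and CommandHelperSketch are hypothetical, and a plain bool stands in for StatusOr<bool>; none of this is the PR's actual code.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for the HdfsHelper interface discussed above.
class HdfsHelperSketch {
public:
    virtual ~HdfsHelperSketch() = default;

    // Pure virtual: the base class provides no body, so a subclass
    // that does not override exist() remains abstract.
    virtual bool exist(const std::string& hdfsHost,
                       int32_t hdfsPort,
                       const std::string& hdfsPath) = 0;
};

// Hypothetical concrete helper; a real one would query HDFS.
class CommandHelperSketch : public HdfsHelperSketch {
public:
    bool exist(const std::string& hdfsHost,
               int32_t hdfsPort,
               const std::string& hdfsPath) override {
        // Placeholder logic for illustration only: accept any absolute
        // path on a named host with a positive port.
        return !hdfsHost.empty() && hdfsPort > 0 &&
               !hdfsPath.empty() && hdfsPath[0] == '/';
    }
};
```

Callers hold a reference or pointer to the base type, so a test can substitute a mock helper without touching the call sites.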
src/kvstore/RocksEngineConfig.cpp
Outdated
@@ -171,7 +171,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
  bbtOpts.no_block_cache = true;
  } else {
  static std::shared_ptr<rocksdb::Cache> blockCache
- = rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 8/*shard bits*/);
+ = rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, 12/*shard bits*/);
Why change the default value?
2^8 = 256 shards seems a bit small for modern machines with nearly 100 CPUs.
Keep it unchanged.
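The trade-off in this thread can be made concrete with a little arithmetic: each extra shard bit doubles the number of shards, which roughly halves the capacity of each one. The sketch below assumes the total capacity is split evenly across shards; the helper names shardCount and perShardBytes are hypothetical, not RocksDB functions.

```cpp
#include <cstdint>

// Number of shards a cache is split into for a given shard-bit count.
constexpr std::uint64_t shardCount(int shardBits) {
    return std::uint64_t{1} << shardBits;
}

// Approximate capacity of each shard, assuming an even split.
constexpr std::uint64_t perShardBytes(std::uint64_t totalBytes, int shardBits) {
    return totalBytes / shardCount(shardBits);
}

static_assert(shardCount(8) == 256, "8 shard bits give 256 shards");
static_assert(shardCount(12) == 4096, "12 shard bits give 4096 shards");
```

For an assumed 1024 MB block cache, 8 shard bits leave about 4 MB per shard, while 12 shard bits leave about 256 KB per shard, so a larger shard count suits larger caches.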
src/parser/MutateSentences.cpp
Outdated
if (tag()) {
buf += "tag ";
buf += *tag();
buf += " ";
You can move the space so that it comes before "HDFS".
src/parser/MutateSentences.cpp
Outdated
if (tag()) {
buf += "tag ";
buf += *tag();
buf += " ";
ditto
auto helper = HdfsCommandHelper();
auto result = helper.ls("127.0.0.1", 9000, "/");
ASSERT_TRUE(result.ok()) << result.status();
// auto helper = HdfsCommandHelper();
Why disable this test case?
It should never pass.
Maybe you could add a check on the result code. Could you remove it?
In fact, it should fail if it cannot connect to HDFS.
Please remove the code if it is useless.
Please fix the comment
}
}

StatusOr<std::string> HdfsCommandHelper::copyToLocal(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
const std::string& localPath) {
auto command = folly::stringPrintf("hdfs dfs -copyToLocal hdfs://%s:%d%s %s",
Is there any difference between "hdfs dfs" and "hadoop fs"?
The latter is the recommended usage in newer versions.
@@ -87,7 +106,7 @@ void MetaHttpDownloadHandler::onEOM() noexcept {
  }

- if (helper_->checkHadoopPath()) {
+ if (dispatchSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_)) {
Nice
int32_t hdfsPort,
const std::string& hdfsPath) {
auto path = folly::stringPrintf("hdfs://%s:%d%s", hdfsHost.c_str(), hdfsPort, hdfsPath.c_str());
auto command = folly::stringPrintf("hadoop fs -test -e %s", path.c_str());
Should it support -d to check that the path is a directory?
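One way to support both checks is to pass the test flag as a parameter when building the command. This is a sketch only: buildHdfsTestCommand is a hypothetical helper, it uses std::string rather than folly::stringPrintf to stay self-contained, and it does not quote the path for the shell.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper: builds a "hadoop fs -test" command line.
// flag is 'e' (path exists) or 'd' (path is a directory).
// Note: the path is not shell-quoted; a real caller should validate it.
std::string buildHdfsTestCommand(const std::string& hdfsHost,
                                 int32_t hdfsPort,
                                 const std::string& hdfsPath,
                                 char flag) {
    std::string command = "hadoop fs -test -";
    command += flag;
    command += " hdfs://" + hdfsHost + ":" + std::to_string(hdfsPort) + hdfsPath;
    return command;
}
```

The test command reports its answer through the exit status (0 when the check succeeds), so the caller needs the exit code rather than the captured output.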
@@ -340,6 +357,12 @@ int32_t RocksEngine::totalPartsNum() {

ResultCode RocksEngine::ingest(const std::vector<std::string>& files) {
rocksdb::IngestExternalFileOptions options;
options.move_files = true;
Nice!
If you set verify_checksums_before_ingest, would you also set verify_checksums_readahead_size to improve performance?
Yes
584aedd to f4a8e5d
continue;
}

auto code = engine->ingest(files);
Currently, how many SST files are generated for one part?
It can be configured on the Spark SST generator side. We use at most 100 files, each about 800 MB.
Could you ensure they are globally ordered?
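One reading of "globally ordered" is that the key ranges of the SST files for a part do not overlap. The sketch below checks that property under this assumption. KeyRange and isGloballyOrdered are hypothetical names, and keys are compared with plain std::string ordering rather than any particular RocksDB comparator.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical description of one SST file: its smallest and largest key.
struct KeyRange {
    std::string smallest;
    std::string largest;
};

// Returns true if the ranges can be arranged so that no two overlap,
// i.e. every file's keys sort strictly after the previous file's keys.
bool isGloballyOrdered(std::vector<KeyRange> ranges) {
    std::sort(ranges.begin(), ranges.end(),
              [](const KeyRange& a, const KeyRange& b) {
                  return a.smallest < b.smallest;
              });
    for (std::size_t i = 0; i < ranges.size(); ++i) {
        if (ranges[i].largest < ranges[i].smallest) {
            return false;  // malformed range
        }
        if (i > 0 && !(ranges[i - 1].largest < ranges[i].smallest)) {
            return false;  // overlaps the previous file
        }
    }
    return true;
}
```

In practice the smallest and largest keys would have to come from the generator side or from the SST file metadata; that step is outside this sketch.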
auto existStatus = helper_->exist(hdfsHost_, hdfsPort_, hdfsPath_);
if (!existStatus.ok()) {
LOG(ERROR) << "Run Hdfs Test failed. hdfs://" << hdfsHost_ << ":" << hdfsPort_ << hdfsPath_;
err_ = HttpCode::E_ILLEGAL_ARGUMENT;
return?
yes
}
if (headers->hasQueryParam("tag")) {
auto& tag = headers->getQueryParam("tag");
tag_.assign(folly::to<TagID>(tag));
folly::to may throw exceptions; we should catch them, or use tagName directly.
yes
Please fix the comments
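An alternative to catching the exception is a conversion that cannot throw. The sketch below uses std::from_chars from the C++17 standard library to stay self-contained; parseInt32 is a hypothetical helper, and it assumes the ID type is a 32-bit integer. folly's non-throwing tryTo may serve the same purpose inside the codebase.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string>
#include <system_error>

// Parses a decimal int32 without throwing. Returns std::nullopt if the
// text is empty, not a number, out of range, or has trailing characters.
std::optional<int32_t> parseInt32(const std::string& text) {
    int32_t value = 0;
    const char* begin = text.data();
    const char* end = begin + text.size();
    auto [ptr, ec] = std::from_chars(begin, end, value);
    if (ec != std::errc() || ptr != end) {
        return std::nullopt;
    }
    return value;
}
```

A handler could then reject the request with an illegal-argument error when the result is empty, instead of letting an exception escape from a noexcept callback.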
}
if (headers->hasQueryParam("edge")) {
auto& edge = headers->getQueryParam("edge");
edge_.assign(folly::to<EdgeType>(edge));
ditto
std::string downloadRootPathEdge = downloadRootPath + "/edge";
std::string downloadRootPathTag = downloadRootPath + "/tag";
std::string downloadRootPathGeneral = downloadRootPath + "/general";
_clean_subdirs(downloadRootPathEdge);
Please follow our code style for the function name.
const std::string& hdfsPath,
const std::vector<std::string>& parts) {
static std::atomic_flag isRunning = ATOMIC_FLAG_INIT;
if (isRunning.test_and_set()) {
Why remove it?
Concurrent downloads are needed, with different tags/edges going to different subdirectories.
Makes sense.
std::string localPath;
if (edge_.has_value()) {
localPath = folly::stringPrintf(
"%s/download/edge/%d", partDataRoot, edge_.value());
Do you put the data of all parts together?
// output dir:
// -- 1/1-1.sst
// /1-2.sst
// /1-3.sst
// -- 2/2-1.sst
// /2-2.sst
Got it.
spaceID_ = headers->getIntQueryParam("space");
if (headers->hasQueryParam("tag")) {
auto& tag = headers->getQueryParam("tag");
tag_.assign(folly::to<TagID>(tag));
ditto
Please fix the comments
}
if (headers->hasQueryParam("edge")) {
auto& edge = headers->getQueryParam("edge");
edge_.assign(folly::to<EdgeType>(edge));
ditto
src/common/process/ProcessUtils.cpp
Outdated
@@ -142,9 +142,12 @@ pid_t ProcessUtils::maxPid() {


StatusOr<std::string> ProcessUtils::runCommand(const char* command) {
std::string cmd;
cmd += command;
cmd += " 2>&1";
Why put it in the command by default? I don't think that is a good idea.
With exit-code checking, it can be removed.
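The idea in this thread, detecting failure from the exit status rather than by merging stderr into the captured output, might look like the following on a POSIX system. This is a sketch under assumptions, not the PR's implementation: CommandResult and runCommandSketch are hypothetical names, and it relies on popen, pclose, and the wait-status macros.

```cpp
#include <cstdio>
#include <string>
#include <sys/wait.h>

// Hypothetical result type: captured stdout plus the command's exit code
// (-1 if the command could not be started or did not exit normally).
struct CommandResult {
    int exitCode;
    std::string output;
};

CommandResult runCommandSketch(const std::string& command) {
    CommandResult result{-1, ""};
    FILE* pipe = popen(command.c_str(), "r");
    if (pipe == nullptr) {
        return result;
    }
    // Read everything the command writes to stdout.
    char buffer[256];
    while (fgets(buffer, sizeof(buffer), pipe) != nullptr) {
        result.output += buffer;
    }
    // pclose returns the wait status; extract the exit code from it.
    int status = pclose(pipe);
    if (status != -1 && WIFEXITED(status)) {
        result.exitCode = WEXITSTATUS(status);
    }
    return result;
}
```

With this shape, a caller treats any non-zero exit code as an error, and stderr stays separate unless a specific call site redirects it.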
@@ -18,6 +18,8 @@

DEFINE_int32(download_thread_num, 3, "download thread number");

DEFINE_int32(download_keep_day, 1, "download keep days");
What is the purpose of this flag? Why not delete them directly?
The SST files are moved into RocksDB. The flag handles SST files that are left behind for unknown reasons, e.g. OOM.
So how do you handle the case where only some of the SST files are ingested successfully?
They are deleted after some days, the next time a download is invoked.
5549867 to a60be82
Please check your code style.
Thanks for your contribution.
The pr looks good to me now.
LGTM thanks
Co-authored-by: trippli <trippli@tencent.com> Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com> Co-authored-by: yaphet <darion.wang@vesoft.com>
To work with https://github.com/xuguruogu/nebula-sst.