Skip to content

Commit

Permalink
[feature](inverted index)write separated index files in RAM directory…
Browse files Browse the repository at this point in the history
… to reduce IO(apache#28810)

Normally we write the separate index files to disk before we merge the index files into an idx compound file.
In high-frequency load scenarios, disk IO can become a bottleneck. 
To reduce the pressure on the disk, we first write the separate index files to a RAM directory, and only write to disk when merging them into the idx compound file.

Add config `inverted_index_ram_dir_enable`, default is `false`.
  • Loading branch information
qidaye authored Dec 28, 2023
1 parent e610044 commit a14daca
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 139 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_Bool(inverted_index_compaction_enable, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,8 @@ DECLARE_Int32(inverted_index_read_buffer_size);
DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_Bool(inverted_index_compaction_enable);
// index by RAM directory
DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
DECLARE_Int32(num_broadcast_buffer);

Expand Down
8 changes: 4 additions & 4 deletions be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"

using doris::segment_v2::DorisCompoundReader;
using doris::segment_v2::DorisCompoundDirectory;
using doris::segment_v2::DorisCompoundDirectoryFactory;
using doris::io::FileInfo;
using namespace lucene::analysis;
using namespace lucene::index;
Expand Down Expand Up @@ -186,7 +186,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::vector<std::string> files;
std::cout << "Nested files for " << file_str << std::endl;
Expand All @@ -209,7 +209,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::cout << "Term statistics for " << file_str << std::endl;
std::cout << "==================================" << std::endl;
Expand All @@ -226,7 +226,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, FLAGS_directory.c_str());
DorisCompoundDirectoryFactory::getDirectory(fs, FLAGS_directory.c_str());
if (FLAGS_idx_file_name == "") {
//try to search from directory's all files
std::vector<FileInfo> files;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
std::string dir_str = p.parent_path().string();
std::string file_str = p.filename().string();
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
DorisCompoundDirectoryFactory::getDirectory(
fs, dir_str.c_str());
DorisCompoundReader reader(dir, file_str.c_str());
std::vector<std::string> files;
reader.list(&files);
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(
// During the process of opening the index, write the file information read to the idx file cache.
bool open_idx_file_cache = true;
auto* directory = new DorisCompoundReader(
DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
config::inverted_index_read_buffer_size, open_idx_file_cache);
DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(), config::inverted_index_read_buffer_size, open_idx_file_cache);
auto null_bitmap_file_name = InvertedIndexDescriptor::get_temporary_null_bitmap_file_name();
if (!directory->fileExists(null_bitmap_file_name.c_str())) {
has_null = false;
Expand Down Expand Up @@ -261,9 +261,9 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
"InvertedIndexSearcherCache do not support reader type.");
}
auto* directory =
new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(), config::inverted_index_read_buffer_size);
auto* directory = new DorisCompoundReader(
DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(), config::inverted_index_read_buffer_size);
OptionalIndexSearcherPtr result;
RETURN_IF_ERROR(builder->build(directory, result));
if (!result.has_value()) {
Expand Down
18 changes: 8 additions & 10 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@
#include "inverted_index_compound_directory.h"
#include "inverted_index_compound_reader.h"

namespace doris {
namespace segment_v2 {
namespace doris::segment_v2 {
Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
std::string index_writer_path, std::string tablet_path,
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
std::vector<uint32_t> dest_segment_num_rows) {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false);
DorisCompoundDirectoryFactory::getDirectory(fs, index_writer_path.c_str());
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */,
true /* closeDirOnShutdown */);
Expand All @@ -42,8 +41,8 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
// format: rowsetId_segmentId_indexId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
DorisCompoundReader* reader = new DorisCompoundReader(
DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
auto* reader = new DorisCompoundReader(
DorisCompoundDirectoryFactory::getDirectory(fs, tablet_path.c_str()),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}
Expand All @@ -53,7 +52,7 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id);
dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true);
dest_index_dirs[i] = DorisCompoundDirectoryFactory::getDirectory(fs, path.c_str(), true);
}

index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec,
Expand All @@ -65,13 +64,13 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
// when index_writer is destroyed, if closeDir is set, dir will be closed
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
_CLDECDELETE(dir)
for (auto d : src_index_dirs) {
for (auto* d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
for (auto d : dest_index_dirs) {
for (auto* d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize.
//d->close();
Expand All @@ -83,5 +82,4 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
static_cast<void>(fs->delete_directory(index_writer_path.c_str()));
return Status::OK();
}
} // namespace segment_v2
} // namespace doris
} // namespace doris::segment_v2
Loading

0 comments on commit a14daca

Please sign in to comment.