Skip to content

Commit

Permalink
[Enhancement](inverted index) refact some inverted index writer code …
Browse files Browse the repository at this point in the history
…to make more effieciency (apache#29602)
  • Loading branch information
airborne12 authored Jan 10, 2024
1 parent 561e351 commit 1c85a36
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void DorisCompoundFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
});
}

void DorisCompoundFileWriter::writeCompoundFile() {
size_t DorisCompoundFileWriter::writeCompoundFile() {
// list files in current dir
std::vector<std::string> files;
directory->list(&files);
Expand Down Expand Up @@ -171,6 +171,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}
std::unique_ptr<lucene::store::IndexOutput> output(out);
size_t start = output->getFilePointer();
output->writeVInt(file_count);
// write file entries
int64_t data_offset = header_len;
Expand Down Expand Up @@ -203,7 +204,10 @@ void DorisCompoundFileWriter::writeCompoundFile() {
// NOTE: need to decrease ref count, but not to delete here,
// because index cache may get the same directory from DIRECTORIES
_CLDECDELETE(out_dir)
auto compound_file_size = output->getFilePointer() - start;
output->close();
//LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size;
return compound_file_size;
}

void DorisCompoundFileWriter::copyFile(const char* fileName, lucene::store::IndexOutput* output,
Expand Down Expand Up @@ -641,7 +645,7 @@ void DorisCompoundDirectory::close() {
if (useCompoundFileWriter) {
auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
// write compound file
cfsWriter->writeCompoundFile();
compound_file_size = cfsWriter->writeCompoundFile();
// delete index path, which contains separated inverted index files
deleteDirectory();
_CLDELETE(cfsWriter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DorisCompoundFileWriter : LUCENE_BASE {
~DorisCompoundFileWriter() override = default;
/** Returns the directory of the compound file. */
CL_NS(store)::Directory* getDirectory();
void writeCompoundFile();
size_t writeCompoundFile();
void copyFile(const char* fileName, lucene::store::IndexOutput* output, uint8_t* buffer,
int64_t bufferLength);

Expand Down Expand Up @@ -77,6 +77,7 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
std::string directory;
std::string cfs_directory;
bool useCompoundFileWriter {false};
size_t compound_file_size = 0;

void priv_getFN(char* buffer, const char* name) const;
/// Removes an existing file in the directory.
Expand All @@ -91,6 +92,7 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {

const io::FileSystemSPtr& getFileSystem() { return fs; }
const io::FileSystemSPtr& getCompoundFileSystem() { return compound_fs; }
size_t getCompoundFileSize() const { return compound_file_size; }
~DorisCompoundDirectory() override;

bool list(std::vector<std::string>* names) const override;
Expand Down
115 changes: 48 additions & 67 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,11 @@
#include "util/slice.h"
#include "util/string_util.h"

#define FINALIZE_OUTPUT(x) \
if (x != nullptr) { \
x->close(); \
_CLDELETE(x); \
#define FINALLY_CLOSE_OUTPUT(x) \
try { \
if (x != nullptr) x->close(); \
} catch (...) { \
}
#define FINALLY_FINALIZE_OUTPUT(x) \
try { \
FINALIZE_OUTPUT(x) \
} catch (...) { \
}

namespace doris::segment_v2 {
const int32_t MAX_FIELD_LEN = 0x7FFFFFFFL;
const int32_t MERGE_FACTOR = 100000000;
Expand Down Expand Up @@ -152,7 +146,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
_bkd_writer = std::make_shared<lucene::util::bkd::bkd_writer>(
max_doc, DIMS, DIMS, value_length, MAX_LEAF_COUNT, MAXMBSortInHeap,
total_point_count, true, config::max_depth_in_bkd_tree);
return Status::OK();
return create_index_directory(_dir);
}

std::unique_ptr<lucene::analysis::Analyzer> create_chinese_analyzer() {
Expand Down Expand Up @@ -382,7 +376,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
// no values to add inverted index
return Status::OK();
}
auto offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
if constexpr (field_is_slice_type(field_type)) {
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index writer.";
Expand All @@ -404,7 +398,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
continue;
}
auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size);
strings.emplace_back(std::string(v->get_data(), v->get_size()));
strings.emplace_back(v->get_data(), v->get_size());
}

auto value = join(strings, " ");
Expand Down Expand Up @@ -456,7 +450,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
auto* v = (Slice*)item_data_ptr;

if (!values->is_null_at(j)) {
strings.emplace_back(std::string(v->get_data(), v->get_size()));
strings.emplace_back(v->get_data(), v->get_size());
}
item_data_ptr = (uint8_t*)item_data_ptr + field_size;
}
Expand All @@ -471,7 +465,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
auto* item_data_ptr = const_cast<CollectionValue*>(values)->mutable_data();

for (size_t j = 0; j < values->length(); ++j) {
const CppType* p = reinterpret_cast<const CppType*>(item_data_ptr);
const auto* p = reinterpret_cast<const CppType*>(item_data_ptr);
if (values->is_null_at(j)) {
// bkd do not index null values, so we do nothing here.
} else {
Expand Down Expand Up @@ -515,88 +509,75 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
return 0;
}

int64_t file_size() const override {
std::filesystem::path dir(_directory);
dir /= _segment_file_name;
auto file_name = InvertedIndexDescriptor::get_index_file_name(
dir.string(), _index_meta->index_id(), _index_meta->get_index_suffix());
int64_t size = -1;
auto st = _fs->file_size(file_name.c_str(), &size);
if (!st.ok()) {
LOG(ERROR) << "try to get file:" << file_name << " size error:" << st;
}
return size;
}
int64_t file_size() const override { return _dir->getCompoundFileSize(); }

void write_null_bitmap(lucene::store::IndexOutput* null_bitmap_out,
lucene::store::Directory* dir) {
void write_null_bitmap(lucene::store::IndexOutput* null_bitmap_out) {
// write null_bitmap file
_null_bitmap.runOptimize();
size_t size = _null_bitmap.getSizeInBytes(false);
if (size > 0) {
null_bitmap_out = dir->createOutput(
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str());
faststring buf;
buf.resize(size);
_null_bitmap.write(reinterpret_cast<char*>(buf.data()), false);
null_bitmap_out->writeBytes(reinterpret_cast<uint8_t*>(buf.data()), size);
FINALIZE_OUTPUT(null_bitmap_out)
null_bitmap_out->close();
}
}

Status finish() override {
lucene::store::Directory* dir = nullptr;
lucene::store::IndexOutput* null_bitmap_out = nullptr;
lucene::store::IndexOutput* data_out = nullptr;
lucene::store::IndexOutput* index_out = nullptr;
lucene::store::IndexOutput* meta_out = nullptr;
std::unique_ptr<lucene::store::IndexOutput> null_bitmap_out = nullptr;
std::unique_ptr<lucene::store::IndexOutput> data_out = nullptr;
std::unique_ptr<lucene::store::IndexOutput> index_out = nullptr;
std::unique_ptr<lucene::store::IndexOutput> meta_out = nullptr;
try {
// write bkd file
if constexpr (field_is_numeric_type(field_type)) {
auto index_path = InvertedIndexDescriptor::get_temporary_index_path(
_directory + "/" + _segment_file_name, _index_meta->index_id(),
_index_meta->get_index_suffix());
bool use_compound_file_writer = true;
bool can_use_ram_dir = true;
dir = DorisCompoundDirectoryFactory::getDirectory(
_fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir);
write_null_bitmap(null_bitmap_out, dir);
_bkd_writer->max_doc_ = _rid;
_bkd_writer->docs_seen_ = _row_ids_seen_for_bkd;
data_out = dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str());
meta_out = dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str());
index_out = dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str());
DBUG_EXECUTE_IF("InvertedIndexWriter._set_fulltext_data_out_nullptr",
null_bitmap_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str()));
data_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str()));
meta_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str()));
index_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str()));
write_null_bitmap(null_bitmap_out.get());

DBUG_EXECUTE_IF("InvertedIndexWriter._set_bkd_data_out_nullptr",
{ data_out = nullptr; });
if (data_out != nullptr && meta_out != nullptr && index_out != nullptr) {
_bkd_writer->meta_finish(meta_out, _bkd_writer->finish(data_out, index_out),
_bkd_writer->meta_finish(meta_out.get(),
_bkd_writer->finish(data_out.get(), index_out.get()),
int(field_type));
} else {
LOG(WARNING) << "Inverted index writer create output error occurred: nullptr";
_CLTHROWA(CL_ERR_IO, "Create output error with nullptr");
}
FINALIZE_OUTPUT(meta_out)
FINALIZE_OUTPUT(data_out)
FINALIZE_OUTPUT(index_out)
FINALIZE_OUTPUT(dir)
meta_out->close();
data_out->close();
index_out->close();
_dir->close();
} else if constexpr (field_is_slice_type(field_type)) {
dir = _index_writer->getDirectory();
write_null_bitmap(null_bitmap_out, dir);
null_bitmap_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str()));
write_null_bitmap(null_bitmap_out.get());
close();
DBUG_EXECUTE_IF("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close", {
_CLTHROWA(CL_ERR_IO, "debug point: test throw error in bkd index writer");
});
DBUG_EXECUTE_IF(
"InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close", {
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in fulltext index writer");
});
}
} catch (CLuceneError& e) {
FINALLY_FINALIZE_OUTPUT(null_bitmap_out)
FINALLY_FINALIZE_OUTPUT(meta_out)
FINALLY_FINALIZE_OUTPUT(data_out)
FINALLY_FINALIZE_OUTPUT(index_out)
FINALLY_CLOSE_OUTPUT(null_bitmap_out)
FINALLY_CLOSE_OUTPUT(meta_out)
FINALLY_CLOSE_OUTPUT(data_out)
FINALLY_CLOSE_OUTPUT(index_out)
if constexpr (field_is_numeric_type(field_type)) {
FINALLY_FINALIZE_OUTPUT(dir)
FINALLY_CLOSE_OUTPUT(_dir)
} else if constexpr (field_is_slice_type(field_type)) {
FINALLY_CLOSE_OUTPUT(_index_writer);
}
LOG(WARNING) << "Inverted index writer finish error occurred: " << e.what();
return Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,16 @@ suite("test_index_failure_injection", "nonConcurrent") {
create_httplogs_unique_table.call(testTable_unique)

try {
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._set_fulltext_data_out_nullptr")
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._set_bkd_data_out_nullptr")
load_httplogs_data.call(testTable_dup, 'test_httplogs_load_count_on_index', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._set_fulltext_data_out_nullptr")
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._set_bkd_data_out_nullptr")
}
try {
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close")
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close")
load_httplogs_data.call(testTable_unique, 'test_httplogs_load_count_on_index', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close")
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
} finally {
Expand Down
Loading

0 comments on commit 1c85a36

Please sign in to comment.