Skip to content

Commit

Permalink
enhance: support null in text match index
Browse files Browse the repository at this point in the history
Signed-off-by: lixinguo <xinguo.li@zilliz.com>
  • Loading branch information
lixinguo committed Nov 8, 2024
1 parent a031578 commit 6f9410f
Show file tree
Hide file tree
Showing 9 changed files with 579 additions and 109 deletions.
56 changes: 49 additions & 7 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ class SegmentExpr : public Expr {
}
}

void
ApplyValidData(const bool* valid_data,
TargetBitmapView res,
TargetBitmapView valid_res,
const int size) {
if (valid_data != nullptr) {
for (int i = 0; i < size; i++) {
if (!valid_data[i]) {
res[i] = valid_res[i] = false;
}
}
}
}

int64_t
GetNextBatchSize() {
auto current_chunk = is_index_mode_ && use_index_ ? current_index_chunk_
Expand Down Expand Up @@ -254,9 +268,9 @@ class SegmentExpr : public Expr {
std::min(active_count_ - current_data_chunk_pos_, batch_size_);

auto& skip_index = segment_->GetSkipIndex();
auto views_info = segment_->get_batch_views<T>(
field_id_, 0, current_data_chunk_pos_, need_size);
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
auto views_info = segment_->get_batch_views<T>(
field_id_, 0, current_data_chunk_pos_, need_size);
// first is the raw data, second is valid_data
// use valid_data to see if raw data is null
func(views_info.first.data(),
Expand All @@ -265,6 +279,8 @@ class SegmentExpr : public Expr {
res,
valid_res,
values...);
} else {
ApplyValidData(views_info.second.data(), res, valid_res, need_size);
}
current_data_chunk_pos_ += need_size;
return need_size;
Expand Down Expand Up @@ -303,19 +319,24 @@ class SegmentExpr : public Expr {
size = std::min(size, batch_size_ - processed_size);

auto& skip_index = segment_->GetSkipIndex();
auto chunk = segment_->chunk_data<T>(field_id_, i);
const bool* valid_data = chunk.valid_data();
if (valid_data != nullptr) {
valid_data += data_pos;
}
if (!skip_func || !skip_func(skip_index, field_id_, i)) {
auto chunk = segment_->chunk_data<T>(field_id_, i);
const T* data = chunk.data() + data_pos;
const bool* valid_data = chunk.valid_data();
if (valid_data != nullptr) {
valid_data += data_pos;
}
func(data,
valid_data,
size,
res + processed_size,
valid_res + processed_size,
values...);
} else {
ApplyValidData(valid_data,
res + processed_size,
valid_res + processed_size,
size);
}

processed_size += size;
Expand Down Expand Up @@ -390,6 +411,27 @@ class SegmentExpr : public Expr {
valid_res + processed_size,
values...);
}
} else {
const bool* valid_data;
if constexpr (std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Json>) {
if (segment_->type() == SegmentType::Sealed) {
valid_data = segment_

Check warning on line 419 in internal/core/src/exec/expression/Expr.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/expression/Expr.h#L418-L419

Added lines #L418 - L419 were not covered by tests
->get_batch_views<T>(
field_id_, i, data_pos, size)
.second.data();

Check warning on line 422 in internal/core/src/exec/expression/Expr.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/expression/Expr.h#L422

Added line #L422 was not covered by tests
}
} else {
auto chunk = segment_->chunk_data<T>(field_id_, i);

Check warning on line 425 in internal/core/src/exec/expression/Expr.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/expression/Expr.h#L425

Added line #L425 was not covered by tests
valid_data = chunk.valid_data();
if (valid_data != nullptr) {
valid_data += data_pos;

Check warning on line 428 in internal/core/src/exec/expression/Expr.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/expression/Expr.h#L427-L428

Added lines #L427 - L428 were not covered by tests
}
}
ApplyValidData(valid_data,

Check warning on line 431 in internal/core/src/exec/expression/Expr.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/exec/expression/Expr.h#L431

Added line #L431 was not covered by tests
res + processed_size,
valid_res + processed_size,
size);
}

processed_size += size;
Expand Down
59 changes: 57 additions & 2 deletions internal/core/src/index/TextMatchIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ TextMatchIndex::Upload(const Config& config) {
for (auto& file : remote_paths_to_size) {
ret.Append(file.first, nullptr, file.second);
}
auto binary_set = Serialize(config);
mem_file_manager_->AddFile(binary_set);
auto remote_mem_path_to_size =

Check warning on line 108 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L106-L108

Added lines #L106 - L108 were not covered by tests
mem_file_manager_->GetRemotePathsToFileSize();
for (auto& file : remote_mem_path_to_size) {
ret.Append(file.first, nullptr, file.second);

Check warning on line 111 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L110-L111

Added lines #L110 - L111 were not covered by tests
}

return ret;
}
Expand All @@ -114,21 +121,69 @@ TextMatchIndex::Load(const Config& config) {
AssertInfo(index_files.has_value(),
"index file paths is empty when load text log index");
auto prefix = disk_file_manager_->GetLocalTextIndexPrefix();
auto files_value = index_files.value();
auto it = std::find_if(
files_value.begin(), files_value.end(), [](const std::string& file) {
return file.substr(file.find_last_of('/') + 1) ==
"index_null_offset";

Check warning on line 128 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L124-L128

Added lines #L124 - L128 were not covered by tests
});
if (it != files_value.end()) {
std::vector<std::string> file;
file.push_back(*it);
files_value.erase(it);
auto index_datas = mem_file_manager_->LoadIndexToMemory(file);
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);

Check warning on line 142 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L130-L142

Added lines #L130 - L142 were not covered by tests
}
auto index_valid_data = binary_set.GetByName("index_null_offset");
null_offset.resize((size_t)index_valid_data->size / sizeof(size_t));
memcpy(null_offset.data(),

Check warning on line 146 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L144-L146

Added lines #L144 - L146 were not covered by tests
index_valid_data->data.get(),
(size_t)index_valid_data->size);
}

Check warning on line 149 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L148-L149

Added lines #L148 - L149 were not covered by tests
disk_file_manager_->CacheTextLogToDisk(index_files.value());
AssertInfo(
tantivy_index_exist(prefix.c_str()), "index not exist: {}", prefix);
wrapper_ = std::make_shared<TantivyIndexWrapper>(prefix.c_str());
}

void
TextMatchIndex::AddText(const std::string& text, int64_t offset) {
AddTexts(1, &text, offset);
TextMatchIndex::AddText(const std::string& text,
const bool valid,
int64_t offset) {
if (!valid) {
AddNull(offset);
return;
}
wrapper_->add_data(&text, 1, offset);
if (shouldTriggerCommit()) {
Commit();

Check warning on line 166 in internal/core/src/index/TextMatchIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/TextMatchIndex.cpp#L166

Added line #L166 was not covered by tests
}
}

void
TextMatchIndex::AddNull(int64_t offset) {
null_offset.push_back(offset);
}

void
TextMatchIndex::AddTexts(size_t n,
const std::string* texts,
const bool* valids,
int64_t offset_begin) {
if (valids != nullptr) {
for (int i = 0; i < n; i++) {
if (!valids[i]) {
null_offset.push_back(i);
}
}
}
wrapper_->add_data(texts, n, offset_begin);
if (shouldTriggerCommit()) {
Commit();
Expand Down
10 changes: 8 additions & 2 deletions internal/core/src/index/TextMatchIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ class TextMatchIndex : public InvertedIndexTantivy<std::string> {

public:
void
AddText(const std::string& text, int64_t offset);
AddText(const std::string& text, const bool valid, int64_t offset);

void
AddTexts(size_t n, const std::string* texts, int64_t offset_begin);
AddNull(int64_t offset);

void
AddTexts(size_t n,
const std::string* texts,
const bool* valids,
int64_t offset_begin);

void
Finish();
Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,8 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
field_id.get());
auto n = column->NumRows();
for (size_t i = 0; i < n; i++) {
index->AddText(std::string(column->RawAt(i)), i);
index->AddText(
std::string(column->RawAt(i)), column->IsValid(i), i);

Check warning on line 1542 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1541-L1542

Added lines #L1541 - L1542 were not covered by tests
}
} else { // fetch raw data from index.
auto field_index_iter = scalar_indexings_.find(field_id);
Expand All @@ -1557,9 +1558,9 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
for (size_t i = 0; i < n; i++) {
auto raw = impl->Reverse_Lookup(i);
if (!raw.has_value()) {
continue;
index->AddNull(i);

Check warning on line 1561 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1561

Added line #L1561 was not covered by tests
}
index->AddText(raw.value(), i);
index->AddText(raw.value(), true, i);

Check warning on line 1563 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1563

Added line #L1563 was not covered by tests
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,18 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
.string_data()
.data()
.end());
AddTexts(field_id, texts.data(), num_rows, reserved_offset);
FixedVector<bool> texts_valid_data(
insert_record_proto->fields_data(data_offset)
.valid_data()
.begin(),
insert_record_proto->fields_data(data_offset)
.valid_data()
.end());
AddTexts(field_id,
texts.data(),
texts_valid_data.data(),
num_rows,
reserved_offset);
}

// update average row data size
Expand Down Expand Up @@ -880,12 +891,13 @@ SegmentGrowingImpl::CreateTextIndexes() {
void
SegmentGrowingImpl::AddTexts(milvus::FieldId field_id,
const std::string* texts,
const bool* texts_valid_data,
size_t n,
int64_t offset_begin) {
std::unique_lock lock(mutex_);
auto iter = text_indexes_.find(field_id);
AssertInfo(iter != text_indexes_.end(), "text index not found");
iter->second->AddTexts(n, texts, offset_begin);
iter->second->AddTexts(n, texts, texts_valid_data, offset_begin);
}

} // namespace milvus::segcore
1 change: 1 addition & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
AddTexts(FieldId field_id,
const std::string* texts,
const bool* texts_valid_data,
size_t n,
int64_t offset_begin);

Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,8 @@ SegmentSealedImpl::CreateTextIndex(FieldId field_id) {
field_id.get());
auto n = column->NumRows();
for (size_t i = 0; i < n; i++) {
index->AddText(std::string(column->RawAt(i)), i);
index->AddText(
std::string(column->RawAt(i)), column->IsValid(i), i);
}
} else { // fetch raw data from index.
auto field_index_iter = scalar_indexings_.find(field_id);
Expand All @@ -2054,9 +2055,9 @@ SegmentSealedImpl::CreateTextIndex(FieldId field_id) {
for (size_t i = 0; i < n; i++) {
auto raw = impl->Reverse_Lookup(i);
if (!raw.has_value()) {
continue;
index->AddNull(i);

Check warning on line 2058 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L2058

Added line #L2058 was not covered by tests
}
index->AddText(raw.value(), i);
index->AddText(raw.value(), true, i);

Check warning on line 2060 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L2060

Added line #L2060 was not covered by tests
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions internal/core/unittest/test_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3883,8 +3883,6 @@ TEST(Expr, TestExprNOT) {
FixedVector<bool> valid_data) {
query::ExecPlanNodeVisitor visitor(*seg, MAX_TIMESTAMP);
BitsetType final;
return std::make_shared<expr::LogicalUnaryExpr>(
expr::LogicalUnaryExpr::OpType::LogicalNot, expr);
auto plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, expr);
auto start = std::chrono::steady_clock::now();
Expand Down
Loading

0 comments on commit 6f9410f

Please sign in to comment.