Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fulltext search #1613

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Introduced CommonQueryFilter::AlwaysTrue(), SegmentEntry::CheckRowsVi…
…sible()

Removed DeleteWithBitmaskFilter
  • Loading branch information
yuzhichang committed Aug 7, 2024
commit fc226d967a60cd0878334387ed5aa9cc86d74de8
2 changes: 2 additions & 0 deletions src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export {
// column vector related constants
constexpr i64 MAX_BLOCK_CAPACITY = 65536L;
constexpr i64 DEFAULT_BLOCK_CAPACITY = 8192;
constexpr u64 BLOCK_OFFSET_SHIFT = 13; // it should be adjusted together with DEFAULT_BLOCK_CAPACITY
constexpr u64 BLOCK_OFFSET_MASK = 0x1FFF; // it should be adjusted together with DEFAULT_BLOCK_CAPACITY
constexpr i64 MIN_BLOCK_CAPACITY = 8192;
constexpr i16 INVALID_BLOCK_ID = std::numeric_limits<i16>::max();
constexpr i64 MAX_BLOCK_COUNT_IN_SEGMENT = 65536L;
Expand Down
5 changes: 4 additions & 1 deletion src/executor/operator/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,10 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
Txn *txn) {
if (filter_execute_command.empty()) {
// return all true
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
auto res = std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
auto &bitmask = std::get<Bitmask>(res);
bitmask.Initialize(DEFAULT_SEGMENT_CAPACITY);
return res;
}
auto result =
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
Expand Down
5 changes: 4 additions & 1 deletion src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,15 @@ struct FilterQueryNode final : public QueryNode {

std::unique_ptr<DocIterator>
CreateSearch(const TableEntry *table_entry, IndexReader &index_reader, EarlyTermAlgo early_term_algo) const override {
if (common_query_filter_ == nullptr || common_query_filter_->filter_result_count_ == 0)
assert(common_query_filter_ != nullptr);
if (!common_query_filter_->AlwaysTrue() && common_query_filter_->filter_result_count_ == 0)
return nullptr;
auto search_iter = query_tree_->CreateSearch(table_entry, index_reader, early_term_algo);
if (!search_iter) {
return nullptr;
}
if (common_query_filter_->AlwaysTrue())
return search_iter;
return MakeUnique<FilterIterator>(common_query_filter_, std::move(search_iter));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import default_values;
namespace infinity {

bool PhysicalFilterScanBase::CalculateFilterBitmask(SegmentID segment_id, BlockID block_id, BlockOffset row_count, Bitmask &bitmask) const {
if (common_query_filter_->AlwaysTrue()) {
bitmask.SetAllTrue();
return true;
}
auto it_filter = common_query_filter_->filter_result_.find(segment_id);
if (it_filter == common_query_filter_->filter_result_.end()) {
return false;
Expand Down
376 changes: 179 additions & 197 deletions src/executor/operator/physical_scan/physical_knn_scan.cpp

Large diffs are not rendered by default.

72 changes: 34 additions & 38 deletions src/executor/operator/physical_scan/physical_match_sparse_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,33 +436,40 @@ void PhysicalMatchSparseScan::ExecuteInnerT(DistFunc *dist_func,
UnrecoverableError(fmt::format("IndexType: {} is not supported.", (i8)index_base->index_type_));
}

auto it = common_query_filter_->filter_result_.find(segment_id);
if (it == common_query_filter_->filter_result_.end()) {
break;
}
SizeT segment_row_count = 0;
SegmentEntry *segment_entry = nullptr;
{
auto segment_it = block_index->segment_block_index_.find(segment_id);
if (segment_it == block_index->segment_block_index_.end()) {
UnrecoverableError(fmt::format("Cannot find segment with id: {}", segment_id));
}
segment_entry = segment_it->second.segment_entry_;
segment_row_count = segment_it->second.segment_offset_;
}
bool has_some_result = false;
Bitmask bitmask;
const std::variant<Vector<u32>, Bitmask> &filter_result = it->second;
if (std::holds_alternative<Vector<u32>>(filter_result)) {
const Vector<u32> &filter_result_vector = std::get<Vector<u32>>(filter_result);
bitmask.Initialize(std::ceil(segment_row_count));
bitmask.SetAllFalse();
for (u32 row_id : filter_result_vector) {
bitmask.SetTrue(row_id);
}
bool use_bitmask = false;
if (common_query_filter_->AlwaysTrue()) {
has_some_result = true;
bitmask.SetAllTrue();
} else {
bitmask.ShallowCopy(std::get<Bitmask>(filter_result));
auto it = common_query_filter_->filter_result_.find(segment_id);
if (it != common_query_filter_->filter_result_.end()) {
SizeT segment_row_count = 0;
{
auto segment_it = block_index->segment_block_index_.find(segment_id);
if (segment_it == block_index->segment_block_index_.end()) {
UnrecoverableError(fmt::format("Cannot find segment with id: {}", segment_id));
}
segment_row_count = segment_it->second.segment_offset_;
}
const std::variant<Vector<u32>, Bitmask> &filter_result = it->second;
if (std::holds_alternative<Vector<u32>>(filter_result)) {
const Vector<u32> &filter_result_vector = std::get<Vector<u32>>(filter_result);
bitmask.Initialize(std::ceil(segment_row_count));
bitmask.SetAllFalse();
for (u32 row_id : filter_result_vector) {
bitmask.SetTrue(row_id);
}
} else {
bitmask.ShallowCopy(std::get<Bitmask>(filter_result));
}
has_some_result = true;
use_bitmask = !bitmask.IsAllTrue();
}
}
bool use_bitmask = !bitmask.IsAllTrue();
if (!has_some_result)
break;

auto bmp_search = [&](AbstractBMP index, SizeT query_id, bool with_lock, const auto &filter) {
auto query = get_ele(query_vector, query_id);
Expand Down Expand Up @@ -507,21 +514,10 @@ void PhysicalMatchSparseScan::ExecuteInnerT(DistFunc *dist_func,
};

if (use_bitmask) {
if (segment_entry->CheckAnyDelete(begin_ts)) {
DeleteWithBitmaskFilter filter(bitmask, segment_entry, begin_ts);
bmp_scan(filter);
} else {
BitmaskFilter<SegmentOffset> filter(bitmask);
bmp_scan(filter);
}
BitmaskFilter<SegmentOffset> filter(bitmask);
bmp_scan(filter);
} else {
SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
if (segment_entry->CheckAnyDelete(begin_ts)) {
DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
bmp_scan(filter);
} else {
bmp_scan(nullptr);
}
bmp_scan(nullptr);
}

break;
Expand Down
40 changes: 27 additions & 13 deletions src/executor/operator/physical_scan/physical_match_tensor_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,35 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen
} else {
segment_entry = iter->second.segment_entry_;
}
if (auto it = common_query_filter_->filter_result_.find(segment_id); it != common_query_filter_->filter_result_.end()) {
LOG_TRACE(fmt::format("MatchTensorScan: index {}/{} not skipped after common_query_filter", task_job_index, index_entries_.size()));
const auto segment_row_count = segment_entry->row_count();
Bitmask segment_bitmask;
const std::variant<Vector<u32>, Bitmask> &filter_result = it->second;
if (std::holds_alternative<Vector<u32>>(filter_result)) {
const Vector<u32> &filter_result_vector = std::get<Vector<u32>>(filter_result);
segment_bitmask.Initialize(std::ceil(segment_row_count));
segment_bitmask.SetAllFalse();
for (u32 row_id : filter_result_vector) {
segment_bitmask.SetTrue(row_id);

bool has_some_result = false;
Bitmask segment_bitmask;
if (common_query_filter_->AlwaysTrue()) {
has_some_result = true;
segment_bitmask.SetAllTrue();
} else {
auto it = common_query_filter_->filter_result_.find(segment_id);
if (it != common_query_filter_->filter_result_.end()) {
LOG_TRACE(fmt::format("MatchTensorScan: index {}/{} not skipped after common_query_filter", task_job_index, index_entries_.size()));

auto segment_row_count = segment_entry->row_count();
const std::variant<Vector<u32>, Bitmask> &filter_result = it->second;
if (std::holds_alternative<Vector<u32>>(filter_result)) {
const Vector<u32> &filter_result_vector = std::get<Vector<u32>>(filter_result);
segment_bitmask.Initialize(std::ceil(segment_row_count));
segment_bitmask.SetAllFalse();
for (u32 row_id : filter_result_vector) {
segment_bitmask.SetTrue(row_id);
}
} else {
segment_bitmask.ShallowCopy(std::get<Bitmask>(filter_result));
}
} else {
segment_bitmask.ShallowCopy(std::get<Bitmask>(filter_result));
has_some_result = true;
}
}

if (has_some_result) {
LOG_TRACE(fmt::format("MatchTensorScan: index {}/{} not skipped after common_query_filter", task_job_index, index_entries_.size()));
// TODO: now only have EMVB index
const Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<EMVBIndexInMem>> emvb_snapshot = index_entry->GetEMVBIndexSnapshot();
// 1. in mem index
Expand Down
12 changes: 0 additions & 12 deletions src/function/table/knn_filter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,4 @@ private:
const SegmentOffset max_segment_offset_;
};

export class DeleteWithBitmaskFilter final : public FilterBase<SegmentOffset> {
public:
explicit DeleteWithBitmaskFilter(const Bitmask &bitmask, const SegmentEntry *segment, TxnTimeStamp query_ts)
: bitmask_filter_(bitmask), delete_filter_(segment, query_ts, 0) {}

bool operator()(const SegmentOffset &segment_offset) const final { return bitmask_filter_(segment_offset) && delete_filter_(segment_offset); }

private:
BitmaskFilter<SegmentOffset> bitmask_filter_;
DeleteFilter delete_filter_;
};

} // namespace infinity
3 changes: 2 additions & 1 deletion src/storage/invertedindex/search/or_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ float OrIterator::BM25Score() {
}
float sum_score = 0;
for (u32 i = 0; i < children_.size(); ++i) {
sum_score += children_[i]->BM25Score();
if (children_[i]->DocID() == doc_id_)
sum_score += children_[i]->BM25Score();
}
bm25_score_cache_docid_ = doc_id_;
bm25_score_cache_ = sum_score;
Expand Down
20 changes: 4 additions & 16 deletions src/storage/knn_index/emvb/emvb_search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,23 +404,11 @@ Tuple<u32, UniquePtr<f32[]>, UniquePtr<u32[]>> EMVBSearch<FIXED_QUERY_TOKEN_NUM>
}
};
if (const bool use_bitmask = !bitmask.IsAllTrue(); use_bitmask) {
if (segment_entry->CheckAnyDelete(begin_ts)) {
DeleteWithBitmaskFilter filter(bitmask, segment_entry, begin_ts);
filter_doc(filter);
} else {
BitmaskFilter<SegmentOffset> filter(bitmask);
filter_doc(filter);
}
BitmaskFilter<SegmentOffset> filter(bitmask);
filter_doc(filter);
} else {
if (segment_entry->CheckAnyDelete(begin_ts)) {
const auto segment_id = segment_entry->segment_id();
const SegmentOffset max_segment_offset = block_index->GetSegmentOffset(segment_id);
DeleteFilter filter(segment_entry, begin_ts, max_segment_offset);
filter_doc(filter);
} else {
// no delete
candidate_docs_filtered = std::move(candidate_docs);
}
// no delete
candidate_docs_filtered = std::move(candidate_docs);
}
auto selected_cnt_and_docs = compute_hit_frequency(std::move(candidate_docs_filtered), n_doc_to_score, std::move(centroid_q_token_sim));
auto selected_docs_centroid_scores =
Expand Down
39 changes: 39 additions & 0 deletions src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,45 @@ bool BlockEntry::CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts
return deleted[block_offset] == 0 || deleted[block_offset] > check_ts;
}

void BlockEntry::CheckRowsVisible(Vector<u32> &segment_offsets, TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (min_row_ts_ > check_ts)
return;

Vector<u32> segment_offsets2;
segment_offsets2.reserve(segment_offsets.size());
auto block_version_handle = this->block_version_->Load();
const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());

auto &deleted = block_version->deleted_;
for (const auto segment_offset : segment_offsets) {
BlockOffset off = segment_offset & BLOCK_OFFSET_MASK;
if (deleted[off] == 0 || deleted[off] > check_ts) {
segment_offsets2.push_back(segment_offset);
}
}
segment_offsets = std::move(segment_offsets2);
}

// Similar to SetDeleteBitmask but faster
void BlockEntry::CheckRowsVisible(Bitmask &segment_offsets, TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (min_row_ts_ > check_ts)
return;

auto block_version_handle = this->block_version_->Load();
const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());

auto &deleted = block_version->deleted_;
BlockOffset block_offset_end = block_version->GetRowCount(check_ts);
for (BlockOffset off = 0; off < block_offset_end; off++) {
if (deleted[off] != 0 && deleted[off] <= check_ts) {
SegmentOffset segment_offset = (SegmentOffset(block_id_) << BLOCK_OFFSET_SHIFT) | SegmentOffset(off);
segment_offsets.SetFalse(segment_offset);
}
}
}

bool BlockEntry::CheckDeleteConflict(const Vector<BlockOffset> &block_offsets) const {
std::shared_lock lock(rw_locker_);

Expand Down
5 changes: 5 additions & 0 deletions src/storage/meta/entry/block_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import block_version;
import fast_rough_filter;
import value;
import buffer_obj;
import bitmask;

namespace infinity {

Expand Down Expand Up @@ -148,6 +149,10 @@ public:

bool CheckRowVisible(BlockOffset block_offset, TxnTimeStamp check_ts, bool check_append) const;

void CheckRowsVisible(Vector<u32> &segment_offsets, TxnTimeStamp check_ts) const;

void CheckRowsVisible(Bitmask &segment_offsets, TxnTimeStamp check_ts) const;

bool CheckDeleteConflict(const Vector<BlockOffset> &block_offsets) const;

void SetDeleteBitmask(TxnTimeStamp query_ts, Bitmask &bitmask) const;
Expand Down
35 changes: 34 additions & 1 deletion src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,39 @@ bool SegmentEntry::CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp ch
return block_entry->CheckRowVisible(block_offset, check_ts, check_append);
}

void SegmentEntry::CheckRowsVisible(Vector<u32> &segment_offsets, TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (first_delete_ts_ >= check_ts)
return;

Vector<u32> segment_offsets2;
segment_offsets2.reserve(segment_offsets.size());
Map<BlockID, Vector<u32>> block_offsets_map;
for (const auto segment_offset : segment_offsets) {
BlockID block_id = segment_offset >> BLOCK_OFFSET_SHIFT;
block_offsets_map[block_id].push_back(segment_offset);
}

for (auto &[block_id, block_offsets] : block_offsets_map) {
auto *block_entry = GetBlockEntryByID(block_id).get();
if (block_entry == nullptr || block_entry->commit_ts_ > check_ts) {
continue;
}
block_entry->CheckRowsVisible(block_offsets, check_ts);
std::copy(block_offsets.begin(), block_offsets.end(), std::back_inserter(segment_offsets2));
}
segment_offsets = std::move(segment_offsets2);
}

void SegmentEntry::CheckRowsVisible(Bitmask &segment_offsets, TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (first_delete_ts_ >= check_ts)
return;
for (auto &block_entry : block_entries_) {
block_entry->CheckRowsVisible(segment_offsets, check_ts);
}
}

bool SegmentEntry::CheckVisible(Txn *txn) const {
TxnTimeStamp begin_ts = txn->BeginTS();
std::shared_lock lock(rw_locker_);
Expand Down Expand Up @@ -401,7 +434,7 @@ void SegmentEntry::CommitSegment(TransactionID txn_id,
auto iter = delete_state->rows_.find(segment_id_);
if (iter != delete_state->rows_.end()) {
const auto &block_row_hashmap = iter->second;
if (this->first_delete_ts_ == UNCOMMIT_TS) {
if (!block_row_hashmap.empty() && this->first_delete_ts_ == UNCOMMIT_TS) {
this->first_delete_ts_ = commit_ts;
}
delete_txns_.erase(txn_id);
Expand Down
5 changes: 5 additions & 0 deletions src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import value;
import meta_entry_interface;
import cleanup_scanner;
import logger;
import bitmask;

namespace infinity {

Expand Down Expand Up @@ -125,6 +126,10 @@ public:

bool CheckRowVisible(SegmentOffset segment_offset, TxnTimeStamp check_ts, bool check_append) const;

void CheckRowsVisible(Vector<u32> &segment_offsets, TxnTimeStamp check_ts) const;

void CheckRowsVisible(Bitmask &segment_offsets, TxnTimeStamp check_ts) const;

virtual bool CheckVisible(Txn *txn) const override;

bool CheckDeprecate(TxnTimeStamp check_ts) const;
Expand Down
10 changes: 10 additions & 0 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,4 +1244,14 @@ Tuple<Vector<String>, Vector<TableIndexMeta *>, std::shared_lock<std::shared_mut

TableIndexMeta *TableEntry::GetIndexMetaPtrByName(const String &name) const { return index_meta_map_.GetMetaPtrByName(name); }

bool TableEntry::CheckAnyDelete(TxnTimeStamp check_ts) const {
std::shared_lock lock(this->rw_locker_);
for (const auto &[_, segment] : segment_map_) {
if (segment->CheckAnyDelete(check_ts)) {
return true;
}
}
return false;
}

} // namespace infinity
2 changes: 2 additions & 0 deletions src/storage/meta/entry/table_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ private: // TODO: remove it
public: // TODO: remove it?
// HashMap<String, UniquePtr<TableIndexMeta>> &index_meta_map() { return index_meta_map_.meta_map_; }

bool CheckAnyDelete(TxnTimeStamp check_ts) const;

public:
void PickCleanup(CleanupScanner *scanner) override;

Expand Down
Loading
Loading