Skip to content

Commit

Permalink
enhance: Refine code for get_deleted_bitmap (milvus-io#36819)
Browse files Browse the repository at this point in the history
issue: milvus-io#33744 

Check whether the PK is truly sorted in the debug model.

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored Oct 28, 2024
1 parent 4926021 commit 86687bd
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 95 deletions.
8 changes: 8 additions & 0 deletions internal/core/src/segcore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@

add_source_at_current_directory_recursively()
add_library(milvus_segcore OBJECT ${SOURCE_FILES})

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CHECK_SORTED ON)
else()
set(CHECK_SORTED OFF)
endif()

add_definitions(-DCHECK_SORTED=${CHECK_SORTED})
137 changes: 54 additions & 83 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,41 @@

namespace milvus::segcore {

#ifdef CHECK_SORTED
#define ASSERT_COLUMN_ORDERED(data_type, column) \
{ \
switch (data_type) { \
case DataType::INT64: { \
auto col = \
std::dynamic_pointer_cast<SingleChunkColumn>(column); \
auto pks = reinterpret_cast<const int64_t*>(col->Data()); \
for (int i = 1; i < col->NumRows(); ++i) { \
assert(pks[i - 1] <= pks[i] && \
"INT64 Column is not ordered!"); \
} \
break; \
} \
case DataType::VARCHAR: { \
auto col = std::dynamic_pointer_cast< \
SingleChunkVariableColumn<std::string>>(column); \
auto pks = col->Views(); \
for (int i = 1; i < col->NumRows(); ++i) { \
assert(pks[i - 1] <= pks[i] && \
"VARCHAR Column is not ordered!"); \
} \
break; \
} \
default: { \
PanicInfo(DataTypeInvalid, \
fmt::format("unsupported primary key data type", \
data_type)); \
} \
} \
}
#else
#define ASSERT_COLUMN_ORDERED(data_type, column) ((void)0)
#endif

static inline void
set_bit(BitsetType& bitset, FieldId field_id, bool flag = true) {
auto pos = field_id.get() - START_USER_FIELDID;
Expand Down Expand Up @@ -458,11 +493,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
// set pks to offset
// if the segments are already sorted by pk, there is no need to build a pk offset index.
// it can directly perform a binary search on the pk column.
if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) {
AssertInfo(field_id.get() != -1, "Primary key is -1");
AssertInfo(insert_record_.empty_pks(), "already exists");
insert_record_.insert_pks(data_type, column);
insert_record_.seal_pks();
if (schema_->get_primary_field_id() == field_id) {
if (!is_sorted_by_pk_) {
AssertInfo(field_id.get() != -1, "Primary key is -1");
AssertInfo(insert_record_.empty_pks(), "already exists");
insert_record_.insert_pks(data_type, column);
insert_record_.seal_pks();
} else {
ASSERT_COLUMN_ORDERED(data_type, column);
}
}

bool use_temp_index = false;
Expand Down Expand Up @@ -889,74 +928,6 @@ SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const {
return pk_offsets;
}

std::shared_ptr<DeletedRecord::TmpBitmap>
SegmentSealedImpl::get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record_.timestamps_[offset.get()] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

void
SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Expand All @@ -968,16 +939,16 @@ SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

if (!is_sorted_by_pk_) {
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp);
} else {
bitmap_holder = get_deleted_bitmap_s(
del_barrier, ins_barrier, deleted_record_, timestamp);
}
auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
Expand Down
6 changes: 0 additions & 6 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ class SegmentSealedImpl : public SegmentSealed {
std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const;

std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const;

std::unique_ptr<DataArray>
get_vector(FieldId field_id,
const int64_t* ids,
Expand Down
18 changes: 12 additions & 6 deletions internal/core/src/segcore/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,15 @@ MergeDataArray(std::vector<MergeBase>& merge_bases,

template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp) {
get_deleted_bitmap(
int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp,
bool is_sorted_by_pk = false,
const std::function<std::vector<SegOffset>(const PkType&, int64_t)>&
search_fn = nullptr) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
Expand Down Expand Up @@ -153,7 +157,9 @@ get_deleted_bitmap(int64_t del_barrier,
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
auto segOffsets = is_sorted_by_pk
? search_fn(pk, insert_barrier)
: insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

Expand Down

0 comments on commit 86687bd

Please sign in to comment.