Skip to content

Commit

Permalink
enhance: intermin index support index without raw data and metric typ…
Browse files Browse the repository at this point in the history
…e fp16/bf16

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
  • Loading branch information
cqy123456 committed Jan 16, 2025
1 parent 99a8274 commit 68d4c0e
Show file tree
Hide file tree
Showing 31 changed files with 923 additions and 246 deletions.
7 changes: 5 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,11 @@ queryNode:
# Milvus will eventually seals and indexes all segments, but enabling this optimizes search performance for immediate queries following data insertion.
# This defaults to true, indicating that Milvus creates temporary index for growing segments and the sealed segments that are not indexed upon searches.
enableIndex: true
nlist: 128 # temp index nlist; recommended value is sqrt(chunkRows), must be smaller than chunkRows/8
nprobe: 16 # nprobe used to search the small index; choose it based on your accuracy requirement, must be smaller than nlist
nlist: 128 # interim index nlist; recommended value is sqrt(chunkRows), must be smaller than chunkRows/8
nprobe: 36 # nprobe used to search the small index; choose it based on your accuracy requirement, must be smaller than nlist
subDim: 4 # interim index sub dim; recommended to satisfy (vector dim % subDim == 0)
refineRatio: 4.5 # interim index refine ratio, should be set to a value >= 1.0
denseVectorIndexType: SCANN_DVR # Dense vector interim index type
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # ratio of interim index build parallelism to the number of CPUs
multipleChunkedEnable: true # Enable multiple chunked search
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ typedef struct CMmapConfig {
uint64_t fix_file_size;
bool growing_enable_mmap;
bool scalar_index_enable_mmap;
bool scalar_field_enable_mmap;
bool vector_index_enable_mmap;
bool vector_field_enable_mmap;
} CMmapConfig;

typedef struct CTraceConfig {
Expand Down
24 changes: 24 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ VectorMemIndex<T>::VectorMemIndex(
}
}

// Constructs a vector index that is created from a knowhere data view
// (`view_data`) rather than from a storage::FileManagerContext.
//
// index_type / metric_type: forwarded to the VectorIndex base class and
//     checked for compatibility before the knowhere index is created.
// version: index version handed to the knowhere index factory.
// view_data: knowhere::ViewDataOp callback; it is packed with
//     knowhere::Pack and passed to the factory as the creation argument.
//
// If the factory cannot create the index, the failure is reported through
// PanicInfo: ErrorCode::Unsupported for invalid_index_error, and
// ErrorCode::KnowhereError for any other status.
template <typename T>
VectorMemIndex<T>::VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data)
: VectorIndex(index_type, metric_type) {
// Reject metric types that element type T cannot be used with.
CheckMetricTypeSupport<T>(metric_type);
// Reject index/metric combinations flagged as unsupported.
AssertInfo(!is_unsupported(index_type, metric_type),
index_type + " doesn't support metric: " + metric_type);

// Wrap the data-view callback so it can be handed to the index factory.
auto view_data_pack = knowhere::Pack(view_data);
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
GetIndexType(), version, view_data_pack);
if (get_index_obj.has_value()) {
index_ = get_index_obj.value();
} else {
// Creation failed: map the knowhere status to a segcore error code.
auto err = get_index_obj.error();
if (err == knowhere::Status::invalid_index_error) {
PanicInfo(ErrorCode::Unsupported, get_index_obj.what());

Check warning on line 104 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L103-L104

Added lines #L103 - L104 were not covered by tests
}
// Any other failure status is reported as a generic knowhere error.
PanicInfo(ErrorCode::KnowhereError, get_index_obj.what());

Check warning on line 106 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L106

Added line #L106 was not covered by tests
}
}

template <typename T>
knowhere::expected<std::vector<knowhere::IndexNode::IteratorPtr>>
VectorMemIndex<T>::VectorIterators(const milvus::DatasetPtr dataset,
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class VectorMemIndex : public VectorIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

// Special constructor for the knowhere data-view index used as the interim
// index. It takes a knowhere::ViewDataOp instead of a
// storage::FileManagerContext, because this index has no need to hold
// file_manager_ to upload or download files.
VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data);

BinarySet
Serialize(const Config& config) override;

Expand Down
1 change: 1 addition & 0 deletions internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once
#include "mmap/ChunkData.h"
#include "storage/MmapManager.h"
#include "segcore/SegcoreConfig.h"
namespace milvus {
template <typename Type>
class ChunkVectorBase {
Expand Down
7 changes: 3 additions & 4 deletions internal/core/src/query/SearchOnGrowing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ FloatSegmentIndexSearch(const segcore::SegmentGrowingImpl& segment,
// TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder.
auto dim = is_sparse ? 0 : field.get_dim();

AssertInfo(field.get_data_type() == DataType::VECTOR_FLOAT ||
field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT,
"[FloatSearch]Field data type isn't VECTOR_FLOAT or "
"VECTOR_SPARSE_FLOAT");
AssertInfo(IsVectorDataType(field.get_data_type()),
"[FloatSearch]Field data type isn't VECTOR_FLOAT, "
"VECTOR_FLOAT16, VECTOR_BFLOAT16 or VECTOR_SPARSE_FLOAT");
dataset::SearchDataset search_dataset{info.metric_type_,
num_queries,
info.topk_,
Expand Down
97 changes: 71 additions & 26 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "google/protobuf/message_lite.h"
#include "index/VectorIndex.h"
#include "index/VectorMemIndex.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Utils.h"
Expand Down Expand Up @@ -115,7 +116,8 @@ ChunkedSegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else if (get_bit(binlog_index_bitset_, field_id)) {
}
if (get_bit(binlog_index_bitset_, field_id)) {

Check warning on line 120 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L120

Added line #L120 was not covered by tests
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
Expand All @@ -136,8 +138,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id,
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");

if (!get_bit(index_ready_bitset_, field_id) &&
!get_bit(binlog_index_bitset_, field_id)) {
if (!get_bit(index_ready_bitset_, field_id)) {

Check warning on line 141 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L141

Added line #L141 was not covered by tests
return;
}

Expand Down Expand Up @@ -489,21 +490,13 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
insert_record_.seal_pks();
}

bool use_temp_index = false;
{
// update num_rows to build temperate binlog index
// update num_rows to build the temporary interim index
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}

if (generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
use_temp_index = true;
}

if (!use_temp_index) {
if (!generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
}
Expand Down Expand Up @@ -1744,6 +1737,10 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return fill_with_empty(field_id, count);
}

if (HasFieldData(field_id)) {

Check warning on line 1740 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1740

Added line #L1740 was not covered by tests
Assert(get_bit(field_data_ready_bitset_, field_id));
return get_raw_data(field_id, field_meta, seg_offsets, count);

Check warning on line 1742 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1742

Added line #L1742 was not covered by tests
}
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!IsVectorDataType(field_meta.get_data_type())) {
Expand All @@ -1757,11 +1754,9 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
} else {
return fill_with_empty(field_id, count);
}

Assert(get_bit(field_data_ready_bitset_, field_id));

return get_raw_data(field_id, field_meta, seg_offsets, count);
}

std::unique_ptr<DataArray>
Expand Down Expand Up @@ -1818,15 +1813,22 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (IsVectorDataType(field_meta.get_data_type())) {
if (get_bit(index_ready_bitset_, fieldID) |
get_bit(binlog_index_bitset_, fieldID)) {
if (get_bit(index_ready_bitset_, fieldID)) {

Check warning on line 1816 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1816

Added line #L1816 was not covered by tests
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index = dynamic_cast<index::VectorIndex*>(
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else if (get_bit(binlog_index_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);

Check warning on line 1827 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1824-L1827

Added lines #L1824 - L1827 were not covered by tests
auto vec_index =
dynamic_cast<index::VectorIndex*>(field_indexing->indexing_.get());
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
Expand Down Expand Up @@ -2017,6 +2019,8 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
}
// check data type
if (field_meta.get_data_type() != DataType::VECTOR_FLOAT &&
field_meta.get_data_type() != DataType::VECTOR_FLOAT16 &&
field_meta.get_data_type() != DataType::VECTOR_BFLOAT16 &&
!is_sparse) {
return false;
}
Expand Down Expand Up @@ -2062,16 +2066,50 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
is_sparse
? dynamic_cast<ChunkedSparseFloatColumn*>(vec_data.get())->Dim()
: field_meta.get_dim();
auto index_metric = field_binlog_config->GetMetricType();
std::unique_ptr<index::VectorIndex> vec_index = nullptr;
if (!is_sparse) {
knowhere::ViewDataOp view_data = [field_raw_data_ptr =
vec_data](size_t id) {
return field_raw_data_ptr->ValueAt(id);

Check warning on line 2074 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2069-L2074

Added lines #L2069 - L2074 were not covered by tests
};
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {

Check warning on line 2076 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2076

Added line #L2076 was not covered by tests
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);

Check warning on line 2081 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2078-L2081

Added lines #L2078 - L2081 were not covered by tests
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
vec_index =

Check warning on line 2083 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2083

Added line #L2083 was not covered by tests
std::make_unique<index::VectorMemIndex<knowhere::fp16>>(
field_binlog_config->GetIndexType(),

Check warning on line 2085 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2085

Added line #L2085 was not covered by tests
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() ==

Check warning on line 2089 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2087-L2089

Added lines #L2087 - L2089 were not covered by tests
DataType::VECTOR_BFLOAT16) {
vec_index =

Check warning on line 2091 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2091

Added line #L2091 was not covered by tests
std::make_unique<index::VectorMemIndex<knowhere::bf16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);

Check warning on line 2096 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2095-L2096

Added lines #L2095 - L2096 were not covered by tests
}
} else {

Check warning on line 2098 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2098

Added line #L2098 was not covered by tests
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());

Check warning on line 2102 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2100-L2102

Added lines #L2100 - L2102 were not covered by tests
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");
return false;
}

Check warning on line 2107 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2105-L2107

Added lines #L2105 - L2107 were not covered by tests

auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
auto index_metric = field_binlog_config->GetMetricType();

auto vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
auto num_chunk = vec_data->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto dataset = knowhere::GenDataSet(
Expand All @@ -2088,19 +2126,26 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {

if (enable_binlog_index()) {
std::unique_lock lck(mutex_);
if (vec_index->HasRawData()) {
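// the index holds the raw data itself, so the field's copy is dropped here; knowhere view-data indexes that do not hold raw data take the else branch and keep the field data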

Check warning on line 2130 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2130

Added line #L2130 was not covered by tests
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);

Check warning on line 2132 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2132

Added line #L2132 was not covered by tests
} else {
set_bit(field_data_ready_bitset_, field_id, true);
}

Check warning on line 2135 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2135

Added line #L2135 was not covered by tests
vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index));

vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true);
LOG_INFO(
"replace binlog with binlog index in segment {}, field {}.",
"replace binlog with intermin index in segment {}, field {}.",

Check warning on line 2142 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2142

Added line #L2142 was not covered by tests
this->get_segment_id(),
field_id.get());
}
return true;
} catch (std::exception& e) {
LOG_WARN("fail to generate binlog index, because {}", e.what());
LOG_WARN("fail to generate intermin index, because {}", e.what());
return false;
}
}
Expand Down
13 changes: 13 additions & 0 deletions internal/core/src/segcore/ConcurrentVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,17 @@ class ConcurrentVector<BFloat16Vector>
}
};

// Reports whether `vec_base` is the ConcurrentVector specialization that
// corresponds to the dense vector type `data_type`.
//
// vec_base:  chunk container to inspect; a null pointer yields false because
//            dynamic_cast of a null pointer is a null pointer.
// data_type: expected element type. Only VECTOR_FLOAT, VECTOR_FLOAT16 and
//            VECTOR_BFLOAT16 are recognized; every other type yields false.
//
// Declared `inline` rather than `static` because it is defined in a header:
// `static` gives every including translation unit its own private copy and
// triggers unused-function warnings in the ones that never call it.
inline bool
ConcurrentDenseVectorCheck(const VectorBase* vec_base, DataType data_type) {
    if (data_type == DataType::VECTOR_FLOAT) {
        return dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base) !=
               nullptr;
    }
    if (data_type == DataType::VECTOR_FLOAT16) {
        return dynamic_cast<const ConcurrentVector<Float16Vector>*>(
                   vec_base) != nullptr;
    }
    if (data_type == DataType::VECTOR_BFLOAT16) {
        return dynamic_cast<const ConcurrentVector<BFloat16Vector>*>(
                   vec_base) != nullptr;
    }
    // Not a dense vector type handled by this check.
    return false;
}

} // namespace milvus::segcore
Loading

0 comments on commit 68d4c0e

Please sign in to comment.