Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.5]intermin index support different index type and more data type(fp16/bf16) #39180

Open
wants to merge 1 commit into
base: 2.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,11 @@ queryNode:
# Milvus will eventually seals and indexes all segments, but enabling this optimizes search performance for immediate queries following data insertion.
# This defaults to true, indicating that Milvus creates temporary index for growing segments and the sealed segments that are not indexed upon searches.
enableIndex: true
nlist: 128 # temp index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
nlist: 128 # interim index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change these config before merge

nprobe: 36 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
subDim: 4 # interim index sub dim, recommend to (subDim % vector dim == 0)
refineRatio: 4.5 # interim index parameters, should set to be >= 1.0
denseVectorIndexType: SCANN_DVR # Dense vector intermin index type
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
multipleChunkedEnable: true # Enable multiple chunked search
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ typedef struct CMmapConfig {
uint64_t fix_file_size;
bool growing_enable_mmap;
bool scalar_index_enable_mmap;
bool scalar_field_enable_mmap;
bool vector_index_enable_mmap;
bool vector_field_enable_mmap;
} CMmapConfig;

typedef struct CTraceConfig {
Expand Down
24 changes: 24 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@
}
}

template <typename T>
VectorMemIndex<T>::VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data)
: VectorIndex(index_type, metric_type) {
CheckMetricTypeSupport<T>(metric_type);
AssertInfo(!is_unsupported(index_type, metric_type),
index_type + " doesn't support metric: " + metric_type);

auto view_data_pack = knowhere::Pack(view_data);
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
GetIndexType(), version, view_data_pack);
if (get_index_obj.has_value()) {
index_ = get_index_obj.value();
} else {
auto err = get_index_obj.error();
if (err == knowhere::Status::invalid_index_error) {
PanicInfo(ErrorCode::Unsupported, get_index_obj.what());

Check warning on line 104 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L103-L104

Added lines #L103 - L104 were not covered by tests
}
PanicInfo(ErrorCode::KnowhereError, get_index_obj.what());

Check warning on line 106 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L106

Added line #L106 was not covered by tests
}
}

template <typename T>
knowhere::expected<std::vector<knowhere::IndexNode::IteratorPtr>>
VectorMemIndex<T>::VectorIterators(const milvus::DatasetPtr dataset,
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class VectorMemIndex : public VectorIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

// knowhere data view index special constucter for intermin index, no need to hold file_manager_ to upload or download files
VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data);

BinarySet
Serialize(const Config& config) override;

Expand Down
1 change: 1 addition & 0 deletions internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once
#include "mmap/ChunkData.h"
#include "storage/MmapManager.h"
#include "segcore/SegcoreConfig.h"
namespace milvus {
template <typename Type>
class ChunkVectorBase {
Expand Down
7 changes: 3 additions & 4 deletions internal/core/src/query/SearchOnGrowing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ FloatSegmentIndexSearch(const segcore::SegmentGrowingImpl& segment,
// TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder.
auto dim = is_sparse ? 0 : field.get_dim();

AssertInfo(field.get_data_type() == DataType::VECTOR_FLOAT ||
field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT,
"[FloatSearch]Field data type isn't VECTOR_FLOAT or "
"VECTOR_SPARSE_FLOAT");
AssertInfo(IsVectorDataType(field.get_data_type()),
"[FloatSearch]Field data type isn't VECTOR_FLOAT, "
"VECTOR_FLOAT16, VECTOR_BFLOAT16 or VECTOR_SPARSE_FLOAT");
dataset::SearchDataset search_dataset{info.metric_type_,
num_queries,
info.topk_,
Expand Down
97 changes: 71 additions & 26 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "google/protobuf/message_lite.h"
#include "index/VectorIndex.h"
#include "index/VectorMemIndex.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Utils.h"
Expand Down Expand Up @@ -115,7 +116,8 @@
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else if (get_bit(binlog_index_bitset_, field_id)) {
}
if (get_bit(binlog_index_bitset_, field_id)) {

Check warning on line 120 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L120

Added line #L120 was not covered by tests
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
Expand All @@ -136,8 +138,7 @@
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");

if (!get_bit(index_ready_bitset_, field_id) &&
!get_bit(binlog_index_bitset_, field_id)) {
if (!get_bit(index_ready_bitset_, field_id)) {

Check warning on line 141 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L141

Added line #L141 was not covered by tests
return;
}

Expand Down Expand Up @@ -489,21 +490,13 @@
insert_record_.seal_pks();
}

bool use_temp_index = false;
{
// update num_rows to build temperate binlog index
// update num_rows to build temperate intermin index
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}

if (generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
use_temp_index = true;
}

if (!use_temp_index) {
if (!generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
}
Expand Down Expand Up @@ -1744,6 +1737,10 @@
return fill_with_empty(field_id, count);
}

if (HasFieldData(field_id)) {

Check warning on line 1740 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1740

Added line #L1740 was not covered by tests
Assert(get_bit(field_data_ready_bitset_, field_id));
return get_raw_data(field_id, field_meta, seg_offsets, count);

Check warning on line 1742 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1742

Added line #L1742 was not covered by tests
}
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!IsVectorDataType(field_meta.get_data_type())) {
Expand All @@ -1757,11 +1754,9 @@
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
} else {
return fill_with_empty(field_id, count);
}

Assert(get_bit(field_data_ready_bitset_, field_id));

return get_raw_data(field_id, field_meta, seg_offsets, count);
}

std::unique_ptr<DataArray>
Expand Down Expand Up @@ -1818,15 +1813,22 @@
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (IsVectorDataType(field_meta.get_data_type())) {
if (get_bit(index_ready_bitset_, fieldID) |
get_bit(binlog_index_bitset_, fieldID)) {
if (get_bit(index_ready_bitset_, fieldID)) {

Check warning on line 1816 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1816

Added line #L1816 was not covered by tests
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index = dynamic_cast<index::VectorIndex*>(
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else if (get_bit(binlog_index_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);

Check warning on line 1827 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1824-L1827

Added lines #L1824 - L1827 were not covered by tests
auto vec_index =
dynamic_cast<index::VectorIndex*>(field_indexing->indexing_.get());
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
Expand Down Expand Up @@ -2017,6 +2019,8 @@
}
// check data type
if (field_meta.get_data_type() != DataType::VECTOR_FLOAT &&
field_meta.get_data_type() != DataType::VECTOR_FLOAT16 &&
field_meta.get_data_type() != DataType::VECTOR_BFLOAT16 &&
!is_sparse) {
return false;
}
Expand Down Expand Up @@ -2062,16 +2066,50 @@
is_sparse
? dynamic_cast<ChunkedSparseFloatColumn*>(vec_data.get())->Dim()
: field_meta.get_dim();
auto index_metric = field_binlog_config->GetMetricType();
std::unique_ptr<index::VectorIndex> vec_index = nullptr;
if (!is_sparse) {
knowhere::ViewDataOp view_data = [field_raw_data_ptr =
vec_data](size_t id) {
return field_raw_data_ptr->ValueAt(id);

Check warning on line 2074 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2069-L2074

Added lines #L2069 - L2074 were not covered by tests
};
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {

Check warning on line 2076 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2076

Added line #L2076 was not covered by tests
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);

Check warning on line 2081 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2078-L2081

Added lines #L2078 - L2081 were not covered by tests
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
vec_index =

Check warning on line 2083 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2083

Added line #L2083 was not covered by tests
std::make_unique<index::VectorMemIndex<knowhere::fp16>>(
field_binlog_config->GetIndexType(),

Check warning on line 2085 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2085

Added line #L2085 was not covered by tests
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() ==

Check warning on line 2089 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2087-L2089

Added lines #L2087 - L2089 were not covered by tests
DataType::VECTOR_BFLOAT16) {
vec_index =

Check warning on line 2091 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2091

Added line #L2091 was not covered by tests
std::make_unique<index::VectorMemIndex<knowhere::bf16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);

Check warning on line 2096 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2095-L2096

Added lines #L2095 - L2096 were not covered by tests
}
} else {

Check warning on line 2098 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2098

Added line #L2098 was not covered by tests
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());

Check warning on line 2102 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2100-L2102

Added lines #L2100 - L2102 were not covered by tests
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");
return false;
}

Check warning on line 2107 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2105-L2107

Added lines #L2105 - L2107 were not covered by tests

auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
auto index_metric = field_binlog_config->GetMetricType();

auto vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
auto num_chunk = vec_data->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto dataset = knowhere::GenDataSet(
Expand All @@ -2088,19 +2126,26 @@

if (enable_binlog_index()) {
std::unique_lock lck(mutex_);
if (vec_index->HasRawData()) {
// some knowhere view data index not has raw data, still keep it

Check warning on line 2130 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2130

Added line #L2130 was not covered by tests
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);

Check warning on line 2132 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2132

Added line #L2132 was not covered by tests
} else {
set_bit(field_data_ready_bitset_, field_id, true);
}

Check warning on line 2135 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2135

Added line #L2135 was not covered by tests
vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index));

vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true);
LOG_INFO(
"replace binlog with binlog index in segment {}, field {}.",
"replace binlog with intermin index in segment {}, field {}.",

Check warning on line 2142 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L2142

Added line #L2142 was not covered by tests
this->get_segment_id(),
field_id.get());
}
return true;
} catch (std::exception& e) {
LOG_WARN("fail to generate binlog index, because {}", e.what());
LOG_WARN("fail to generate intermin index, because {}", e.what());
return false;
}
}
Expand Down
13 changes: 13 additions & 0 deletions internal/core/src/segcore/ConcurrentVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,17 @@ class ConcurrentVector<BFloat16Vector>
}
};

static bool
ConcurrentDenseVectorCheck(const VectorBase* vec_base, DataType data_type) {
if (data_type == DataType::VECTOR_FLOAT) {
return dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base);
} else if (data_type == DataType::VECTOR_FLOAT16) {
return dynamic_cast<const ConcurrentVector<Float16Vector>*>(vec_base);
} else if (data_type == DataType::VECTOR_BFLOAT16) {
return dynamic_cast<const ConcurrentVector<BFloat16Vector>*>(vec_base);
} else {
return false;
}
}

} // namespace milvus::segcore
Loading
Loading