diff --git a/Makefile b/Makefile index 3749302de3d25..d35c1efebac50 100644 --- a/Makefile +++ b/Makefile @@ -164,7 +164,7 @@ build-cpp-with-unittest: generated-proto build-cpp-with-coverage: generated-proto @echo "Building Milvus cpp library with coverage and unittest ..." - @(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -a ${useasan} -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd}) + @(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd}) check-proto-product: generated-proto @(env bash $(PWD)/scripts/check_proto_product.sh) diff --git a/internal/core/src/index/Index.h b/internal/core/src/index/Index.h index 3d488f36b6917..a4fadb70d1e31 100644 --- a/internal/core/src/index/Index.h +++ b/internal/core/src/index/Index.h @@ -18,13 +18,18 @@ #include #include +#include "exceptions/EasyAssert.h" +#include "knowhere/comp/index_param.h" #include "knowhere/dataset.h" #include "common/Types.h" +const std::string kMmapFilepath = "mmap_filepath"; + namespace milvus::index { class IndexBase { public: + IndexBase() = default; virtual ~IndexBase() = default; virtual BinarySet @@ -53,7 +58,28 @@ class IndexBase { virtual BinarySet Upload(const Config& config = {}) = 0; + bool + IsMmapSupported() const { + return index_type_ == knowhere::IndexEnum::INDEX_HNSW || + // index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || IVF_FLAT is not supported as it doesn't stores the vectors + index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC || + index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFPQ || + index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8 || + index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT || + index_type_ == knowhere::IndexEnum::INDEX_FAISS_IDMAP || + index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP; + } + + const IndexType& + Type() const { + return index_type_; + } + protected: + explicit IndexBase(IndexType index_type) + : index_type_(std::move(index_type)) { + } + IndexType index_type_ = ""; }; diff --git a/internal/core/src/index/Utils.cpp b/internal/core/src/index/Utils.cpp index 084fd10183797..249cb850886cf 100644 --- a/internal/core/src/index/Utils.cpp +++ b/internal/core/src/index/Utils.cpp @@ -15,6 +15,9 @@ // limitations under the License. #include +#include +#include +#include #include #include #include @@ -25,11 +28,13 @@ #include "index/Utils.h" #include "index/Meta.h" #include +#include #include "exceptions/EasyAssert.h" #include "knowhere/comp/index_param.h" #include "common/Slice.h" #include "storage/FieldData.h" #include "storage/Util.h" +#include "utils/File.h" namespace milvus::index { diff --git a/internal/core/src/index/VectorIndex.h b/internal/core/src/index/VectorIndex.h index f82908380b40e..4d14dc1e68328 100644 --- a/internal/core/src/index/VectorIndex.h +++ b/internal/core/src/index/VectorIndex.h @@ -35,7 +35,7 @@ class VectorIndex : public IndexBase { public: explicit VectorIndex(const IndexType& index_type, const MetricType& metric_type) - : index_type_(index_type), metric_type_(metric_type) { + : IndexBase(index_type), metric_type_(metric_type) { } public: @@ -87,7 +87,6 @@ class VectorIndex : public IndexBase { } private: - IndexType index_type_; MetricType metric_type_; int64_t dim_; }; diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 7fc4cf8296dc8..dc3a1dbc34e9a 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -16,17 +16,21 @@ #include "index/VectorMemIndex.h" +#include #include +#include #include #include #include #include + #include "fmt/format.h" + +#include "index/Index.h" #include "index/Meta.h" #include "index/Utils.h" #include "exceptions/EasyAssert.h" #include "config/ConfigKnowhere.h" - #include "knowhere/factory.h" #include "knowhere/comp/time_recorder.h" #include "common/BitsetView.h" @@ -39,6 +43,7 @@ #include "storage/MemFileManagerImpl.h" #include "storage/ThreadPools.h" #include "storage/Util.h" +#include "utils/File.h" namespace milvus::index { @@ -101,6 +106,10 @@ VectorMemIndex::Load(const BinarySet& binary_set, const Config& config) { void VectorMemIndex::Load(const Config& config) { + if (config.contains(kMmapFilepath)) { + return LoadFromFile(config); + } + auto index_files = GetValueFromConfig>(config, "index_files"); AssertInfo(index_files.has_value(), @@ -368,5 +377,120 @@ VectorMemIndex::GetVector(const DatasetPtr dataset) const { memcpy(raw_data.data(), tensor, data_size); return raw_data; } +void +VectorMemIndex::LoadFromFile(const Config& config) { + auto filepath = GetValueFromConfig(config, kMmapFilepath); + AssertInfo(filepath.has_value(), "mmap filepath is empty when load index"); + + std::filesystem::create_directories( + std::filesystem::path(filepath.value()).parent_path()); + + auto file = File::Open(filepath.value(), O_CREAT | O_TRUNC | O_RDWR); + + auto index_files = + GetValueFromConfig>(config, "index_files"); + AssertInfo(index_files.has_value(), + "index file paths is empty when load index"); + + std::unordered_set pending_index_files(index_files->begin(), + index_files->end()); + + LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size(); + + auto parallel_degree = + static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + + // try to read slice meta first + std::string slice_meta_filepath; + for (auto& file : pending_index_files) { + auto file_name = file.substr(file.find_last_of('/') + 1); + if (file_name == INDEX_FILE_SLICE_META) { + slice_meta_filepath = file; + pending_index_files.erase(file); + break; + } + } + + LOG_SEGCORE_INFO_ << "load with slice meta: " + << !slice_meta_filepath.empty(); + + if (!slice_meta_filepath + .empty()) { // load with the slice meta info, then we can load batch by batch + std::string index_file_prefix = slice_meta_filepath.substr( + 0, slice_meta_filepath.find_last_of('/') + 1); + std::vector batch{}; + batch.reserve(parallel_degree); + + auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath}); + auto raw_slice_meta = result[INDEX_FILE_SLICE_META]; + Config meta_data = Config::parse( + std::string(static_cast(raw_slice_meta->Data()), + raw_slice_meta->Size())); + + for (auto& item : meta_data[META]) { + std::string prefix = item[NAME]; + int slice_num = item[SLICE_NUM]; + auto total_len = static_cast(item[TOTAL_LEN]); + + auto HandleBatch = [&](int index) { + auto batch_data = file_manager_->LoadIndexToMemory(batch); + for (int j = index - batch.size() + 1; j <= index; j++) { + std::string file_name = GenSlicedFileName(prefix, j); + AssertInfo(batch_data.find(file_name) != batch_data.end(), + "lost index slice data"); + auto data = batch_data[file_name]; + auto written = file.Write(data->Data(), data->Size()); + AssertInfo( + written == data->Size(), + fmt::format("failed to write index data to disk {}: {}", + filepath->data(), + strerror(errno))); + } + for (auto& file : batch) { + pending_index_files.erase(file); + } + batch.clear(); + }; + + for (auto i = 0; i < slice_num; ++i) { + std::string file_name = GenSlicedFileName(prefix, i); + batch.push_back(index_file_prefix + file_name); + if (batch.size() >= parallel_degree) { + HandleBatch(i); + } + } + if (batch.size() > 0) { + HandleBatch(slice_num - 1); + } + } + } else { + auto result = file_manager_->LoadIndexToMemory(std::vector( + pending_index_files.begin(), pending_index_files.end())); + for (auto& [_, index_data] : result) { + file.Write(index_data->Data(), index_data->Size()); + } + } + file.Close(); + + LOG_SEGCORE_INFO_ << "load index into Knowhere..."; + auto conf = config; + conf.erase(kMmapFilepath); + auto stat = index_.DeserializeFromFile(filepath.value(), conf); + if (stat != knowhere::Status::success) { + PanicCodeInfo(ErrorCodeEnum::UnexpectedError, + fmt::format("failed to Deserialize index: {}", + KnowhereStatusString(stat))); + } + + auto dim = index_.Dim(); + this->SetDim(index_.Dim()); + + auto ok = unlink(filepath->data()); + AssertInfo(ok == 0, + fmt::format("failed to unlink mmap index file {}: {}", + filepath.value(), + strerror(errno))); + LOG_SEGCORE_INFO_ << "load vector index done"; +} } // namespace milvus::index diff --git a/internal/core/src/index/VectorMemIndex.h b/internal/core/src/index/VectorMemIndex.h index fca9fd3591b0a..884fea63a86b3 100644 --- a/internal/core/src/index/VectorMemIndex.h +++ b/internal/core/src/index/VectorMemIndex.h @@ -75,6 +75,10 @@ class VectorMemIndex : public VectorIndex { virtual void LoadWithoutAssemble(const BinarySet& binary_set, const Config& config); + private: + void + LoadFromFile(const Config& config); + protected: Config config_; knowhere::Index index_; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index dabd77e0bde8e..9fcab78b0c5a7 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -26,13 +26,14 @@ #include "exceptions/EasyAssert.h" #include "fmt/format.h" #include "mmap/Utils.h" +#include "utils/File.h" namespace milvus { #ifdef MAP_POPULATE -static int mmap_flags = MAP_PRIVATE | MAP_POPULATE; +static int mmap_flags = MAP_SHARED | MAP_POPULATE; #else -static int mmap_flags = MAP_PRIVATE; +static int mmap_flags = MAP_SHARED; #endif class ColumnBase { @@ -64,7 +65,7 @@ class ColumnBase { } // mmap mode ctor - ColumnBase(int fd, size_t size, const FieldMeta& field_meta) { + ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) { padding_ = field_meta.get_data_type() == DataType::JSON ? simdjson::SIMDJSON_PADDING : 0; @@ -72,7 +73,7 @@ class ColumnBase { len_ = size; size_ = size + padding_; data_ = static_cast( - mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0)); + mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0)); #ifndef MAP_POPULATE // Manually access the mapping to populate it const size_t page_size = getpagesize(); @@ -164,8 +165,8 @@ class Column : public ColumnBase { } // mmap mode ctor - Column(int fd, size_t size, const FieldMeta& field_meta) - : ColumnBase(fd, size, field_meta), + Column(const File& file, size_t size, const FieldMeta& field_meta) + : ColumnBase(file, size, field_meta), num_rows_(size / field_meta.get_sizeof()) { } @@ -202,8 +203,8 @@ class VariableColumn : public ColumnBase { } // mmap mode ctor - VariableColumn(int fd, size_t size, const FieldMeta& field_meta) - : ColumnBase(fd, size, field_meta) { + VariableColumn(const File& file, size_t size, const FieldMeta& field_meta) + : ColumnBase(file, size, field_meta) { } VariableColumn(VariableColumn&& column) noexcept diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index 3cabc68919137..3f01fe7b4d0d6 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -27,6 +27,7 @@ #include "common/FieldMeta.h" #include "mmap/Types.h" #include "storage/Util.h" +#include "utils/File.h" namespace milvus { @@ -77,7 +78,9 @@ FillField(DataType data_type, const storage::FieldDataPtr data, void* dst) { } inline size_t -WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) { +WriteFieldData(File& file, + DataType data_type, + const storage::FieldDataPtr& data) { size_t total_written{0}; if (datatype_is_variable(data_type)) { switch (data_type) { @@ -86,7 +89,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) { for (auto i = 0; i < data->get_num_rows(); ++i) { auto str = static_cast(data->RawValue(i)); - ssize_t written = write(fd, str->data(), str->size()); + ssize_t written = file.Write(str->data(), str->size()); if (written < str->size()) { break; } @@ -99,7 +102,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) { auto padded_string = static_cast(data->RawValue(i))->data(); ssize_t written = - write(fd, padded_string.data(), padded_string.size()); + file.Write(padded_string.data(), padded_string.size()); if (written < padded_string.size()) { break; } @@ -112,7 +115,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) { datatype_name(data_type))); } } else { - total_written += write(fd, data->Data(), data->Size()); + total_written += file.Write(data->Data(), data->Size()); } return total_written; diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index c2514776ce5c4..1321c40ffd011 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -13,6 +13,7 @@ #include #include "common/QueryInfo.h" +#include "common/Types.h" #include "query/SearchBruteForce.h" #include "query/SearchOnSealed.h" #include "query/helper.h" diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 0e35fbd5d66ad..b23b1978e206a 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -35,6 +35,7 @@ #include "storage/FieldData.h" #include "storage/Util.h" #include "storage/ThreadPools.h" +#include "utils/File.h" namespace milvus::segcore { @@ -335,10 +336,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { auto dir = filepath.parent_path(); std::filesystem::create_directories(dir); - int fd = - open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR); - AssertInfo(fd != -1, - fmt::format("failed to create mmap file {}", filepath.c_str())); + auto file = File::Open(filepath.string(), O_CREAT | O_TRUNC | O_RDWR); auto& field_meta = (*schema_)[field_id]; auto data_type = field_meta.get_data_type(); @@ -350,7 +348,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { storage::FieldDataPtr field_data; while (data.channel->pop(field_data)) { data_size += field_data->Size(); - auto written = WriteFieldData(fd, data_type, field_data); + auto written = WriteFieldData(file, data_type, field_data); if (written != field_data->Size()) { break; } @@ -369,11 +367,6 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { total_written, data_size, strerror(errno))); - int ok = fsync(fd); - AssertInfo(ok == 0, - fmt::format("failed to fsync mmap data file {}, err: {}", - filepath.c_str(), - strerror(errno))); auto num_rows = data.row_count; std::shared_ptr column{}; @@ -382,7 +375,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { case milvus::DataType::STRING: case milvus::DataType::VARCHAR: { auto var_column = std::make_shared>( - fd, total_written, field_meta); + file, total_written, field_meta); var_column->Seal(std::move(indices)); column = std::move(var_column); break; @@ -390,7 +383,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { case milvus::DataType::JSON: { auto var_column = std::make_shared>( - fd, total_written, field_meta); + file, total_written, field_meta); var_column->Seal(std::move(indices)); column = std::move(var_column); break; @@ -399,7 +392,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { } } } else { - column = std::make_shared(fd, total_written, field_meta); + column = std::make_shared(file, total_written, field_meta); } { @@ -407,16 +400,11 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { fields_.emplace(field_id, column); } - ok = unlink(filepath.c_str()); + auto ok = unlink(filepath.c_str()); AssertInfo(ok == 0, fmt::format("failed to unlink mmap data file {}, err: {}", filepath.c_str(), strerror(errno))); - ok = close(fd); - AssertInfo(ok == 0, - fmt::format("failed to close data file {}, err: {}", - filepath.c_str(), - strerror(errno))); // set pks to offset if (schema_->get_primary_field_id() == field_id) { diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h index f3c85d67c2a66..ce7b9d2657c41 100644 --- a/internal/core/src/segcore/Types.h +++ b/internal/core/src/segcore/Types.h @@ -35,6 +35,7 @@ struct LoadIndexInfo { int64_t segment_id; int64_t field_id; DataType field_type; + std::string mmap_dir_path; int64_t index_id; int64_t index_build_id; int64_t index_version; diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index e39bcd20e060b..78cdd0114d1ec 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -12,9 +12,12 @@ #include "segcore/load_index_c.h" #include "common/FieldMeta.h" +#include "exceptions/EasyAssert.h" +#include "index/Index.h" #include "index/IndexFactory.h" #include "index/Meta.h" #include "index/Utils.h" +#include "log/Log.h" #include "segcore/Types.h" #include "storage/Util.h" #include "storage/RemoteChunkManagerSingleton.h" @@ -74,7 +77,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t partition_id, int64_t segment_id, int64_t field_id, - enum CDataType field_type) { + enum CDataType field_type, + const char* mmap_dir_path) { try { auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; @@ -83,6 +87,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, load_index_info->segment_id = segment_id; load_index_info->field_id = field_id; load_index_info->field_type = milvus::DataType(field_type); + load_index_info->mmap_dir_path = std::string(mmap_dir_path); auto status = CStatus(); status.error_code = Success; @@ -250,6 +255,18 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) { load_index_info->index = milvus::index::IndexFactory::GetInstance().CreateIndex( index_info, file_manager); + + if (!load_index_info->mmap_dir_path.empty() && + load_index_info->index->IsMmapSupported()) { + auto filepath = + std::filesystem::path(load_index_info->mmap_dir_path) / + std::to_string(load_index_info->segment_id) / + std::to_string(load_index_info->field_id) / + std::to_string(load_index_info->index_id); + + config[kMmapFilepath] = filepath.string(); + } + load_index_info->index->Load(config); auto status = CStatus(); status.error_code = Success; diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index 224a08dbbdd84..d386b57b0e1f0 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -40,7 +40,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t partition_id, int64_t segment_id, int64_t field_id, - enum CDataType field_type); + enum CDataType field_type, + const char* mmap_dir_path); CStatus AppendIndexInfo(CLoadIndexInfo c_load_index_info, diff --git a/internal/core/src/utils/File.h b/internal/core/src/utils/File.h new file mode 100644 index 0000000000000..6ce8ae5722210 --- /dev/null +++ b/internal/core/src/utils/File.h @@ -0,0 +1,65 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include "exceptions/EasyAssert.h" +#include "fmt/core.h" +#include +#include + +namespace milvus { +class File { + public: + File(const File& file) = delete; + File(File&& file) noexcept { + fd_ = file.fd_; + file.fd_ = -1; + } + ~File() { + if (fd_ >= 0) { + close(fd_); + } + } + + static File + Open(const std::string_view filepath, int flags) { + int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR); + AssertInfo(fd != -1, + fmt::format("failed to create mmap file {}: {}", + filepath, + strerror(errno))); + return File(fd); + } + + int + Descriptor() const { + return fd_; + } + + ssize_t + Write(const void* buf, size_t size) { + return write(fd_, buf, size); + } + + void + Close() { + close(fd_); + fd_ = -1; + } + + private: + explicit File(int fd) : fd_(fd) { + } + int fd_{-1}; +}; +} // namespace milvus \ No newline at end of file diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 459dca80a96f3..50b835a4a7ac1 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -1690,8 +1690,8 @@ TEST(CApiTest, LoadIndexInfo) { c_load_index_info, index_param_key2.data(), index_param_value2.data()); ASSERT_EQ(status.error_code, Success); std::string field_name = "field0"; - status = - AppendFieldInfo(c_load_index_info, 0, 0, 0, 0, CDataType::FloatVector); + status = AppendFieldInfo( + c_load_index_info, 0, 0, 0, 0, CDataType::FloatVector, ""); ASSERT_EQ(status.error_code, Success); status = AppendIndex(c_load_index_info, c_binary_set); ASSERT_EQ(status.error_code, Success); @@ -1874,7 +1874,8 @@ TEST(CApiTest, Indexing_Without_Predicate) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2021,7 +2022,8 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2225,7 +2227,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2401,7 +2404,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2596,7 +2600,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2765,7 +2770,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -2967,7 +2973,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -3142,7 +3149,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -3338,7 +3346,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -3529,7 +3538,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load index for vec field, load raw data for scalar field @@ -3730,7 +3740,8 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); auto load_index_info = (LoadIndexInfo*)c_load_index_info; @@ -3974,7 +3985,8 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) { c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam( c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector); + AppendFieldInfo( + c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, ""); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); // load vec index diff --git a/internal/core/unittest/test_index_c_api.cpp b/internal/core/unittest/test_index_c_api.cpp index 579b54422aed1..f98d473e52349 100644 --- a/internal/core/unittest/test_index_c_api.cpp +++ b/internal/core/unittest/test_index_c_api.cpp @@ -200,7 +200,7 @@ TEST(CBoolIndexTest, All) { { DeleteBinarySet(binary_set); } } - delete[](char*)(half_ds->GetTensor()); + delete[] (char*)(half_ds->GetTensor()); } // TODO: more scalar type. @@ -317,6 +317,6 @@ TEST(CStringIndexTest, All) { { DeleteBinarySet(binary_set); } } - delete[](char*)(str_ds->GetTensor()); + delete[] (char*)(str_ds->GetTensor()); } #endif diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 0adf4ace142f0..12794922641a2 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -16,10 +16,12 @@ #include #include +#include "knowhere/comp/index_param.h" #include "query/SearchBruteForce.h" #include "segcore/Reduce.h" #include "index/IndexFactory.h" #include "common/QueryResult.h" +#include "segcore/Types.h" #include "test_utils/indexbuilder_test_utils.h" #include "test_utils/DataGen.h" #include "test_utils/Timer.h" @@ -290,6 +292,10 @@ class IndexTest : public ::testing::TestWithParam { auto param = GetParam(); index_type = param.first; metric_type = param.second; + NB = 10000; + if (index_type == knowhere::IndexEnum::INDEX_HNSW) { + NB = 270000; + } build_conf = generate_build_conf(index_type, metric_type); load_conf = generate_load_conf(index_type, metric_type, NB); search_conf = generate_search_conf(index_type, metric_type); @@ -418,6 +424,61 @@ TEST_P(IndexTest, BuildAndQuery) { vec_index->Query(xq_dataset, search_info, nullptr); } +TEST_P(IndexTest, Mmap) { + milvus::index::CreateIndexInfo create_index_info; + create_index_info.index_type = index_type; + create_index_info.metric_type = metric_type; + create_index_info.field_type = vec_field_data_type; + index::IndexBasePtr index; + + milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100}; + milvus::storage::IndexMeta index_meta{3, 100, 1000, 1}; + auto chunk_manager = milvus::storage::CreateChunkManager(storage_config_); + auto file_manager = milvus::storage::CreateFileManager( + index_type, field_data_meta, index_meta, chunk_manager); + index = milvus::index::IndexFactory::GetInstance().CreateIndex( + create_index_info, file_manager); + + ASSERT_NO_THROW(index->BuildWithDataset(xb_dataset, build_conf)); + milvus::index::IndexBasePtr new_index; + milvus::index::VectorIndex* vec_index = nullptr; + + auto binary_set = index->Upload(); + index.reset(); + + new_index = milvus::index::IndexFactory::GetInstance().CreateIndex( + create_index_info, file_manager); + if (!new_index->IsMmapSupported()) { + return; + } + vec_index = dynamic_cast(new_index.get()); + + std::vector index_files; + for (auto& binary : binary_set.binary_map_) { + index_files.emplace_back(binary.first); + } + load_conf["index_files"] = index_files; + load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type; + vec_index->Load(load_conf); + EXPECT_EQ(vec_index->Count(), NB); + EXPECT_EQ(vec_index->GetDim(), DIM); + + milvus::SearchInfo search_info; + search_info.topk_ = K; + search_info.metric_type_ = metric_type; + search_info.search_params_ = search_conf; + auto result = vec_index->Query(xq_dataset, search_info, nullptr); + EXPECT_EQ(result->total_nq_, NQ); + EXPECT_EQ(result->unity_topK_, K); + EXPECT_EQ(result->distances_.size(), NQ * K); + EXPECT_EQ(result->seg_offsets_.size(), NQ * K); + if (!is_binary) { + EXPECT_EQ(result->seg_offsets_[0], query_offset); + } + search_info.search_params_ = range_search_conf; + vec_index->Query(xq_dataset, search_info, nullptr); +} + TEST_P(IndexTest, GetVector) { milvus::index::CreateIndexInfo create_index_info; create_index_info.index_type = index_type; diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 4cdee24aac9ce..026de64a5e3fe 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -60,7 +60,8 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, fieldID := indexInfo.FieldID indexPaths := indexInfo.IndexFilePaths - err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType) + mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() + err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, mmapDirPath) if err != nil { return err } @@ -123,13 +124,15 @@ func (li *LoadIndexInfo) appendIndexFile(filePath string) error { } // appendFieldInfo appends fieldID & fieldType to index -func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType) error { +func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, mmapDirPath string) error { cColID := C.int64_t(collectionID) cParID := C.int64_t(partitionID) cSegID := C.int64_t(segmentID) cFieldID := C.int64_t(fieldID) cintDType := uint32(fieldType) - status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType) + cMmapDirPath := C.CString(mmapDirPath) + defer C.free(unsafe.Pointer(cMmapDirPath)) + status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath) return HandleCStatus(&status, "AppendFieldInfo failed") } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index f3c477a27b66d..d9f38d011fd91 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -311,7 +311,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer loader.mut.Lock() defer loader.mut.Unlock() - poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt() + poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt() if loader.committedResource.WorkNum >= poolCap { return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap)) } else if loader.committedResource.MemorySize+memoryUsage >= totalMemory { diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 2a2642fd03fa9..28837c156d4f9 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -223,11 +223,6 @@ func (node *QueryNode) InitSegcore() error { localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) - mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() - if len(mmapDirPath) > 0 { - log.Info("mmap enabled", zap.String("dir", mmapDirPath)) - } - initcore.InitTraceConfig(paramtable.Get()) return initcore.InitRemoteChunkManager(paramtable.Get()) } @@ -366,12 +361,6 @@ func (node *QueryNode) Start() error { paramtable.SetUpdateTime(time.Now()) mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() mmapEnabled := len(mmapDirPath) > 0 - if mmapEnabled { - err := os.MkdirAll(mmapDirPath, 0666) - if err != nil { - panic(err) - } - } node.UpdateStateCode(commonpb.StateCode_Healthy) log.Info("query node start successfully", zap.Int64("queryNodeID", paramtable.GetNodeID()),