Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable mmap for vector index #25877

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ build-cpp-with-unittest: generated-proto

build-cpp-with-coverage: generated-proto
@echo "Building Milvus cpp library with coverage and unittest ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -a ${useasan} -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})

check-proto-product: generated-proto
@(env bash $(PWD)/scripts/check_proto_product.sh)
Expand Down
26 changes: 26 additions & 0 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

#include <memory>
#include <boost/dynamic_bitset.hpp>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "common/Types.h"

// Config key whose value is the path of the local file an index is loaded
// through when mmap loading is requested.
// Declared `inline` (C++17) so that all translation units including this
// header share one std::string object instead of each constructing its own
// internal-linkage copy at start-up.
inline const std::string kMmapFilepath = "mmap_filepath";

namespace milvus::index {

class IndexBase {
public:
IndexBase() = default;
virtual ~IndexBase() = default;

virtual BinarySet
Expand Down Expand Up @@ -53,7 +58,28 @@ class IndexBase {
virtual BinarySet
Upload(const Config& config = {}) = 0;

// Returns true when index_type_ is one of the index types enumerated below,
// false for every other type.
bool
IsMmapSupported() const {
    const auto& type = index_type_;

    if (type == knowhere::IndexEnum::INDEX_HNSW) {
        return true;
    }

    // INDEX_FAISS_IVFFLAT is intentionally absent from this group: IVF_FLAT
    // is not supported as it doesn't store the vectors.
    if (type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC) {
        return true;
    }
    if (type == knowhere::IndexEnum::INDEX_FAISS_IVFPQ) {
        return true;
    }
    if (type == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8) {
        return true;
    }
    if (type == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
        return true;
    }

    if (type == knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
        return true;
    }
    return type == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP;
}

// Read-only accessor for the index type stored in index_type_.
// The returned reference refers to the member itself, so it is only valid
// while this object is alive.
const IndexType&
Type() const {
    return this->index_type_;
}

protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {
}

IndexType index_type_ = "";
};

Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// limitations under the License.

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <string>
#include <tuple>
#include <unordered_map>
Expand All @@ -25,11 +28,13 @@
#include "index/Utils.h"
#include "index/Meta.h"
#include <google/protobuf/text_format.h>
#include <unistd.h>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "common/Slice.h"
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus::index {

Expand Down
3 changes: 1 addition & 2 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class VectorIndex : public IndexBase {
public:
explicit VectorIndex(const IndexType& index_type,
const MetricType& metric_type)
: index_type_(index_type), metric_type_(metric_type) {
: IndexBase(index_type), metric_type_(metric_type) {
}

public:
Expand Down Expand Up @@ -87,7 +87,6 @@ class VectorIndex : public IndexBase {
}

private:
IndexType index_type_;
MetricType metric_type_;
int64_t dim_;
};
Expand Down
126 changes: 125 additions & 1 deletion internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

#include "index/VectorMemIndex.h"

#include <unistd.h>
#include <cmath>
#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "fmt/format.h"

#include "index/Index.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "exceptions/EasyAssert.h"
#include "config/ConfigKnowhere.h"

#include "knowhere/factory.h"
#include "knowhere/comp/time_recorder.h"
#include "common/BitsetView.h"
Expand All @@ -39,6 +43,7 @@
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus::index {

Expand Down Expand Up @@ -101,6 +106,10 @@ VectorMemIndex::Load(const BinarySet& binary_set, const Config& config) {

void
VectorMemIndex::Load(const Config& config) {
if (config.contains(kMmapFilepath)) {
return LoadFromFile(config);
}

auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
Expand Down Expand Up @@ -368,5 +377,120 @@ VectorMemIndex::GetVector(const DatasetPtr dataset) const {
memcpy(raw_data.data(), tensor, data_size);
return raw_data;
}
// Loads the index through an on-disk file rather than an in-memory BinarySet.
//
// The index files listed under "index_files" in `config` are fetched through
// file_manager_ and appended to the file named by config[kMmapFilepath]. That
// file is then passed to index_.DeserializeFromFile() and finally unlinked.
//
// Config keys read:
//   kMmapFilepath - path of the local file to create (truncated if it exists);
//                   its parent directories are created when missing.
//   "index_files" - paths of the index files; may contain the slice-meta file.
//
// Errors (missing keys, a lost slice, a short write, a failed unlink) are
// reported via AssertInfo; a deserialization failure via PanicCodeInfo.
void
VectorMemIndex::LoadFromFile(const Config& config) {
    auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
    AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");

    std::filesystem::create_directories(
        std::filesystem::path(filepath.value()).parent_path());

    auto file = File::Open(filepath.value(), O_CREAT | O_TRUNC | O_RDWR);

    auto index_files =
        GetValueFromConfig<std::vector<std::string>>(config, "index_files");
    AssertInfo(index_files.has_value(),
               "index file paths is empty when load index");

    std::unordered_set<std::string> pending_index_files(index_files->begin(),
                                                        index_files->end());

    LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size();

    // Upper bound on how many slices are fetched into memory per batch.
    auto parallel_degree =
        static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

    // Appends one fetched blob to the target file and fails loudly on a short
    // write. Shared by both branches below so neither can silently produce a
    // truncated file.
    auto write_to_file = [&](const auto& data) {
        auto written = file.Write(data->Data(), data->Size());
        AssertInfo(written == data->Size(),
                   fmt::format("failed to write index data to disk {}: {}",
                               filepath.value(),
                               strerror(errno)));
    };

    // try to read slice meta first
    // Erase through the iterator (and stop right away) so no reference to an
    // already-removed element is ever used.
    std::string slice_meta_filepath;
    for (auto it = pending_index_files.begin();
         it != pending_index_files.end();
         ++it) {
        auto file_name = it->substr(it->find_last_of('/') + 1);
        if (file_name == INDEX_FILE_SLICE_META) {
            slice_meta_filepath = *it;
            pending_index_files.erase(it);
            break;
        }
    }

    LOG_SEGCORE_INFO_ << "load with slice meta: "
                      << !slice_meta_filepath.empty();

    if (!slice_meta_filepath.empty()) {
        // load with the slice meta info, then we can load batch by batch
        std::string index_file_prefix = slice_meta_filepath.substr(
            0, slice_meta_filepath.find_last_of('/') + 1);
        std::vector<std::string> batch{};
        batch.reserve(parallel_degree);

        auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
        auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
        Config meta_data = Config::parse(
            std::string(static_cast<const char*>(raw_slice_meta->Data()),
                        raw_slice_meta->Size()));

        for (auto& item : meta_data[META]) {
            std::string prefix = item[NAME];
            int slice_num = item[SLICE_NUM];
            // NOTE(review): the value is not used in this function; the
            // conversion is kept in case it validates the entry - confirm.
            [[maybe_unused]] auto total_len =
                static_cast<size_t>(item[TOTAL_LEN]);

            // Fetches the slices currently queued in `batch` and appends them
            // to the file in slice order. `index` is the number of the last
            // slice in the batch.
            auto HandleBatch = [&](int index) {
                auto batch_data = file_manager_->LoadIndexToMemory(batch);
                const int first = index - static_cast<int>(batch.size()) + 1;
                for (int j = first; j <= index; j++) {
                    std::string file_name = GenSlicedFileName(prefix, j);
                    auto slice = batch_data.find(file_name);
                    AssertInfo(slice != batch_data.end(),
                               "lost index slice data");
                    write_to_file(slice->second);
                }
                for (auto& fetched : batch) {
                    pending_index_files.erase(fetched);
                }
                batch.clear();
            };

            for (auto i = 0; i < slice_num; ++i) {
                std::string file_name = GenSlicedFileName(prefix, i);
                batch.push_back(index_file_prefix + file_name);
                if (batch.size() >= parallel_degree) {
                    HandleBatch(i);
                }
            }
            // Flush the final, partially filled batch.
            if (batch.size() > 0) {
                HandleBatch(slice_num - 1);
            }
        }
    } else {
        // No slice meta: fetch every pending file in one call and append each
        // blob, checking every write.
        auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
            pending_index_files.begin(), pending_index_files.end()));
        for (auto& [_, index_data] : result) {
            write_to_file(index_data);
        }
    }
    file.Close();

    LOG_SEGCORE_INFO_ << "load index into Knowhere...";
    // kMmapFilepath is removed from the config copy handed to Knowhere.
    auto conf = config;
    conf.erase(kMmapFilepath);
    auto stat = index_.DeserializeFromFile(filepath.value(), conf);
    if (stat != knowhere::Status::success) {
        PanicCodeInfo(ErrorCodeEnum::UnexpectedError,
                      fmt::format("failed to Deserialize index: {}",
                                  KnowhereStatusString(stat)));
    }

    this->SetDim(index_.Dim());

    // Remove the directory entry once the index has been deserialized.
    // NOTE(review): presumably the index keeps its own mapping of the file
    // contents after this point - confirm against Knowhere.
    auto ok = unlink(filepath->c_str());
    AssertInfo(ok == 0,
               fmt::format("failed to unlink mmap index file {}: {}",
                           filepath.value(),
                           strerror(errno)));
    LOG_SEGCORE_INFO_ << "load vector index done";
}

} // namespace milvus::index
4 changes: 4 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class VectorMemIndex : public VectorIndex {
virtual void
LoadWithoutAssemble(const BinarySet& binary_set, const Config& config);

private:
// Loads the index by writing the index files named in `config` to the local
// file given by config[kMmapFilepath] and deserializing the index from that
// file. Used by Load(const Config&) when the config contains kMmapFilepath.
void
LoadFromFile(const Config& config);

protected:
Config config_;
knowhere::Index<knowhere::IndexNode> index_;
Expand Down
17 changes: 9 additions & 8 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
#include "exceptions/EasyAssert.h"
#include "fmt/format.h"
#include "mmap/Utils.h"
#include "utils/File.h"

namespace milvus {

#ifdef MAP_POPULATE
static int mmap_flags = MAP_PRIVATE | MAP_POPULATE;
static int mmap_flags = MAP_SHARED | MAP_POPULATE;
#else
static int mmap_flags = MAP_PRIVATE;
static int mmap_flags = MAP_SHARED;
#endif

class ColumnBase {
Expand Down Expand Up @@ -64,15 +65,15 @@ class ColumnBase {
}

// mmap mode ctor
ColumnBase(int fd, size_t size, const FieldMeta& field_meta) {
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) {
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;

len_ = size;
size_ = size + padding_;
data_ = static_cast<char*>(
mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0));
mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0));
#ifndef MAP_POPULATE
// Manually access the mapping to populate it
const size_t page_size = getpagesize();
Expand Down Expand Up @@ -164,8 +165,8 @@ class Column : public ColumnBase {
}

// mmap mode ctor
Column(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta),
Column(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta),
num_rows_(size / field_meta.get_sizeof()) {
}

Expand Down Expand Up @@ -202,8 +203,8 @@ class VariableColumn : public ColumnBase {
}

// mmap mode ctor
VariableColumn(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta) {
VariableColumn(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
}

VariableColumn(VariableColumn&& column) noexcept
Expand Down
11 changes: 7 additions & 4 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/FieldMeta.h"
#include "mmap/Types.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus {

Expand Down Expand Up @@ -77,7 +78,9 @@ FillField(DataType data_type, const storage::FieldDataPtr data, void* dst) {
}

inline size_t
WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
WriteFieldData(File& file,
DataType data_type,
const storage::FieldDataPtr& data) {
size_t total_written{0};
if (datatype_is_variable(data_type)) {
switch (data_type) {
Expand All @@ -86,7 +89,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
for (auto i = 0; i < data->get_num_rows(); ++i) {
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written = write(fd, str->data(), str->size());
ssize_t written = file.Write(str->data(), str->size());
if (written < str->size()) {
break;
}
Expand All @@ -99,7 +102,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written =
write(fd, padded_string.data(), padded_string.size());
file.Write(padded_string.data(), padded_string.size());
if (written < padded_string.size()) {
break;
}
Expand All @@ -112,7 +115,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
datatype_name(data_type)));
}
} else {
total_written += write(fd, data->Data(), data->Size());
total_written += file.Write(data->Data(), data->Size());
}

return total_written;
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/query/SearchOnSealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string>

#include "common/QueryInfo.h"
#include "common/Types.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
#include "query/helper.h"
Expand Down
Loading