Skip to content

Commit

Permalink
Enable mmap for vector index
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 committed Aug 9, 2023
1 parent d267559 commit 16bee52
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 68 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ build-cpp-with-unittest: generated-proto

build-cpp-with-coverage: generated-proto
@echo "Building Milvus cpp library with coverage and unittest ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -a ${useasan} -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})

check-proto-product: generated-proto
@(env bash $(PWD)/scripts/check_proto_product.sh)
Expand Down
26 changes: 26 additions & 0 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

#include <memory>
#include <boost/dynamic_bitset.hpp>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "common/Types.h"

// Config key under which the local file path used for mmap-based index
// loading is passed (read by VectorMemIndex::Load / LoadFromFile).
// Declared `inline` (C++17) so that this header-defined constant has a single
// shared definition instead of one dynamically-initialized copy per
// translation unit that includes the header.
inline const std::string kMmapFilepath = "mmap_filepath";

namespace milvus::index {

class IndexBase {
public:
IndexBase() = default;
virtual ~IndexBase() = default;

virtual BinarySet
Expand Down Expand Up @@ -53,7 +58,28 @@ class IndexBase {
virtual BinarySet
Upload(const Config& config = {}) = 0;

bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||
// index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || IVF_FLAT is not supported as it doesn't stores the vectors
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFPQ ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8 ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP;
}

// Read-only accessor for the index type this object was constructed with
// (an empty value when the default constructor was used).
const IndexType&
Type() const {
    return this->index_type_;
}

protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {
}

IndexType index_type_ = "";
};

Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// limitations under the License.

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <string>
#include <tuple>
#include <unordered_map>
Expand All @@ -25,11 +28,13 @@
#include "index/Utils.h"
#include "index/Meta.h"
#include <google/protobuf/text_format.h>
#include <unistd.h>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "common/Slice.h"
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus::index {

Expand Down
3 changes: 1 addition & 2 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class VectorIndex : public IndexBase {
public:
explicit VectorIndex(const IndexType& index_type,
const MetricType& metric_type)
: index_type_(index_type), metric_type_(metric_type) {
: IndexBase(index_type), metric_type_(metric_type) {
}

public:
Expand Down Expand Up @@ -87,7 +87,6 @@ class VectorIndex : public IndexBase {
}

private:
IndexType index_type_;
MetricType metric_type_;
int64_t dim_;
};
Expand Down
126 changes: 125 additions & 1 deletion internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

#include "index/VectorMemIndex.h"

#include <unistd.h>
#include <cmath>
#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "fmt/format.h"

#include "index/Index.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "exceptions/EasyAssert.h"
#include "config/ConfigKnowhere.h"

#include "knowhere/factory.h"
#include "knowhere/comp/time_recorder.h"
#include "common/BitsetView.h"
Expand All @@ -39,6 +43,7 @@
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus::index {

Expand Down Expand Up @@ -101,6 +106,10 @@ VectorMemIndex::Load(const BinarySet& binary_set, const Config& config) {

void
VectorMemIndex::Load(const Config& config) {
if (config.contains(kMmapFilepath)) {
return LoadFromFile(config);
}

auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
Expand Down Expand Up @@ -368,5 +377,120 @@ VectorMemIndex::GetVector(const DatasetPtr dataset) const {
memcpy(raw_data.data(), tensor, data_size);
return raw_data;
}
// Loads the index by staging its data in a local file and letting Knowhere
// deserialize from that file.
//
// Steps, in order:
//   1. Create (or truncate) the file named by config[kMmapFilepath], creating
//      its parent directories first.
//   2. Fetch the entries of config["index_files"] through file_manager_ and
//      append their bytes to that file. If a slice-meta file is among them,
//      slices are fetched in batches and written in slice order; otherwise
//      all files are fetched at once.
//   3. Call index_.DeserializeFromFile() with the config minus kMmapFilepath.
//   4. Unlink the staging file.
//
// Fails via AssertInfo when a required config entry is missing, a slice is
// missing, a write is short, or the unlink fails; fails via PanicCodeInfo
// when Knowhere reports a non-success status.
void
VectorMemIndex::LoadFromFile(const Config& config) {
    auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
    AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");

    std::filesystem::create_directories(
        std::filesystem::path(filepath.value()).parent_path());

    auto file = File::Open(filepath.value(), O_CREAT | O_TRUNC | O_RDWR);

    auto index_files =
        GetValueFromConfig<std::vector<std::string>>(config, "index_files");
    AssertInfo(index_files.has_value(),
               "index file paths is empty when load index");

    std::unordered_set<std::string> pending_index_files(index_files->begin(),
                                                        index_files->end());

    LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size();

    auto parallel_degree =
        static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

    // Appends one loaded blob to the staging file and verifies that every
    // byte was written; used by both the sliced and the fallback path so a
    // short write can never silently produce a truncated index file.
    auto append_to_file = [&](const auto& data) {
        auto written = file.Write(data->Data(), data->Size());
        AssertInfo(written == data->Size(),
                   fmt::format("failed to write index data to disk {}: {}",
                               filepath->data(),
                               strerror(errno)));
    };

    // try to read slice meta first
    std::string slice_meta_filepath;
    for (auto it = pending_index_files.begin();
         it != pending_index_files.end();
         ++it) {
        auto file_name = it->substr(it->find_last_of('/') + 1);
        if (file_name == INDEX_FILE_SLICE_META) {
            slice_meta_filepath = *it;
            // Erase by iterator (not by a reference to the element itself)
            // and stop iterating immediately, since `it` is now invalid.
            pending_index_files.erase(it);
            break;
        }
    }

    LOG_SEGCORE_INFO_ << "load with slice meta: "
                      << !slice_meta_filepath.empty();

    if (!slice_meta_filepath.empty()) {
        // load with the slice meta info, then we can load batch by batch
        std::string index_file_prefix = slice_meta_filepath.substr(
            0, slice_meta_filepath.find_last_of('/') + 1);
        std::vector<std::string> batch{};
        batch.reserve(parallel_degree);

        auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
        auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
        Config meta_data = Config::parse(
            std::string(static_cast<const char*>(raw_slice_meta->Data()),
                        raw_slice_meta->Size()));

        for (auto& item : meta_data[META]) {
            std::string prefix = item[NAME];
            int slice_num = item[SLICE_NUM];
            // Parsed as in the original code; the value itself is not
            // otherwise used in this function.
            auto total_len = static_cast<size_t>(item[TOTAL_LEN]);

            // Fetches the slices currently queued in `batch` (the last of
            // which has slice number `index`) and appends them to the
            // staging file in ascending slice order.
            auto HandleBatch = [&](int index) {
                auto batch_data = file_manager_->LoadIndexToMemory(batch);
                const int first_slice =
                    index - static_cast<int>(batch.size()) + 1;
                for (int j = first_slice; j <= index; j++) {
                    std::string file_name = GenSlicedFileName(prefix, j);
                    auto found = batch_data.find(file_name);
                    AssertInfo(found != batch_data.end(),
                               "lost index slice data");
                    append_to_file(found->second);
                }
                for (auto& loaded_path : batch) {
                    pending_index_files.erase(loaded_path);
                }
                batch.clear();
            };

            for (auto i = 0; i < slice_num; ++i) {
                std::string file_name = GenSlicedFileName(prefix, i);
                batch.push_back(index_file_prefix + file_name);
                if (batch.size() >= parallel_degree) {
                    HandleBatch(i);
                }
            }
            if (batch.size() > 0) {
                HandleBatch(slice_num - 1);
            }
        }
    } else {
        // No slice meta: fetch everything at once and append each blob.
        // NOTE(review): blobs are written in the iteration order of the
        // returned container — confirm that order is what Knowhere expects
        // when more than one file is present.
        auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
            pending_index_files.begin(), pending_index_files.end()));
        for (auto& [_, index_data] : result) {
            append_to_file(index_data);
        }
    }
    file.Close();

    LOG_SEGCORE_INFO_ << "load index into Knowhere...";
    // Knowhere receives the caller's config without the staging-file key.
    auto conf = config;
    conf.erase(kMmapFilepath);
    auto stat = index_.DeserializeFromFile(filepath.value(), conf);
    if (stat != knowhere::Status::success) {
        PanicCodeInfo(ErrorCodeEnum::UnexpectedError,
                      fmt::format("failed to Deserialize index: {}",
                                  KnowhereStatusString(stat)));
    }

    this->SetDim(index_.Dim());

    // The staging file is removed once Knowhere has loaded from it.
    // NOTE(review): presumably the index keeps its data reachable through
    // the mapping after the directory entry is gone — confirm.
    auto ok = unlink(filepath->data());
    AssertInfo(ok == 0,
               fmt::format("failed to unlink mmap index file {}: {}",
                           filepath.value(),
                           strerror(errno)));
    LOG_SEGCORE_INFO_ << "load vector index done";
}

} // namespace milvus::index
4 changes: 4 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class VectorMemIndex : public VectorIndex {
virtual void
LoadWithoutAssemble(const BinarySet& binary_set, const Config& config);

private:
// Loads the index via a local file: writes the files listed in
// config["index_files"] to the path given by config[kMmapFilepath], asks
// Knowhere to deserialize from that file, then unlinks it.
// Called by Load(const Config&) when the config contains kMmapFilepath.
void
LoadFromFile(const Config& config);

protected:
Config config_;
knowhere::Index<knowhere::IndexNode> index_;
Expand Down
17 changes: 9 additions & 8 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
#include "exceptions/EasyAssert.h"
#include "fmt/format.h"
#include "mmap/Utils.h"
#include "utils/File.h"

namespace milvus {

#ifdef MAP_POPULATE
static int mmap_flags = MAP_PRIVATE | MAP_POPULATE;
static int mmap_flags = MAP_SHARED | MAP_POPULATE;
#else
static int mmap_flags = MAP_PRIVATE;
static int mmap_flags = MAP_SHARED;
#endif

class ColumnBase {
Expand Down Expand Up @@ -64,15 +65,15 @@ class ColumnBase {
}

// mmap mode ctor
ColumnBase(int fd, size_t size, const FieldMeta& field_meta) {
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) {
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;

len_ = size;
size_ = size + padding_;
data_ = static_cast<char*>(
mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0));
mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0));
#ifndef MAP_POPULATE
// Manually access the mapping to populate it
const size_t page_size = getpagesize();
Expand Down Expand Up @@ -164,8 +165,8 @@ class Column : public ColumnBase {
}

// mmap mode ctor
Column(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta),
Column(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta),
num_rows_(size / field_meta.get_sizeof()) {
}

Expand Down Expand Up @@ -202,8 +203,8 @@ class VariableColumn : public ColumnBase {
}

// mmap mode ctor
VariableColumn(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta) {
VariableColumn(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
}

VariableColumn(VariableColumn&& column) noexcept
Expand Down
11 changes: 7 additions & 4 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/FieldMeta.h"
#include "mmap/Types.h"
#include "storage/Util.h"
#include "utils/File.h"

namespace milvus {

Expand Down Expand Up @@ -77,7 +78,9 @@ FillField(DataType data_type, const storage::FieldDataPtr data, void* dst) {
}

inline size_t
WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
WriteFieldData(File& file,
DataType data_type,
const storage::FieldDataPtr& data) {
size_t total_written{0};
if (datatype_is_variable(data_type)) {
switch (data_type) {
Expand All @@ -86,7 +89,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
for (auto i = 0; i < data->get_num_rows(); ++i) {
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written = write(fd, str->data(), str->size());
ssize_t written = file.Write(str->data(), str->size());
if (written < str->size()) {
break;
}
Expand All @@ -99,7 +102,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written =
write(fd, padded_string.data(), padded_string.size());
file.Write(padded_string.data(), padded_string.size());
if (written < padded_string.size()) {
break;
}
Expand All @@ -112,7 +115,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
datatype_name(data_type)));
}
} else {
total_written += write(fd, data->Data(), data->Size());
total_written += file.Write(data->Data(), data->Size());
}

return total_written;
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/query/SearchOnSealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string>

#include "common/QueryInfo.h"
#include "common/Types.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
#include "query/helper.h"
Expand Down
Loading

0 comments on commit 16bee52

Please sign in to comment.