-
Notifications
You must be signed in to change notification settings - Fork 21
[SVS] Implement 2-stage backend SVS index initialization #903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
35abcba
45e6dca
adba42a
8f0dd5b
e270c00
9862f95
2a3d44c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,12 +39,23 @@ struct SVSIndexBase | |
| virtual ~SVSIndexBase() = default; | ||
| virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; | ||
| virtual int deleteVectors(const labelType *labels, size_t n) = 0; | ||
| virtual bool isLabelExists(labelType label) const = 0; | ||
| virtual size_t indexStorageSize() const = 0; | ||
| virtual size_t getNumThreads() const = 0; | ||
| virtual void setNumThreads(size_t numThreads) = 0; | ||
| virtual size_t getThreadPoolCapacity() const = 0; | ||
| virtual bool isCompressed() const = 0; | ||
| size_t getNumMarkedDeleted() const { return num_marked_deleted; } | ||
|
|
||
| // Abstract handler to manage SVS implementation instance | ||
| // declared to avoid unsafe unique_ptr<void> usage | ||
| // Derived SVSIndex class should implement it | ||
| struct ImplHandler { | ||
| virtual ~ImplHandler() = default; | ||
| }; | ||
| virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, | ||
| const labelType *labels, size_t n) = 0; | ||
| virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0; | ||
| #ifdef BUILD_TESTS | ||
| virtual svs::logging::logger_ptr getLogger() const = 0; | ||
| #endif | ||
|
|
@@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| // Create SVS index instance with initial data | ||
| // Data should not be empty | ||
| template <svs::data::ImmutableMemoryDataset Dataset> | ||
| void initImpl(const Dataset &points, std::span<const labelType> ids) { | ||
| std::unique_ptr<impl_type> initImpl(const Dataset &points, | ||
| std::span<const labelType> ids) const { | ||
| svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}}; | ||
|
|
||
| // Construct SVS index initial storage with compression if needed | ||
|
|
@@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| // Construct initial Vamana Graph | ||
| auto graph = | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point, | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point, | ||
| this->blockSize, this->getAllocator(), logger_); | ||
|
|
||
| // Create SVS MutableIndex instance | ||
| impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
| auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
|
|
||
| // Set SVS MutableIndex build parameters to be used in future updates | ||
| impl_->set_construction_window_size(parameters.window_size); | ||
| impl_->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl_->set_prune_to(parameters.prune_to); | ||
| impl_->set_alpha(parameters.alpha); | ||
| impl_->set_full_search_history(parameters.use_full_search_history); | ||
| impl->set_construction_window_size(parameters.window_size); | ||
| impl->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl->set_prune_to(parameters.prune_to); | ||
| impl->set_alpha(parameters.alpha); | ||
| impl->set_full_search_history(parameters.use_full_search_history); | ||
|
|
||
| // Configure default search parameters | ||
| auto sp = impl_->get_search_parameters(); | ||
| auto sp = impl->get_search_parameters(); | ||
| sp.buffer_config({this->search_window_size, this->search_buffer_capacity}); | ||
| impl_->set_search_parameters(sp); | ||
| impl_->reset_performance_parameters(); | ||
| impl->set_search_parameters(sp); | ||
| impl->reset_performance_parameters(); | ||
| return impl; | ||
| } | ||
|
|
||
| // Preprocess batch of vectors | ||
|
|
@@ -204,6 +217,40 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return processed_blob; | ||
| } | ||
|
|
||
| // Handler to manage SVS implementation instance | ||
| struct SVSImplHandler : public SVSIndexBase::ImplHandler { | ||
| std::unique_ptr<impl_type> impl; | ||
| SVSImplHandler(std::unique_ptr<impl_type> impl) : impl{std::move(impl)} {} | ||
| }; | ||
|
|
||
| std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels, | ||
| size_t n) override { | ||
| // If no data provided, return empty handler | ||
| if (n == 0) { | ||
| return std::make_unique<SVSImplHandler>(nullptr); | ||
| } | ||
|
|
||
| std::span<const labelType> ids(labels, n); | ||
| auto processed_blob = this->preprocessForBatchStorage(vectors_data, n); | ||
| auto typed_vectors_data = static_cast<DataType *>(processed_blob.get()); | ||
| // Wrap data into SVS SimpleDataView for SVS API | ||
| auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim}; | ||
|
|
||
|
Comment on lines
+233
to
+238
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic seems to be a duplication of what we do in
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main point here is the std::tuple<std::span<const labelType>, MemoryUtils::unique_blob, svs::data::SimpleDataView<DataType>> preprocessAndPrepareSVSArgs(...) |
||
| return std::make_unique<SVSImplHandler>(initImpl(points, ids)); | ||
| } | ||
|
|
||
| void setImpl(std::unique_ptr<ImplHandler> handler) override { | ||
| if (impl_ != nullptr) { | ||
| throw std::logic_error("SVSIndex::setImpl called on non-empty impl_"); | ||
| } | ||
|
|
||
| SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get()); | ||
| if (!svs_handler) { | ||
| throw std::logic_error("Failed to cast to SVSImplHandler"); | ||
| } | ||
|
Comment on lines
+247
to
+250
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the motivation to have an abstract
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
template <typename MetricType,
typename DataType,
bool isMulti,
size_t QuantBits,
size_t ResidualBits,
bool IsLeanVec>
struct SVSIndex<MetricType, DataType, isMulti, QuantBits, ResidualBits, IsLeanVec>::SVSImplHandler; This is why the abstract |
||
| this->impl_ = std::move(svs_handler->impl); | ||
| } | ||
|
|
||
| // Assuming numThreads was updated to reflect the number of available threads before this | ||
| // function was called. | ||
| // This function assumes that the caller has already set numThreads to the appropriate value | ||
|
|
@@ -230,7 +277,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| if (!impl_) { | ||
| // SVS index instance cannot be empty, so we have to construct it at first rows | ||
| initImpl(points, ids); | ||
| impl_ = initImpl(points, ids); | ||
| } else { | ||
| // Add new points to existing SVS index | ||
| impl_->add_points(points, ids); | ||
|
|
@@ -239,6 +286,17 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return n - deleted_num; | ||
| } | ||
|
|
||
| int deleteVectorImpl(const labelType label) { | ||
| if (indexLabelCount() == 0 || !impl_->has_id(label)) { | ||
| return 0; | ||
| } | ||
|
|
||
| const auto deleted_num = impl_->delete_entries(std::span{&label, 1}); | ||
|
|
||
| this->markIndexUpdate(deleted_num); | ||
| return deleted_num; | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| int deleteVectorsImpl(const labelType *labels, size_t n) { | ||
| if (indexLabelCount() == 0) { | ||
| return 0; | ||
|
|
@@ -257,19 +315,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return 0; | ||
| } | ||
|
|
||
| // If entries_to_delete.size() == 1, we should ensure single-threading | ||
| const size_t current_num_threads = getNumThreads(); | ||
| if (n == 1 && current_num_threads > 1) { | ||
| setNumThreads(1); | ||
| } | ||
|
|
||
| const auto deleted_num = impl_->delete_entries(entries_to_delete); | ||
|
|
||
| // Restore multi-threading if needed | ||
| if (n == 1 && current_num_threads > 1) { | ||
| setNumThreads(current_num_threads); | ||
| } | ||
|
|
||
| this->markIndexUpdate(deleted_num); | ||
| return deleted_num; | ||
| } | ||
|
|
@@ -484,12 +531,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return addVectorsImpl(vectors_data, labels, n); | ||
| } | ||
|
|
||
| int deleteVector(labelType label) override { return deleteVectorsImpl(&label, 1); } | ||
| int deleteVector(labelType label) override { return deleteVectorImpl(label); } | ||
|
|
||
| int deleteVectors(const labelType *labels, size_t n) override { | ||
| return deleteVectorsImpl(labels, n); | ||
| } | ||
|
|
||
| bool isLabelExists(labelType label) const override { | ||
| return impl_ ? impl_->has_id(label) : false; | ||
| } | ||
|
|
||
| size_t getNumThreads() const override { return threadpool_.size(); } | ||
| void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -218,6 +218,10 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| using swap_record = std::tuple<labelType, idType, idType>; | ||
| constexpr static size_t SKIP_LABEL = std::numeric_limits<labelType>::max(); | ||
| std::vector<swap_record> swaps_journal; | ||
| // deleted_labels_journal is used by updateSVSIndex() to track vectors that were deleted from | ||
| // Flat index during SVS index updating. The journal contains the deleted labels. These labels | ||
| // are used to delete the same vectors from the SVS index at the end of the update. | ||
| std::vector<labelType> deleted_labels_journal; | ||
|
|
||
| size_t trainingTriggerThreshold; | ||
| size_t updateTriggerThreshold; | ||
|
|
@@ -651,6 +655,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| void updateSVSIndex(size_t availableThreads) { | ||
| std::vector<labelType> labels_to_move; | ||
| std::vector<DataType> vectors_to_move; | ||
| std::vector<labelType> deleted_labels_during_update; | ||
|
|
||
| { // lock frontendIndex from modifications | ||
| std::shared_lock flat_lock{this->flatIndexGuard}; | ||
|
|
@@ -668,22 +673,43 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| } | ||
| // reset journal to the current frontend index state | ||
| swaps_journal.clear(); | ||
| deleted_labels_journal.clear(); | ||
| } // release frontend index | ||
|
|
||
| executeTracingCallback("UpdateJob::before_add_to_svs"); | ||
| { // lock backend index for writing and add vectors there | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| std::shared_lock main_shared_lock(this->mainIndexGuard); | ||
| auto svs_index = GetSVSIndex(); | ||
| assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| if (this->backendIndex->indexSize() == 0) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this also handle re-initialization after the index was emptied? Is that scenario tested?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, as it was before in |
||
| // If backend index is empty, we need to initialize it first. | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
|
|
||
| // Upgrade to unique lock to set the new impl | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| svs_index->setImpl(std::move(impl)); | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| // Backend index is initialized - just add the vectors. | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| // Upgrade to unique lock to add vectors | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| } | ||
| } | ||
| executeTracingCallback("UpdateJob::after_add_to_svs"); | ||
| // clean-up frontend index | ||
| { // lock frontend index for writing and delete moved vectors | ||
| std::lock_guard lock(this->flatIndexGuard); | ||
|
|
||
| // swap deleted labels journal with the local variable to track deleted labels during | ||
| // update | ||
| std::swap(deleted_labels_during_update, deleted_labels_journal); | ||
|
|
||
| // Apply swaps from journal to labels_to_move to reflect changes made in meanwhile. | ||
| applySwapsToLabelsArray(labels_to_move, this->swaps_journal); | ||
|
|
||
|
|
@@ -703,6 +729,19 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| [](labelType label) { return label != SKIP_LABEL; }) && | ||
| "Deleted vectors count does not match the number of labels to delete"); | ||
| } | ||
| // delete vectors from backend index that were deleted from the frontend index during | ||
| // the update process. | ||
| { | ||
| std::lock_guard main_lock(this->mainIndexGuard); | ||
|
|
||
| std::sort(deleted_labels_during_update.begin(), deleted_labels_during_update.end()); | ||
| auto it = std::unique(deleted_labels_during_update.begin(), | ||
| deleted_labels_during_update.end()); | ||
| deleted_labels_during_update.erase(it, deleted_labels_during_update.end()); | ||
| auto svs_index = GetSVSIndex(); | ||
| svs_index->deleteVectors(deleted_labels_during_update.data(), | ||
| deleted_labels_during_update.size()); | ||
| } | ||
|
Comment on lines
+732
to
+744
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Journal mechanism for removing vectors that were deleted during the training looks good. Can you add a unit test for this edge case? This isn't covered in
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added appropriate test to |
||
| } | ||
|
|
||
| public: | ||
|
|
@@ -801,8 +840,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| } | ||
| } | ||
| // Remove vector from the backend index if it exists in case of non-MULTI. | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= svs_index->deleteVectors(&label, 1); | ||
| auto label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= this->backendIndex->deleteVector(label); | ||
| } | ||
rfsaliev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| { // Add vector to the frontend index. | ||
| std::lock_guard lock(this->flatIndexGuard); | ||
|
|
@@ -814,6 +860,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| for (auto id : this->frontendIndex->getElementIds(label)) { | ||
| this->swaps_journal.emplace_back(SKIP_LABEL, id, id); | ||
| } | ||
| deleted_labels_journal.push_back(label); | ||
| } | ||
| ret = std::max(ret + ft_ret, 0); | ||
| // Check frontend index size to determine if an update job schedule is needed. | ||
|
|
@@ -868,6 +915,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| this->swaps_journal.emplace_back(SKIP_LABEL, id, id); | ||
| } | ||
| } | ||
| deleted_labels_journal.push_back(label); | ||
|
|
||
| return deleting_ids.size(); | ||
| } | ||
|
|
@@ -887,9 +935,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| std::lock_guard lock(this->flatIndexGuard); | ||
| ret = this->deleteAndRecordSwaps_Unsafe(label); | ||
| } | ||
| { | ||
|
|
||
| label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret += svs_index->deleteVectors(&label, 1); | ||
| ret += this->backendIndex->deleteVector(label); | ||
| } | ||
| return ret; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicated preprocessing logic in createImpl and addVectorsImpl
Low Severity
The new
`createImpl` method duplicates the same preprocessing sequence found in `addVectorsImpl`: constructing a `std::span` from labels, calling `preprocessForBatchStorage`, casting the blob to `DataType *`, and constructing a `SimpleDataView`. If the preprocessing logic changes in one place but not the other, the two code paths will silently diverge, potentially causing subtle bugs. Additional Locations (1)
src/VecSim/algorithms/svs/svs.h#L271-L276