Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 76 additions & 25 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,23 @@ struct SVSIndexBase
virtual ~SVSIndexBase() = default;
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
virtual bool isLabelExists(labelType label) const = 0;
virtual size_t indexStorageSize() const = 0;
virtual size_t getNumThreads() const = 0;
virtual void setNumThreads(size_t numThreads) = 0;
virtual size_t getThreadPoolCapacity() const = 0;
virtual bool isCompressed() const = 0;
size_t getNumMarkedDeleted() const { return num_marked_deleted; }

// Abstract, type-erased handle for an SVS implementation instance.
// It exists so that an implementation can be passed through this non-template
// interface (returned by createImpl(), consumed by setImpl()) without resorting
// to an unsafe unique_ptr<void>.
// The derived SVSIndex class is expected to provide the concrete handler type.
struct ImplHandler {
// Virtual destructor: handlers are owned and destroyed through unique_ptr<ImplHandler>.
virtual ~ImplHandler() = default;
};
virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data,
const labelType *labels, size_t n) = 0;
virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0;
#ifdef BUILD_TESTS
virtual svs::logging::logger_ptr getLogger() const = 0;
#endif
Expand Down Expand Up @@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
// Create SVS index instance with initial data
// Data should not be empty
template <svs::data::ImmutableMemoryDataset Dataset>
void initImpl(const Dataset &points, std::span<const labelType> ids) {
std::unique_ptr<impl_type> initImpl(const Dataset &points,
std::span<const labelType> ids) const {
svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}};

// Construct SVS index initial storage with compression if needed
Expand All @@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

// Construct initial Vamana Graph
auto graph =
graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point,
graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point,
this->blockSize, this->getAllocator(), logger_);

// Create SVS MutableIndex instance
impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);
auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);

// Set SVS MutableIndex build parameters to be used in future updates
impl_->set_construction_window_size(parameters.window_size);
impl_->set_max_candidates(parameters.max_candidate_pool_size);
impl_->set_prune_to(parameters.prune_to);
impl_->set_alpha(parameters.alpha);
impl_->set_full_search_history(parameters.use_full_search_history);
impl->set_construction_window_size(parameters.window_size);
impl->set_max_candidates(parameters.max_candidate_pool_size);
impl->set_prune_to(parameters.prune_to);
impl->set_alpha(parameters.alpha);
impl->set_full_search_history(parameters.use_full_search_history);

// Configure default search parameters
auto sp = impl_->get_search_parameters();
auto sp = impl->get_search_parameters();
sp.buffer_config({this->search_window_size, this->search_buffer_capacity});
impl_->set_search_parameters(sp);
impl_->reset_performance_parameters();
impl->set_search_parameters(sp);
impl->reset_performance_parameters();
return impl;
}

// Preprocess batch of vectors
Expand All @@ -204,6 +217,40 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return processed_blob;
}

// Handler to manage SVS implementation instance
struct SVSImplHandler : public SVSIndexBase::ImplHandler {
std::unique_ptr<impl_type> impl;
SVSImplHandler(std::unique_ptr<impl_type> impl) : impl{std::move(impl)} {}
};

std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels,
size_t n) override {
// If no data provided, return empty handler
if (n == 0) {
return std::make_unique<SVSImplHandler>(nullptr);
}

std::span<const labelType> ids(labels, n);
auto processed_blob = this->preprocessForBatchStorage(vectors_data, n);
auto typed_vectors_data = static_cast<DataType *>(processed_blob.get());
// Wrap data into SVS SimpleDataView for SVS API
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated preprocessing logic in createImpl and addVectorsImpl

Low Severity

The new createImpl method duplicates the same preprocessing sequence found in addVectorsImpl: constructing a std::span from labels, calling preprocessForBatchStorage, casting the blob to DataType *, and constructing a SimpleDataView. If the preprocessing logic changes in one place but not the other, the two code paths will silently diverge, potentially causing subtle bugs.

Additional Locations (1)

Fix in Cursor Fix in Web


Comment on lines +233 to +238
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems to be a duplication of what we do in addVectorsImpl. Consider unifying these into a single function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main point here is the processed_blob, whose lifetime must be managed until the end of the initImpl() and impl_->add_points() calls.
A single function that wraps all of this code would look like:

std::tuple<std::span<const labelType>, MemoryUtils::unique_blob, svs::data::SimpleDataView<DataType>> preprocessAndPrepareSVSArgs(...)

return std::make_unique<SVSImplHandler>(initImpl(points, ids));
}

void setImpl(std::unique_ptr<ImplHandler> handler) override {
if (impl_ != nullptr) {
throw std::logic_error("SVSIndex::setImpl called on non-empty impl_");
}

SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get());
if (!svs_handler) {
throw std::logic_error("Failed to cast to SVSImplHandler");
}
Comment on lines +247 to +250
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation to have an abstract ImplHandler rather than have only SVSImplHandler? The dynamic_cast here seems a bit awkward

Copy link
Collaborator Author

@rfsaliev rfsaliev Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SVSImplHandler is not just a simple type - it is a template class with a number of parameters, and its full declaration looks like:

template <typename MetricType,
          typename DataType,
          bool isMulti,
          size_t QuantBits,
          size_t ResidualBits,
          bool IsLeanVec>
struct SVSIndex<MetricType, DataType, isMulti, QuantBits, ResidualBits, IsLeanVec>::SVSImplHandler;

This is why the abstract SVSIndexBase::ImplHandler is defined for client code (TieredSVSIndex).

this->impl_ = std::move(svs_handler->impl);
}

// Assuming numThreads was updated to reflect the number of available threads before this
// function was called.
// This function assumes that the caller has already set numThreads to the appropriate value
Expand All @@ -230,7 +277,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

if (!impl_) {
// SVS index instance cannot be empty, so we have to construct it at first rows
initImpl(points, ids);
impl_ = initImpl(points, ids);
} else {
// Add new points to existing SVS index
impl_->add_points(points, ids);
Expand All @@ -239,6 +286,17 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return n - deleted_num;
}

// Remove a single label from the SVS index.
// Returns 0 when the index holds no labels or does not contain `label`;
// otherwise returns the count reported by impl_->delete_entries().
int deleteVectorImpl(const labelType label) {
    // The label-count check comes first, so has_id() is queried only when the
    // index reports at least one label.
    if (indexLabelCount() == 0) {
        return 0;
    }
    if (!impl_->has_id(label)) {
        return 0;
    }

    // Hand the single label to SVS as a one-element span.
    const auto removed_count = impl_->delete_entries(std::span{&label, 1});
    this->markIndexUpdate(removed_count);
    return removed_count;
}

int deleteVectorsImpl(const labelType *labels, size_t n) {
if (indexLabelCount() == 0) {
return 0;
Expand All @@ -257,19 +315,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return 0;
}

// If entries_to_delete.size() == 1, we should ensure single-threading
const size_t current_num_threads = getNumThreads();
if (n == 1 && current_num_threads > 1) {
setNumThreads(1);
}

const auto deleted_num = impl_->delete_entries(entries_to_delete);

// Restore multi-threading if needed
if (n == 1 && current_num_threads > 1) {
setNumThreads(current_num_threads);
}

this->markIndexUpdate(deleted_num);
return deleted_num;
}
Expand Down Expand Up @@ -484,12 +531,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return addVectorsImpl(vectors_data, labels, n);
}

int deleteVector(labelType label) override { return deleteVectorsImpl(&label, 1); }
int deleteVector(labelType label) override { return deleteVectorImpl(label); }

int deleteVectors(const labelType *labels, size_t n) override {
return deleteVectorsImpl(labels, n);
}

// Report whether `label` is present in the SVS index.
// When no implementation instance has been set (impl_ is null) the answer is false.
bool isLabelExists(labelType label) const override {
    if (!impl_) {
        return false;
    }
    return impl_->has_id(label);
}

size_t getNumThreads() const override { return threadpool_.size(); }
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }

Expand Down
70 changes: 62 additions & 8 deletions src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
using swap_record = std::tuple<labelType, idType, idType>;
constexpr static size_t SKIP_LABEL = std::numeric_limits<labelType>::max();
std::vector<swap_record> swaps_journal;
// deleted_labels_journal is used by updateSVSIndex() to track vectors that were deleted from
// Flat index during SVS index updating. The journal contains the deleted labels. These labels
// are used to delete the same vectors from the SVS index at the end of the update.
std::vector<labelType> deleted_labels_journal;

size_t trainingTriggerThreshold;
size_t updateTriggerThreshold;
Expand Down Expand Up @@ -651,6 +655,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
void updateSVSIndex(size_t availableThreads) {
std::vector<labelType> labels_to_move;
std::vector<DataType> vectors_to_move;
std::vector<labelType> deleted_labels_during_update;

{ // lock frontendIndex from modifications
std::shared_lock flat_lock{this->flatIndexGuard};
Expand All @@ -668,22 +673,43 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
}
// reset journal to the current frontend index state
swaps_journal.clear();
deleted_labels_journal.clear();
} // release frontend index

executeTracingCallback("UpdateJob::before_add_to_svs");
{ // lock backend index for writing and add vectors there
std::lock_guard lock(this->mainIndexGuard);
std::shared_lock main_shared_lock(this->mainIndexGuard);
auto svs_index = GetSVSIndex();
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
if (this->backendIndex->indexSize() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also handle re-initialization after the index was emptied? Is that scenario tested?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as it was before in SVSIndex::AddVectors()

// If backend index is empty, we need to initialize it first.
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());

// Upgrade to unique lock to set the new impl
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
svs_index->setImpl(std::move(impl));
} else {
// Backend index is initialized - just add the vectors.
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
// Upgrade to unique lock to add vectors
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
}
}
executeTracingCallback("UpdateJob::after_add_to_svs");
// clean-up frontend index
{ // lock frontend index for writing and delete moved vectors
std::lock_guard lock(this->flatIndexGuard);

// swap deleted labels journal with the local variable to track deleted labels during
// update
std::swap(deleted_labels_during_update, deleted_labels_journal);

// Apply swaps from journal to labels_to_move to reflect changes made in meanwhile.
applySwapsToLabelsArray(labels_to_move, this->swaps_journal);

Expand All @@ -703,6 +729,19 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
[](labelType label) { return label != SKIP_LABEL; }) &&
"Deleted vectors count does not match the number of labels to delete");
}
// delete vectors from backend index that were deleted from the frontend index during
// the update process.
{
std::lock_guard main_lock(this->mainIndexGuard);

std::sort(deleted_labels_during_update.begin(), deleted_labels_during_update.end());
auto it = std::unique(deleted_labels_during_update.begin(),
deleted_labels_during_update.end());
deleted_labels_during_update.erase(it, deleted_labels_during_update.end());
auto svs_index = GetSVSIndex();
svs_index->deleteVectors(deleted_labels_during_update.data(),
deleted_labels_during_update.size());
}
Comment on lines +732 to +744
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The journal mechanism for removing vectors that were deleted during training looks good. Can you add a unit test for this edge case? From what I understand, this isn't covered by two_stage_initialization_test.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an appropriate test to test_svs_tiered.cpp.

}

public:
Expand Down Expand Up @@ -801,8 +840,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
}
}
// Remove vector from the backend index if it exists in case of non-MULTI.
std::lock_guard lock(this->mainIndexGuard);
ret -= svs_index->deleteVectors(&label, 1);
auto label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret -= this->backendIndex->deleteVector(label);
}
}
{ // Add vector to the frontend index.
std::lock_guard lock(this->flatIndexGuard);
Expand All @@ -814,6 +860,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
for (auto id : this->frontendIndex->getElementIds(label)) {
this->swaps_journal.emplace_back(SKIP_LABEL, id, id);
}
deleted_labels_journal.push_back(label);
}
ret = std::max(ret + ft_ret, 0);
// Check frontend index size to determine if an update job schedule is needed.
Expand Down Expand Up @@ -868,6 +915,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
this->swaps_journal.emplace_back(SKIP_LABEL, id, id);
}
}
deleted_labels_journal.push_back(label);

return deleting_ids.size();
}
Expand All @@ -887,9 +935,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
std::lock_guard lock(this->flatIndexGuard);
ret = this->deleteAndRecordSwaps_Unsafe(label);
}
{

label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret += svs_index->deleteVectors(&label, 1);
ret += this->backendIndex->deleteVector(label);
}
return ret;
}
Expand Down
72 changes: 72 additions & 0 deletions tests/unit/test_svs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,78 @@ TYPED_TEST(SVSTest, svs_bulk_vectors_add_delete_test) {
VecSimIndex_Free(index);
}

// Exercises the two-stage initialization API of the SVS index:
// createImpl() builds an implementation handler from a batch of vectors, and
// setImpl() installs it into the index. The test checks that setImpl() rejects a
// null handler, rejects installation while the index still holds vectors, and
// succeeds again once every vector has been deleted.
TYPED_TEST(SVSTest, two_stage_initialization_test) {
// n: number of vectors in the data set; k: number of results requested per query.
size_t n = 256;
size_t k = 11;
const size_t dim = 4;

SVSParams params = {
.dim = dim,
.metric = VecSimMetric_L2,
/* SVS-Vamana specifics */
.alpha = 1.2,
.graph_max_degree = 64,
.construction_window_size = 20,
.max_candidate_pool_size = 1024,
.prune_to = 60,
.use_search_history = VecSimOption_ENABLE,
};

VecSimIndex *index = this->CreateNewIndex(params);
ASSERT_INDEX(index);

auto svs_index = this->CastToSVS(index);

// Build n vectors of dimension dim; vector i is generated from the value i.
// NOTE(review): presumably GenerateVector fills every component with i - confirm.
std::vector<std::array<TEST_DATA_T, dim>> v(n);
for (size_t i = 0; i < n; i++) {
GenerateVector<TEST_DATA_T>(v[i].data(), dim, i);
}

// Labels are simply 0..n-1.
std::vector<size_t> ids(n);
std::iota(ids.begin(), ids.end(), 0);

// 2-stage initialization
// initialization with null should fail
EXPECT_THROW(svs_index->setImpl(nullptr), std::logic_error);

// initialization with data should succeed
auto impl = svs_index->createImpl(v.data(), ids.data(), n);
svs_index->setImpl(std::move(impl));

ASSERT_EQ(VecSimIndex_IndexSize(index), n);

// With k = 11, the results ordered by id are expected to be ids 45..55.
// Note: the lambda's `index` parameter is the result position and shadows the
// outer `index` variable inside the lambda body.
TEST_DATA_T query[] = {50, 50, 50, 50};
auto verify_res = [&](size_t id, double score, size_t index) { EXPECT_EQ(id, (index + 45)); };
runTopKSearchTest(index, query, k, verify_res, nullptr, BY_ID);

// Try to re-initialize with the same data.
impl = svs_index->createImpl(v.data(), ids.data(), n);
// Should fail because the index is not empty.
EXPECT_THROW(svs_index->setImpl(std::move(impl)), std::logic_error);

// Index should remain unchanged.
ASSERT_EQ(VecSimIndex_IndexSize(index), n);
runTopKSearchTest(index, query, k, verify_res, nullptr, BY_ID);

// Delete almost all vectors
const size_t keep_num = 1;
ASSERT_EQ(svs_index->deleteVectors(ids.data(), n - keep_num), n - keep_num);
// setImpl() should fail again because the index is not empty.
impl = svs_index->createImpl(v.data(), ids.data(), n);
EXPECT_THROW(svs_index->setImpl(std::move(impl)), std::logic_error);

// Delete rest of the vectors - index should be empty now and setImpl() should succeed.
// NOTE(review): this relies on the index dropping its implementation instance once
// the last vector is deleted - confirm against deleteVectorsImpl.
ASSERT_EQ(svs_index->deleteVectors(ids.data() + n - keep_num, keep_num), keep_num);
ASSERT_EQ(VecSimIndex_IndexSize(index), 0);
// Re-initialization should succeed.
impl = svs_index->createImpl(v.data(), ids.data(), n);
svs_index->setImpl(std::move(impl));
ASSERT_EQ(VecSimIndex_IndexSize(index), n);
runTopKSearchTest(index, query, k, verify_res, nullptr, BY_ID);

VecSimIndex_Free(index);
}

TYPED_TEST(SVSTest, svs_get_distance) {
// Scalar quantization accuracy is insufficient for this test.
if (this->isFallbackToSQ()) {
Expand Down
Loading
Loading