diff --git a/.github/workflows/prerelease.yml b/.github/workflows/prerelease.yml
index 397d4e4e..d277da42 100644
--- a/.github/workflows/prerelease.yml
+++ b/.github/workflows/prerelease.yml
@@ -17,6 +17,33 @@ permissions:
 jobs:
+  build_test:
+    name: Test USearch
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          ref: main-dev
+      - run: git submodule update --init --recursive
+      - name: Prepare Environment
+        run: |
+          sudo apt update &&
+          sudo apt install -y cmake g++-12 build-essential libjemalloc-dev
+      - name: Build
+        run: |
+          cmake -B ./build_release \
+            -DCMAKE_CXX_COMPILER="g++-12" \
+            -DCMAKE_BUILD_TYPE=Release \
+            -DUSEARCH_BUILD_TEST=1 \
+            -DUSEARCH_USE_OPENMP=1 \
+            -DUSEARCH_USE_SIMSIMD=1 \
+            -DUSEARCH_USE_JEMALLOC=1 \
+            -DUSEARCH_BUILD_BENCHMARK=0 &&
+          make -C ./build_release -j
+      - name: Run tests
+        run: ./build_release/test
+
+
   test_python_311:
     name: Test Python
     runs-on: ${{ matrix.os }}
diff --git a/README.md b/README.md
index 9ece5120..ad9fd525 100644
--- a/README.md
+++ b/README.md
@@ -759,9 +759,28 @@ index = Index(ndim=2048, metric=MetricKind.BitwiseTanimoto)
 labels = np.arange(len(molecules))
 index.add(labels, fingerprints)
-matches = index.search(fingerprints, 10)
+matches: Matches = index.search(fingerprints, 10)
 ```
 
+Or if you need a bit of our SIMD superpowers, take some Chemistry-oriented precompiled metrics from [SimSIMD][simsimd].
+
+```python
+import simsimd as sisi
+
+index = Index(
+    ndim=166, # MACCS fingerprints are 166-dimensional
+    metric=CompiledMetric(
+        pointer=sisi.to_int(sisi.tanimoto_maccs_neon), # For Arm Neon
+        signature=MetricSignature.ArrayArray,
+        kind=MetricKind.Tanimoto,
+    ),
+)
+
+index.add(42, np.packbits([1] * 166))
+matches: Matches = index.search(np.packbits([1] * 166))
+```
+
+[simsimd]: https://github.com/ashvardanian/simsimd
 [smiles]: https://en.wikipedia.org/wiki/Simplified_molecular-input_line-entry_system
 [rdkit-fingerprints]: https://www.rdkit.org/docs/RDKit_Book.html#additional-information-about-the-fingerprints
diff --git a/c/Makefile b/c/Makefile
index e3a0f88e..b13bfe5d 100644
--- a/c/Makefile
+++ b/c/Makefile
@@ -1,16 +1,17 @@
 CC = gcc-12
+CXX = g++-12
 
 C_FLAGS = -std=c99
+CXX_FLAGS = -std=c++17
-C_CXX_FLAGS = -Wall -Wextra -Wno-conversion -Wno-unknown-pragmas
-C_CXX_FLAGS += -O3 -march=native
+CXX_FLAGS += -Wall -Wextra -Wno-conversion -Wno-unknown-pragmas -O3 -march=native
 
 HEADER_INCLUDES = -I. -I ../include/ -I ../fp16/include/ -I ../robin-map/include/
 
 .PHONY: build
 build:
-	$(CC) $(C_FLAGS) $(C_CXX_FLAGS) -o libusearch.so -O3 lib.cpp $(HEADER_INCLUDES) -shared -fPIC
+	$(CXX) $(CXX_FLAGS) -o libusearch.so -O3 lib.cpp $(HEADER_INCLUDES) -shared -fPIC
 
 .PHONY: test
 test:
-	$(CC) $(C_FLAGS) $(C_CXX_FLAGS) test.c -L. -lusearch -Wl,-rpath,. -o test
+	$(CC) $(C_FLAGS) test.c -L. -lusearch -Wl,-rpath,. -o test
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index daac2f50..44bb2ca0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -13,7 +13,9 @@ endif()
 # https://cmake.org/cmake/help/latest/variable/CMAKE_LANG_COMPILER_ID.html
 if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
     set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
-    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
+    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g")
+    set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3 -g")
+    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address -fsanitize=leak -fsanitize=alignment -fsanitize=undefined")
     set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -Wextra -Wno-conversion -Wno-unknown-pragmas")
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native")
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fmax-errors=1")
@@ -22,7 +24,9 @@ if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
 
 elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
     set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
-    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
+    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g")
+    set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3 -g")
+    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address -fsanitize=leak -fsanitize=alignment -fsanitize=undefined")
    set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wfatal-errors")
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic")
diff --git a/cpp/bench.cpp b/cpp/bench.cpp
index b0eb383d..23a54a9d 100644
--- a/cpp/bench.cpp
+++ b/cpp/bench.cpp
@@ -24,6 +24,7 @@
 #include <sys/stat.h> // `stat`
 
+#include <algorithm>
 #include <cstdio>
 #include <cstring>
 #include <iostream> // `std::cerr`
@@ -32,6 +33,7 @@
 #include <string>  // `std::to_string`
 #include <thread>  // `std::thread::hardware_concurrency()`
 #include <variant> // `std::monostate`
+#include <vector>
 
 #include <cxxopts.hpp> // Command Line Interface
 #include <omp.h>       // `omp_set_num_threads()`
@@ -243,10 +245,10 @@ struct running_stats_printer_t {
         std::size_t new_progress = progress.load();
         if (new_progress - last_printed_progress < step)
             return;
-        print(new_progress);
+        print(new_progress, total);
     }
 
-    void print(std::size_t progress) {
+    void print(std::size_t progress, std::size_t total) {
 
         constexpr char bars_k[] = "||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||";
         constexpr std::size_t bars_len_k = 60;
@@ -267,6 +269,7 @@ struct running_stats_printer_t {
 
         last_printed_progress = progress;
         last_printed_time = time_new;
+        this->total = total;
     }
 };
 
@@ -332,6 +335,8 @@ template <typename dataset_at, typename index_at> //
 static void single_shot(dataset_at& dataset, index_at& index, bool construct = true) {
     using label_t = typename index_at::label_t;
     using distance_t = typename index_at::distance_t;
+    using join_result_t = typename index_at::join_result_t;
+    constexpr std::size_t missing_label = std::numeric_limits<std::size_t>::max();
 
     std::printf("\n");
     std::printf("------------\n");
@@ -348,7 +353,7 @@ static void single_shot(dataset_at& dataset, index_at& index, bool construct =
     search_many(index, dataset.queries_count(), dataset.query(0), dataset.dimensions(), dataset.neighborhood_size(),
                 found_neighbors.data(), found_distances.data());
 
-    // Evaluate quality
+    // Evaluate search quality
     std::size_t recall_at_1 = 0, recall_full = 0;
     for (std::size_t i = 0; i != dataset.queries_count(); ++i) {
         auto expected = dataset.neighborhood(i);
@@ -360,6 +365,38 @@
std::printf("Recall@1 %.2f %%\n", recall_at_1 * 100.f / dataset.queries_count()); std::printf("Recall %.2f %%\n", recall_full * 100.f / dataset.queries_count()); + // Perform joins + std::vector man_to_woman(dataset.vectors_count()); + std::vector woman_to_man(dataset.vectors_count()); + std::size_t join_attempts = 0; + { + index_at& men = index; + index_at women = index.copy().index; + std::fill(man_to_woman.begin(), man_to_woman.end(), missing_label); + std::fill(woman_to_man.begin(), woman_to_man.end(), missing_label); + { + executor_default_t executor(index.limits().threads()); + running_stats_printer_t printer{1, "Join"}; + join_result_t result = index_at::join( // + men, women, join_config_t{executor.size()}, // + man_to_woman.data(), woman_to_man.data(), // + executor, [&](std::size_t progress, std::size_t total) { + if (progress % 1000 == 0) + printer.print(progress, total); + }); + join_attempts = result.cycles; + } + } + // Evaluate join quality + std::size_t recall_join = 0, unmatched_count = 0; + for (std::size_t i = 0; i != index.size(); ++i) { + recall_join += man_to_woman[i] == i; + unmatched_count += man_to_woman[i] == missing_label; + } + std::printf("Recall Joins %.2f %%\n", recall_join * 100.f / index.size()); + std::printf("Unmatched %.2f %% (%zu items)\n", unmatched_count * 100.f / index.size(), unmatched_count); + std::printf("Proposals %.2f / man (%zu total)\n", join_attempts * 1.f / index.size(), join_attempts); + // Paginate std::vector hints(dataset.queries_count()); for (std::size_t i = 0; i != hints.size(); ++i) @@ -491,7 +528,7 @@ void run_punned(dataset_at& dataset, args_t const& args, index_config_t config, std::printf("Will benchmark an on-disk view\n"); - index_at index_view = index.fork(); + index_at index_view = index.fork().index; index_view.view(args.path_output.c_str()); single_shot(dataset, index_view, false); } diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 8581a6c5..5eccd813 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -117,21 +117,23 @@ OPTIONS BigANN benchmark is a good starting point, if you are searching for large collections of high-dimensional vectors. Those often come with precomputed ground-truth neighbors, which is handy for recall evaluation. 
-| Dataset                                 | Scalar Type | Dimensions | Metric |   Size    |
-| :-------------------------------------- | :---------: | :--------: | :----: | :-------: |
-| [Unum UForm Wiki][unum-wiki]            |   float32   |    256     |   IP   |   1 GB    |
-| [Yandex Text-to-Image Sample][unum-t2i] |   float32   |    200     |  Cos   |   1 GB    |
-|                                         |             |            |        |           |
-| [Microsoft SPACEV][spacev]              |    int8     |    100     |   L2   |   93 GB   |
-| [Microsoft Turing-ANNS][turing]         |   float32   |    100     |   L2   |  373 GB   |
-| [Yandex Deep1B][deep]                   |   float32   |     96     |   L2   |  358 GB   |
-| [Yandex Text-to-Image][t2i]             |   float32   |    200     |  Cos   |  750 GB   |
-|                                         |             |            |        |           |
-| [ViT-L/12 LAION][laion]                 |   float32   |    2048    |  Cos   | 2 - 10 TB |
+| Dataset                                    | Scalar Type | Dimensions | Metric |   Size    |
+| :----------------------------------------- | :---------: | :--------: | :----: | :-------: |
+| [Unum UForm Creative Captions][unum-cc-3m] |   float32   |    256     |   IP   |   3 GB    |
+| [Unum UForm Wiki][unum-wiki-1m]            |   float32   |    256     |   IP   |   1 GB    |
+| [Yandex Text-to-Image Sample][unum-t2i]    |   float32   |    200     |  Cos   |   1 GB    |
+|                                            |             |            |        |           |
+| [Microsoft SPACEV][spacev]                 |    int8     |    100     |   L2   |   93 GB   |
+| [Microsoft Turing-ANNS][turing]            |   float32   |    100     |   L2   |  373 GB   |
+| [Yandex Deep1B][deep]                      |   float32   |     96     |   L2   |  358 GB   |
+| [Yandex Text-to-Image][t2i]                |   float32   |    200     |  Cos   |  750 GB   |
+|                                            |             |            |        |           |
+| [ViT-L/12 LAION][laion]                    |   float32   |    2048    |  Cos   | 2 - 10 TB |
 
 Luckily, smaller samples of those datasets are available.
 
-[unum-wiki]: https://huggingface.co/datasets/unum-cloud/ann-wiki-1m
+[unum-cc-3m]: https://huggingface.co/datasets/unum-cloud/ann-cc-3m
+[unum-wiki-1m]: https://huggingface.co/datasets/unum-cloud/ann-wiki-1m
 [unum-t2i]: https://huggingface.co/datasets/unum-cloud/ann-t2i-1m
 [spacev]: https://github.com/microsoft/SPTAG/tree/main/datasets/SPACEV1B
 [turing]: https://learning2hash.github.io/publications/microsoftturinganns1B/
diff --git a/include/usearch/index.hpp b/include/usearch/index.hpp
index 6c532ccc..814d6ae6 100644
--- a/include/usearch/index.hpp
+++ b/include/usearch/index.hpp
@@ -258,10 +258,13 @@ template <typename scalar_at> class span_gt {
     std::size_t size_;
 
   public:
-    span_gt(scalar_at* begin, scalar_at const* end) noexcept : data_(begin), size_(end - begin) {}
+    span_gt() noexcept : data_(nullptr), size_(0u) {}
+    span_gt(scalar_at* begin, scalar_at* end) noexcept : data_(begin), size_(end - begin) {}
     span_gt(scalar_at* begin, std::size_t size) noexcept : data_(begin), size_(size) {}
     scalar_at* data() const noexcept { return data_; }
     std::size_t size() const noexcept { return size_; }
+    scalar_at* begin() const noexcept { return data_; }
+    scalar_at* end() const noexcept { return data_ + size_; }
     operator scalar_at*() const noexcept { return data(); }
 };
 
@@ -617,8 +620,8 @@ template <typename result_at> struct expected_gt {
     }
     result_at const& operator*() const noexcept { return result; }
     explicit operator bool() const noexcept { return !error; }
-    expected_gt failed(char const* message) noexcept {
-        error = message;
+    expected_gt failed(error_t message) noexcept {
+        error = std::move(message);
         return std::move(*this);
     }
 };
@@ -1007,7 +1010,7 @@ class usearch_pack_m uint40_t {
 
     inline uint40_t& operator+=(std::size_t n) noexcept {
         unsigned char* n_octets = reinterpret_cast<unsigned char*>(&n);
-        std::uint32_t& n_tail = *reinterpret_cast<std::uint32_t*>(&n);
+        std::uint32_t& n_tail = *reinterpret_cast<std::uint32_t*>(n_octets);
         std::uint32_t& tail = *reinterpret_cast<std::uint32_t*>(octets);
         octets[4] += static_cast<unsigned char>((tail + n_tail) < tail);
         tail += n_tail;
@@ -1042,6 +1045,89 @@ class usearch_pack_m uint40_t {
 
 static_assert(sizeof(uint40_t) == 5, "uint40_t must be exactly 5 bytes");
 
+template <typename element_at, typename allocator_at = std::allocator<element_at>> //
+class ring_gt {
+  public:
+    using element_t = element_at;
+    using allocator_t = allocator_at;
+
+    static_assert(std::is_trivially_destructible<element_t>(), "This heap is designed for trivial structs");
+    static_assert(std::is_trivially_copy_constructible<element_t>(), "This heap is designed for trivial structs");
+
+    using value_type = element_t;
+
+  private:
+    element_t* elements_;
+    std::size_t capacity_;
+    std::size_t head_;
+    std::size_t tail_;
+    bool empty_;
+    allocator_t allocator_;
+
+  public:
+    explicit ring_gt(allocator_t const& alloc = allocator_t()) noexcept
+        : elements_(nullptr), capacity_(0), head_(0), tail_(0), empty_(true), allocator_(alloc) {}
+
+    ring_gt(ring_gt const&) = delete;
+    ring_gt& operator=(ring_gt const&) = delete;
+
+    ~ring_gt() noexcept { reset(); }
+
+    bool empty() const noexcept { return empty_; }
+    size_t size() const noexcept {
+        if (empty_)
+            return 0;
+        else if (head_ > tail_)
+            return head_ - tail_;
+        else
+            return capacity_ - (tail_ - head_);
+    }
+
+    void reset() noexcept {
+        if (elements_)
+            allocator_.deallocate(elements_, capacity_);
+        elements_ = nullptr;
+        capacity_ = 0;
+        head_ = 0;
+        tail_ = 0;
+        empty_ = true;
+    }
+
+    bool resize(std::size_t n) noexcept {
+        element_t* elements = allocator_.allocate(n);
+        if (!elements)
+            return false;
+        reset();
+        elements_ = elements;
+        capacity_ = n;
+        return true;
+    }
+
+    void push(element_t const& value) noexcept {
+        elements_[head_] = value;
+        head_ = (head_ + 1) % capacity_;
+        empty_ = false;
+    }
+
+    bool try_push(element_t const& value) noexcept {
+        if (head_ == tail_ && !empty_)
+            return false; // elements_ is full
+
+        push(value);
+        return true;
+    }
+
+    bool try_pop(element_t& value) noexcept {
+        if (empty_)
+            return false;
+
+        value = std::move(elements_[tail_]);
+        tail_ = (tail_ + 1) % capacity_;
+        empty_ = head_ == tail_;
+        return true;
+    }
+};
+
 /// @brief Number of neighbors per graph node.
 /// Defaults to 32 in FAISS and 16 in hnswlib.
 /// > It is called `M` in the paper.
@@ -1123,6 +1209,21 @@ struct search_config_t {
     bool exact = false;
 };
 
+struct copy_config_t {
+    bool copy_vectors = true;
+};
+
+struct join_config_t {
+    /// @brief Controls maximum number of proposals per man during stable marriage.
+    std::size_t max_proposals = 0;
+    /// @brief Hyper-parameter controlling the quality of search.
+    /// Defaults to 16 in FAISS and 10 in hnswlib.
+    /// > It is called `ef` in the paper.
+    std::size_t expansion = default_expansion_search();
+    /// @brief Brute-forces exhaustive search over all entries in the index.
+    bool exact = false;
+};
+
 using file_header_t = byte_t[64];
 
 /**
@@ -1227,8 +1328,8 @@ struct file_head_result_t {
     error_t error;
 
     explicit operator bool() const noexcept { return !error; }
-    file_head_result_t failed(char const* message) noexcept {
-        error = message;
+    file_head_result_t failed(error_t message) noexcept {
+        error = std::move(message);
         return std::move(*this);
     }
 };
@@ -1262,6 +1363,37 @@ struct viewed_file_t {
 };
 #endif
 
+struct dummy_predicate_t {
+    template <typename match_at> constexpr bool operator()(match_at&&) const noexcept { return true; }
+};
+
+struct dummy_progress_t {
+    inline void operator()(std::size_t /*progress*/, std::size_t /*total*/) const noexcept {}
+};
+
+struct dummy_executor_t {
+    dummy_executor_t() noexcept {}
+    std::size_t size() const noexcept { return 1; }
+
+    template <typename thread_aware_function_at>
+    void execute_bulk(std::size_t tasks, thread_aware_function_at&& thread_aware_function) noexcept {
+        for (std::size_t task_idx = 0; task_idx != tasks; ++task_idx)
+            thread_aware_function(0, task_idx);
+    }
+
+    template <typename thread_aware_function_at>
+    void execute_bulk(thread_aware_function_at&& thread_aware_function) noexcept {
+        thread_aware_function(0);
+    }
+};
+
+struct dummy_label_to_label_mapping_t {
+    struct member_ref_t {
+        template <typename label_at> member_ref_t& operator=(label_at&&) noexcept { return *this; }
+    };
+    template <typename label_at> member_ref_t operator[](label_at&&) const noexcept { return {}; }
+};
+
 /**
  * @brief Approximate Nearest Neighbors Search index using the
  *        Hierarchical Navigable Small World graph algorithm.
@@ -1552,7 +1684,7 @@ class index_gt {
     index_config_t config_{};
     index_limits_t limits_{};
     metric_t metric_{};
-    allocator_t allocator_{};
+    mutable allocator_t allocator_{};
     point_allocator_t point_allocator_{};
     precomputed_constants_t pre_{};
     viewed_file_t viewed_file_{};
@@ -1606,6 +1738,35 @@ class index_gt {
         return *this;
     }
 
+    struct copy_result_t {
+        error_t error;
+        index_gt index;
+
+        explicit operator bool() const noexcept { return !error; }
+        copy_result_t failed(error_t message) noexcept {
+            error = std::move(message);
+            return std::move(*this);
+        }
+    };
+
+    copy_result_t copy(copy_config_t /*config*/ = {}) const noexcept {
+        copy_result_t result;
+        index_gt& other = result.index;
+        other = index_gt(config_, metric_, allocator_, point_allocator_);
+        if (!other.reserve(limits_))
+            return result.failed("Failed to reserve the contexts");
+
+        // All that is left is to allocate new `node_t` instances
+        // and populate the `other.nodes_` array with them.
+        for (std::size_t i = 0; i != size_; ++i)
+            other.nodes_[i] = other.node_make_copy_(node_bytes_split_(nodes_[i]));
+
+        other.size_ = size_.load();
+        other.max_level_ = max_level_;
+        other.entry_id_ = entry_id_;
+        return result;
+    }
+
     member_citerator_t cbegin() const noexcept { return {this, 0}; }
     member_citerator_t cend() const noexcept { return {this, size()}; }
     member_citerator_t begin() const noexcept { return {this, 0}; }
@@ -1657,11 +1818,13 @@ class index_gt {
         std::swap(limits_, other.limits_);
         std::swap(metric_, other.metric_);
         std::swap(allocator_, other.allocator_);
+        std::swap(point_allocator_, other.point_allocator_);
         std::swap(pre_, other.pre_);
         std::swap(viewed_file_, other.viewed_file_);
         std::swap(max_level_, other.max_level_);
         std::swap(entry_id_, other.entry_id_);
         std::swap(nodes_, other.nodes_);
+        std::swap(nodes_mutexes_, other.nodes_mutexes_);
         std::swap(contexts_, other.contexts_);
 
         // Non-atomic parts.
@@ -1760,8 +1923,8 @@ class index_gt {
         id_t id{};
 
         explicit operator bool() const noexcept { return !error; }
-        add_result_t failed(char const* message) noexcept {
-            error = message;
+        add_result_t failed(error_t message) noexcept {
+            error = std::move(message);
             return std::move(*this);
         }
     };
@@ -1793,14 +1956,17 @@ class index_gt {
         inline search_result_t& operator=(search_result_t&&) = default;
 
         explicit operator bool() const noexcept { return !error; }
-        search_result_t failed(char const* message) noexcept {
-            error = message;
+        search_result_t failed(error_t message) noexcept {
+            error = std::move(message);
             return std::move(*this);
         }
 
         inline operator std::size_t() const noexcept { return count; }
         inline std::size_t size() const noexcept { return count; }
+        inline bool empty() const noexcept { return !count; }
         inline match_t operator[](std::size_t i) const noexcept { return at(i); }
+        inline match_t front() const noexcept { return at(0); }
+        inline match_t back() const noexcept { return at(count - 1); }
         inline bool contains(label_t label) const noexcept {
             for (std::size_t i = 0; i != count; ++i)
                 if (at(i).member.label == label)
@@ -1866,7 +2032,7 @@ class index_gt {
         new_level_lock.unlock();
 
         // Allocate the neighbors
-        node_t node = node_malloc_(label, vector, target_level, config.store_vector);
+        node_t node = node_make_(label, vector, target_level, config.store_vector);
         if (!node)
             return result.failed("Out of memory!");
         std::size_t old_size = size_.fetch_add(1);
@@ -1910,10 +2076,6 @@ class index_gt {
         return result;
     }
 
-    struct dummy_predicate_t {
-        constexpr bool operator()(match_t const&) const noexcept { return true; }
-    };
-
     /**
     * @brief Searches for the closest elements to the given ::query. Thread-safe.
     *
@@ -2083,8 +2245,8 @@ class index_gt {
         error_t error;
 
         explicit operator bool() const noexcept { return !error; }
-        serialization_result_t failed(char const* message) noexcept {
-            error = message;
+        serialization_result_t failed(error_t message) noexcept {
+            error = std::move(message);
             return std::move(*this);
         }
     };
@@ -2094,7 +2256,8 @@
     * co-locating vectors and neighbors lists.
     * Available on Linux, MacOS, Windows.
     */
-    serialization_result_t save(char const* file_path) const noexcept {
+    template <typename progress_at = dummy_progress_t>
+    serialization_result_t save(char const* file_path, progress_at&& progress = {}) const noexcept {
 
         // Make sure we have right to write to that file
         serialization_result_t result;
@@ -2164,6 +2327,7 @@ class index_gt {
             write_chunk(node.vector(), node_vector_bytes);
             if (result.error)
                 return result;
+            progress(i, state.size);
         }
 
         std::fclose(file);
@@ -2175,7 +2339,8 @@ class index_gt {
     * copying both vectors and neighbors lists into RAM.
     * Available on Linux, MacOS, Windows.
     */
-    serialization_result_t load(char const* file_path) noexcept {
+    template <typename progress_at = dummy_progress_t>
+    serialization_result_t load(char const* file_path, progress_at&& progress = {}) noexcept {
 
         serialization_result_t result;
         file_header_t state_buffer{};
@@ -2223,7 +2388,8 @@ class index_gt {
         }
 
         // Load nodes one by one
-        for (std::size_t i = 0; i != size_; ++i) {
+        std::size_t const size = size_;
+        for (std::size_t i = 0; i != size; ++i) {
             label_t label;
             dim_t dim;
             level_t level;
@@ -2238,11 +2404,15 @@ class index_gt {
                 return result;
 
             std::size_t node_bytes = node_bytes_(dim, level);
-            node_t node = node_malloc_(label, {nullptr, dim}, level, true);
+            node_t node = node_malloc_(dim, level);
+            node.label(label);
+            node.dim(dim);
+            node.level(level);
             read_chunk(node.tape() + node_head_bytes_(), node_bytes - node_head_bytes_());
             if (result.error)
                 return result;
             nodes_[i] = node;
+            progress(i, size);
         }
 
         std::fclose(file);
@@ -2255,7 +2425,8 @@ class index_gt {
     * @b without copying the vectors and neighbors lists into RAM.
     * Available on Linux, MacOS, but @b not on Windows.
     */
-    serialization_result_t view(char const* file_path) noexcept {
+    template <typename progress_at = dummy_progress_t>
+    serialization_result_t view(char const* file_path, progress_at&& progress = {}) noexcept {
 
         serialization_result_t result;
 
 #if defined(USEARCH_DEFINED_WINDOWS)
@@ -2337,16 +2508,18 @@ class index_gt {
         }
 
         // Locate every node packed into file
-        std::size_t progress = sizeof(file_header_t);
-        for (std::size_t i = 0; i != size_; ++i) {
-            byte_t* tape = (byte_t*)(file + progress);
+        std::size_t progress_bytes = sizeof(file_header_t);
+        std::size_t const size = size_;
+        for (std::size_t i = 0; i != size; ++i) {
+            byte_t* tape = (byte_t*)(file + progress_bytes);
             dim_t dim = misaligned_load<dim_t>(tape + sizeof(label_t));
             level_t level = misaligned_load<level_t>(tape + sizeof(label_t) + sizeof(dim_t));
 
             std::size_t node_bytes = node_bytes_(dim, level);
             std::size_t node_vector_bytes = dim * sizeof(scalar_t);
             nodes_[i] = node_t{tape, (scalar_t*)(tape + node_bytes - node_vector_bytes)};
-            progress += node_bytes;
+            progress_bytes += node_bytes;
+            progress(i, size);
         }
 
         return {};
@@ -2354,7 +2527,224 @@ class index_gt {
 
 #pragma endregion
 
+    struct join_result_t {
+        error_t error{};
+        std::size_t intersection_size{};
+        std::size_t engagements{};
+        std::size_t cycles{};
+        std::size_t measurements{};
+
+        explicit operator bool() const noexcept { return !error; }
+        join_result_t failed(error_t message) noexcept {
+            error = std::move(message);
+            return std::move(*this);
+        }
+    };
+
+    /**
+     * @brief Adapts the Male-Optimal Stable Marriage algorithm for unequal sets
+     *        to perform fast one-to-one matching between two large collections
+     *        of vectors, using approximate nearest neighbors search.
+     */
+    template <                                                        //
+        typename first_to_second_at = dummy_label_to_label_mapping_t, //
+        typename second_to_first_at = dummy_label_to_label_mapping_t, //
+        typename executor_at = dummy_executor_t,                      //
+        typename progress_at = dummy_progress_t                       //
+        >
+    static join_result_t join(                                       //
+        index_gt const& first, index_gt const& second,               //
+        join_config_t config = {},                                   //
+        first_to_second_at&& first_to_second = first_to_second_at{}, //
+        second_to_first_at&& second_to_first = second_to_first_at{}, //
+        executor_at&& executor = executor_at{},                      //
+        progress_at&& progress = progress_at{}) noexcept {
+
+        if (second.size() < first.size())
+            return join_small_and_big_(                            //
+                second, first,                                     //
+                config,                                            //
+                std::forward<second_to_first_at>(second_to_first), //
+                std::forward<first_to_second_at>(first_to_second), //
+                std::forward<executor_at>(executor),               //
+                std::forward<progress_at>(progress));
+        else
+            return join_small_and_big_(                            //
+                first, second,                                     //
+                config,                                            //
+                std::forward<first_to_second_at>(first_to_second), //
+                std::forward<second_to_first_at>(second_to_first), //
+                std::forward<executor_at>(executor),               //
+                std::forward<progress_at>(progress));
+    }
+
+  private:
+    template <typename first_to_second_at, typename second_to_first_at, typename executor_at, typename progress_at>
+    static join_result_t join_small_and_big_( //
+        index_gt const& men, index_gt const& women, //
+        join_config_t config,                 //
+        first_to_second_at&& man_to_woman,    //
+        second_to_first_at&& woman_to_man,    //
+        executor_at&& executor,               //
+        progress_at&& progress) noexcept {
+
+        join_result_t result;
+
+        // Sanity checks and argument validation:
+        if (&men == &women)
+            return result.failed("Can't join with itself, consider copying");
+        if (config.max_proposals == 0)
+            config.max_proposals = std::log(men.size()) + executor.size();
+        config.max_proposals = (std::min)(men.size(), config.max_proposals);
+
+        using proposals_count_t = std::uint16_t;
+        using ids_allocator_t = typename allocator_traits_t::template rebind_alloc<id_t>;
+        allocator_t& alloc = men.allocator_;
+
+        // Create an atomic queue, as a ring structure, from/to which
+        // free men will be added/pulled.
+        std::mutex free_men_mutex{};
+        ring_gt<id_t> free_men;
+        free_men.resize(men.size());
+        for (std::size_t i = 0; i != men.size(); ++i)
+            free_men.push(static_cast<id_t>(i));
+
+        // We are gonna need some temporary memory.
+        proposals_count_t* proposal_counts = (proposals_count_t*)alloc.allocate(sizeof(proposals_count_t) * men.size());
+        id_t* man_to_woman_ids = (id_t*)alloc.allocate(sizeof(id_t) * men.size());
+        id_t* woman_to_man_ids = (id_t*)alloc.allocate(sizeof(id_t) * women.size());
+        if (!proposal_counts || !man_to_woman_ids || !woman_to_man_ids)
+            return result.failed("Can't allocate temporary mappings");
+
+        id_t missing_id;
+        std::memset((void*)&missing_id, 0xFF, sizeof(id_t));
+        std::memset((void*)man_to_woman_ids, 0xFF, sizeof(id_t) * men.size());
+        std::memset((void*)woman_to_man_ids, 0xFF, sizeof(id_t) * women.size());
+        std::memset(proposal_counts, 0, sizeof(proposals_count_t) * men.size());
+
+        // Define locks, to limit concurrent accesses to `man_to_woman_ids` and `woman_to_man_ids`.
+        visits_bitset_t men_locks, women_locks;
+        if (!men_locks.resize(men.size()) || !women_locks.resize(women.size()))
+            return result.failed("Can't allocate locks");
+
+        // Accumulate statistics from all the contexts,
+        // to have a baseline to compare with, by the time the `join` is finished.
+        std::size_t old_measurements{};
+        std::size_t old_cycles{};
+        for (std::size_t thread_idx = 0; thread_idx != executor.size(); ++thread_idx) {
+            old_measurements += women.contexts_[thread_idx].measurements_count;
+            old_cycles += women.contexts_[thread_idx].iteration_cycles;
+        }
+        std::atomic<std::size_t> rounds{0};
+        std::atomic<std::size_t> engagements{0};
+
+        // Concurrently process all the men
+        executor.execute_bulk([&](std::size_t thread_idx) {
+            context_t& context = women.contexts_[thread_idx];
+            search_config_t search_config;
+            search_config.expansion = config.expansion;
+            search_config.exact = config.exact;
+            search_config.thread = thread_idx;
+            id_t free_man_id;
+
+            // While there exists a free man who still has a woman to propose to.
+            while (true) {
+                std::size_t passed_rounds = 0;
+                std::size_t total_rounds = 0;
+                {
+                    std::unique_lock<std::mutex> pop_lock(free_men_mutex);
+                    if (!free_men.try_pop(free_man_id))
+                        // Primary exit path, we have exhausted the list of candidates
+                        break;
+                    passed_rounds = ++rounds;
+                    total_rounds = passed_rounds + free_men.size();
+                }
+                progress(passed_rounds, total_rounds);
+                while (men_locks.atomic_set(free_man_id))
+                    ;
+
+                node_t free_man = men.node_with_id_(free_man_id);
+                proposals_count_t& free_man_proposals = proposal_counts[free_man_id];
+                if (free_man_proposals >= config.max_proposals)
+                    continue;
+
+                // Find the closest woman, to whom this man hasn't proposed yet.
+                free_man_proposals++;
+                search_result_t candidates = women.search(free_man.vector_view(), free_man_proposals, search_config);
+                if (!candidates) {
+                    // TODO:
+                }
+
+                match_t match = candidates.back();
+                member_cref_t woman = match.member;
+                while (women_locks.atomic_set(woman.id))
+                    ;
+
+                id_t husband_id = woman_to_man_ids[woman.id];
+                bool woman_is_free = husband_id == missing_id;
+                if (woman_is_free) {
+                    // Engagement
+                    man_to_woman_ids[free_man_id] = woman.id;
+                    woman_to_man_ids[woman.id] = free_man_id;
+                    engagements++;
+                } else {
+                    distance_t distance_from_husband = context.measure(woman.vector, men.node_with_id_(husband_id));
+                    distance_t distance_from_candidate = match.distance;
+                    if (distance_from_husband > distance_from_candidate) {
+                        // Break-up
+                        while (men_locks.atomic_set(husband_id))
+                            ;
+                        man_to_woman_ids[husband_id] = missing_id;
+                        men_locks.atomic_reset(husband_id);
+
+                        // New Engagement
+                        man_to_woman_ids[free_man_id] = woman.id;
+                        woman_to_man_ids[woman.id] = free_man_id;
+                        engagements++;
+
+                        std::unique_lock<std::mutex> push_lock(free_men_mutex);
+                        free_men.push(husband_id);
+                    } else {
+                        std::unique_lock<std::mutex> push_lock(free_men_mutex);
+                        free_men.push(free_man_id);
+                    }
+                }
+
+                men_locks.atomic_reset(free_man_id);
+                women_locks.atomic_reset(woman.id);
+            }
+        });
+
+        // Export the IDs into labels:
+        std::size_t intersection_size = 0;
+        for (std::size_t i = 0; i != men.size(); ++i) {
+            id_t woman_id = man_to_woman_ids[i];
+            if (woman_id != missing_id) {
+                label_t man = men.node_with_id_(i).label();
+                label_t woman = women.node_with_id_(woman_id).label();
+                man_to_woman[man] = woman;
+                woman_to_man[woman] = man;
+                intersection_size++;
+            }
+        }
+
+        // Deallocate memory
+        alloc.deallocate((byte_t*)proposal_counts, sizeof(proposals_count_t) * men.size());
+        alloc.deallocate((byte_t*)man_to_woman_ids, sizeof(id_t) * men.size());
+        alloc.deallocate((byte_t*)woman_to_man_ids, sizeof(id_t) * women.size());
+
+        // Export stats
+        result.engagements = engagements;
+        result.intersection_size = intersection_size;
+        for (std::size_t thread_idx = 0; thread_idx != executor.size(); ++thread_idx) {
+            result.measurements += women.contexts_[thread_idx].measurements_count;
+            result.cycles += women.contexts_[thread_idx].iteration_cycles;
+        }
+        result.measurements -= old_measurements;
+        result.cycles -= old_cycles;
+        return result;
+    }
+
     void reset_view_() noexcept {
         if (!viewed_file_)
             return;
@@ -2384,51 +2774,80 @@ class index_gt {
         return node_head_bytes_() + pre_.neighbors_base_bytes + pre_.neighbors_bytes * level + sizeof(scalar_t) * dim;
     }
 
-    inline bool node_stored_(node_t node) const noexcept {
+    using span_bytes_t = span_gt<byte_t>;
+    struct node_bytes_split_t {
+        span_bytes_t tape{};
+        span_bytes_t vector{};
+
+        node_bytes_split_t() {}
+        node_bytes_split_t(span_bytes_t tape, span_bytes_t vector) noexcept : tape(tape), vector(vector) {}
+
+        std::size_t memory_usage() const noexcept { return tape.size() + vector.size(); }
+        bool colocated() const noexcept { return tape.end() == vector.begin(); }
+        operator node_t() const noexcept { return node_t{tape.begin(), reinterpret_cast<scalar_t*>(vector.begin())}; }
+    };
+
+    inline node_bytes_split_t node_bytes_split_(node_t node) const noexcept {
         std::size_t levels_bytes = pre_.neighbors_base_bytes + pre_.neighbors_bytes * node.level();
-        return (node.tape() + node_head_bytes_() + levels_bytes) == (byte_t*)node.vector();
+        std::size_t bytes_in_tape = node_head_bytes_() + levels_bytes;
+        return {{node.tape(), bytes_in_tape}, {(byte_t*)node.vector(), node_vector_bytes_(node)}};
     }
 
     inline std::size_t node_vector_bytes_(dim_t dim) const noexcept { return dim * sizeof(scalar_t); }
     inline std::size_t node_vector_bytes_(node_t node) const noexcept { return node_vector_bytes_(node.dim()); }
 
-    void node_free_(std::size_t id) noexcept {
-
-        if (viewed_file_)
-            return;
-
-        node_t& node = nodes_[id];
-        std::size_t node_bytes = node_bytes_(node) - node_vector_bytes_(node) * !node_stored_(node);
-        point_allocator_.deallocate(node.tape(), node_bytes);
-        node = node_t{};
-    }
-
-    node_t node_malloc_(label_t label, vector_view_t vector, level_t level, bool store_vector) noexcept {
+    node_bytes_split_t node_malloc_(dim_t dims_to_store, level_t level) noexcept {
 
-        std::size_t dim = vector.size();
-        std::size_t stored_vector_bytes = node_vector_bytes_(static_cast<dim_t>(dim)) * std::size_t(store_vector);
-        std::size_t node_bytes = node_bytes_(static_cast<dim_t>(dim), level) -
-                                 node_vector_bytes_(static_cast<dim_t>(dim)) * std::size_t(!store_vector);
+        std::size_t vector_bytes = node_vector_bytes_(dims_to_store);
+        std::size_t node_bytes = node_bytes_(dims_to_store, level);
+        std::size_t non_vector_bytes = node_bytes - vector_bytes;
 
         byte_t* data = (byte_t*)point_allocator_.allocate(node_bytes);
         if (!data)
-            return {};
-
-        std::memset(data, 0, node_bytes);
-        if (vector.data())
-            std::memcpy(data + node_bytes - stored_vector_bytes, vector.data(), stored_vector_bytes);
-
-        scalar_t* scalars = store_vector //
-                                ? (scalar_t*)(data + node_bytes - stored_vector_bytes)
-                                : (scalar_t*)(vector.data());
+            return node_bytes_split_t{};
+        return {{data, non_vector_bytes}, {data + non_vector_bytes, vector_bytes}};
+    }
 
-        node_t node{data, scalars};
+    node_t node_make_(label_t label, vector_view_t vector, level_t level, bool store_vector) noexcept {
+        node_bytes_split_t node_bytes = node_malloc_(vector.size() * store_vector, level);
+        if (store_vector) {
+            std::memset(node_bytes.tape.data(), 0, node_bytes.tape.size());
+            std::memcpy(node_bytes.vector.data(), vector.data(), node_bytes.vector.size());
+        } else {
+            std::memset(node_bytes.tape.data(), 0, node_bytes.memory_usage());
+        }
+        node_t node = node_bytes;
         node.label(label);
-        node.dim(static_cast<dim_t>(dim));
+        node.dim(static_cast<dim_t>(vector.size()));
         node.level(level);
         return node;
     }
 
+    node_t node_make_copy_(node_bytes_split_t old_bytes) noexcept {
+        if (old_bytes.colocated()) {
+            byte_t* data = (byte_t*)point_allocator_.allocate(old_bytes.memory_usage());
+            std::memcpy(data, old_bytes.tape.data(), old_bytes.memory_usage());
+            return node_t{data, reinterpret_cast<scalar_t*>(data + old_bytes.tape.size())};
+        } else {
+            node_t old_node = old_bytes;
+            node_bytes_split_t node_bytes = node_malloc_(old_node.vector_view().size(), old_node.level());
+            std::memcpy(node_bytes.tape.data(), old_bytes.tape.data(), old_bytes.tape.size());
+            std::memcpy(node_bytes.vector.data(), old_bytes.vector.data(), old_bytes.vector.size());
+            return node_bytes;
+        }
+    }
+
+    void node_free_(std::size_t id) noexcept {
+
+        if (viewed_file_)
+            return;
+
+        node_t& node = nodes_[id];
+        std::size_t node_bytes = node_bytes_(node) - node_vector_bytes_(node) * !node_bytes_split_(node).colocated();
+        point_allocator_.deallocate(node.tape(), node_bytes);
+        node = node_t{};
+    }
+
     inline node_t node_with_id_(std::size_t idx) const noexcept { return nodes_[idx]; }
     inline neighbors_ref_t neighbors_base_(node_t node) const noexcept { return {node.neighbors_tape()}; }
diff --git a/include/usearch/index_punned_dense.hpp b/include/usearch/index_punned_dense.hpp
index 85e02454..04aa117f 100644
--- a/include/usearch/index_punned_dense.hpp
+++ b/include/usearch/index_punned_dense.hpp
@@ -170,6 +170,7 @@ class index_punned_dense_gt {
     using search_result_t = typename index_t::search_result_t;
     using add_result_t = typename index_t::add_result_t;
     using serialization_result_t = typename index_t::serialization_result_t;
+    using join_result_t = typename index_t::join_result_t;
     using stats_t = typename index_t::stats_t;
 
     index_punned_dense_gt() = default;
@@ -184,12 +185,15 @@ class index_punned_dense_gt {
     void swap(index_punned_dense_gt& other) {
         std::swap(dimensions_, other.dimensions_);
         std::swap(scalar_words_, other.scalar_words_);
+        std::swap(expansion_add_, other.expansion_add_);
+        std::swap(expansion_search_, other.expansion_search_);
         std::swap(casted_vector_bytes_, other.casted_vector_bytes_);
         std::swap(typed_, other.typed_);
         std::swap(cast_buffer_, other.cast_buffer_);
         std::swap(casts_, other.casts_);
         std::swap(root_metric_, other.root_metric_);
         std::swap(available_threads_, other.available_threads_);
+        std::swap(lookup_table_, other.lookup_table_);
     }
 
     static index_config_t optimize(index_config_t config) { return index_t::optimize(config); }
@@ -199,6 +203,7 @@ class index_punned_dense_gt {
     std::size_t connectivity() const { return typed_->connectivity(); }
     std::size_t size() const { return typed_->size(); }
     std::size_t capacity() const { return typed_->capacity(); }
+    std::size_t max_level() const noexcept { return typed_->max_level(); }
     index_config_t const& config() const { return typed_->config(); }
     index_limits_t const& limits() const { return typed_->limits(); }
     void clear() { return typed_->clear(); }
@@ -323,23 +328,76 @@ class index_punned_dense_gt {
             metric_t(metric), make_casts_(accuracy));
     }
 
-    index_punned_dense_gt fork() const {
-        index_punned_dense_gt result;
+    struct copy_result_t {
+        index_punned_dense_gt index;
+        error_t error;
+
+        explicit operator bool() const noexcept { return !error; }
+        copy_result_t failed(error_t message) noexcept {
+            error = std::move(message);
+            return std::move(*this);
+        }
+    };
+
+    copy_result_t copy(copy_config_t config = {}) const {
+        copy_result_t result = fork();
+        if (!result)
+            return result;
+        auto typed_result = typed_->copy(config);
+        if (!typed_result)
+            return result.failed(std::move(typed_result.error));
+        *result.index.typed_ = std::move(typed_result.index);
+        result.index.lookup_table_ = lookup_table_;
+        return result;
+    }
 
-        result.dimensions_ = dimensions_;
-        result.scalar_words_ = scalar_words_;
-        result.casted_vector_bytes_ = casted_vector_bytes_;
-        result.cast_buffer_ = cast_buffer_;
-        result.casts_ = casts_;
+    copy_result_t fork() const {
+        copy_result_t result;
+        index_punned_dense_gt& other = result.index;
+
+        other.dimensions_ = dimensions_;
+        other.scalar_words_ = scalar_words_;
+        other.expansion_add_ = expansion_add_;
+        other.expansion_search_ = expansion_search_;
+        other.casted_vector_bytes_ = casted_vector_bytes_;
+        other.cast_buffer_ = cast_buffer_;
+        other.casts_ = casts_;
+
+        other.root_metric_ = root_metric_;
+        other.available_threads_ = available_threads_;
 
-        result.root_metric_ = root_metric_;
         index_t* raw = index_allocator_t{}.allocate(1);
-        new (raw) index_t(config(), root_metric_);
-        result.typed_ = raw;
+        if (!raw)
+            return result.failed("Can't allocate the index");
 
+        new (raw) index_t(config(), root_metric_);
+        other.typed_ = raw;
         return result;
     }
 
+    template <                                                        //
+        typename first_to_second_at = dummy_label_to_label_mapping_t, //
+        typename second_to_first_at = dummy_label_to_label_mapping_t, //
+        typename executor_at = dummy_executor_t,                      //
+        typename progress_at = dummy_progress_t                       //
+        >
+    static join_result_t join(                                       //
+        index_punned_dense_gt const& first,                          //
+        index_punned_dense_gt const& second,                         //
+        join_config_t config = {},                                   //
+        first_to_second_at&& first_to_second = first_to_second_at{}, //
+        second_to_first_at&& second_to_first = second_to_first_at{}, //
+        executor_at&& executor = executor_at{},                      //
+        progress_at&& progress = progress_at{}) noexcept {
+
+        return index_t::join(                                  //
+            *first.typed_, *second.typed_, config,             //
+            std::forward<first_to_second_at>(first_to_second), //
+            std::forward<second_to_first_at>(second_to_first), //
+            std::forward<executor_at>(executor),               //
+            std::forward<progress_at>(progress));
+    }
+
   private:
     struct thread_lock_t {
         index_punned_dense_gt const& parent;
diff --git a/include/usearch/index_punned_helpers.hpp b/include/usearch/index_punned_helpers.hpp
index b81789b8..f4016bed 100644
--- a/include/usearch/index_punned_helpers.hpp
+++ b/include/usearch/index_punned_helpers.hpp
@@ -313,9 +313,11 @@ class executor_stl_t {
     std::size_t threads_count_{};
 
  public:
-    executor_stl_t(std::size_t threads_count) noexcept
+    executor_stl_t(std::size_t threads_count = 0) noexcept
         : threads_count_(threads_count ? threads_count : std::thread::hardware_concurrency()) {}
 
+    std::size_t size() const noexcept { return threads_count_; }
+
     template <typename thread_aware_function_at>
     void execute_bulk(std::size_t tasks, thread_aware_function_at&& thread_aware_function) noexcept(false) {
         std::vector<std::thread> threads_pool;
@@ -330,22 +332,39 @@ class executor_stl_t {
         for (std::size_t thread_idx = 0; thread_idx != threads_count_; ++thread_idx)
             threads_pool[thread_idx].join();
     }
+
+    template <typename thread_aware_function_at>
+    void execute_bulk(thread_aware_function_at&& thread_aware_function) noexcept(false) {
+        std::vector<std::thread> threads_pool;
+        for (std::size_t thread_idx = 0; thread_idx != threads_count_; ++thread_idx)
+            threads_pool.emplace_back([=]() { thread_aware_function(thread_idx); });
+        for (std::size_t thread_idx = 0; thread_idx != threads_count_; ++thread_idx)
+            threads_pool[thread_idx].join();
+    }
 };
 
 #if USEARCH_USE_OPENMP
 
 class executor_openmp_t {
   public:
-    executor_openmp_t(std::size_t threads_count) noexcept {
+    executor_openmp_t(std::size_t threads_count = 0) noexcept {
         omp_set_num_threads(threads_count ? threads_count : std::thread::hardware_concurrency());
     }
 
+    std::size_t size() const noexcept { return omp_get_num_threads(); }
+
     template <typename thread_aware_function_at>
     void execute_bulk(std::size_t tasks, thread_aware_function_at&& thread_aware_function) noexcept(false) {
 #pragma omp parallel for schedule(dynamic)
-        for (std::size_t i = 0; i < tasks; ++i)
+        for (std::size_t i = 0; i != tasks; ++i)
             thread_aware_function(omp_get_thread_num(), i);
     }
+
+    template <typename thread_aware_function_at>
+    void execute_bulk(thread_aware_function_at&& thread_aware_function) noexcept(false) {
+#pragma omp parallel
+        { thread_aware_function(omp_get_thread_num()); }
+    }
 };
 
 using executor_default_t = executor_openmp_t;
@@ -451,6 +470,12 @@ class memory_mapping_allocator_gt {
         return *this;
     }
 
+    memory_mapping_allocator_gt(memory_mapping_allocator_gt const&) noexcept {}
+    memory_mapping_allocator_gt& operator=(memory_mapping_allocator_gt const&) noexcept {
+        reset();
+        return *this;
+    }
+
    ~memory_mapping_allocator_gt() noexcept { reset(); }
 
     inline byte_t* allocate(std::size_t count_bytes) noexcept {
diff --git a/python/lib.cpp b/python/lib.cpp
index f7f1084f..76cf86fe 100644
--- a/python/lib.cpp
+++ b/python/lib.cpp
@@ -18,6 +18,7 @@
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
 #include <thread>
+#include <unordered_map>
 
 #include <pybind11/numpy.h>
 
@@ -356,7 +357,6 @@ static py::tuple search_many_in_index( //
 
     Py_ssize_t vectors_count = vectors_info.shape[0];
     Py_ssize_t vectors_dimensions = vectors_info.shape[1];
-    char const* vectors_data = reinterpret_cast<char const*>(vectors_info.ptr);
     if (vectors_dimensions != static_cast<Py_ssize_t>(index.scalar_words()))
         throw std::invalid_argument("The number of vector dimensions doesn't match!");
 
@@ -381,10 +381,48 @@ static py::tuple search_many_in_index( //
     return results;
 }
 
+static std::unordered_map<label_t, label_t> join_index(  //
+    dense_index_py_t const& a, dense_index_py_t const& b, //
+    std::size_t max_proposals, bool exact) {
+
+    std::unordered_map<label_t, label_t> a_to_b;
+    a_to_b.reserve((std::min)(a.size(), b.size()));
+
+    using join_result_t = typename dense_index_py_t::join_result_t;
+    join_config_t config;
+
+    config.max_proposals = max_proposals;
+    config.exact = exact;
+    config.expansion = (std::max)(a.expansion_search(), b.expansion_search());
+    std::size_t threads = (std::min)(a.limits().threads(), b.limits().threads());
+    executor_default_t executor{threads};
+    join_result_t result = dense_index_py_t::join( //
+        a, b, config,                              //
+        a_to_b,                                    //
+        dummy_label_to_label_mapping_t{},          //
+        executor);
+    result.error.raise();
+    return a_to_b;
+}
+
+static dense_index_py_t copy_index(dense_index_py_t const& index) {
+
+    using copy_result_t = typename dense_index_py_t::copy_result_t;
+    copy_config_t config;
+
+    copy_result_t result = index.copy();
+    result.error.raise();
+    return std::move(result.index);
+}
+
 // clang-format off
 template <typename index_at> void save_index(index_at const& index, std::string const& path) { index.save(path.c_str()).error.raise(); }
 template <typename index_at> void load_index(index_at& index, std::string const& path) { index.load(path.c_str()).error.raise(); }
 template <typename index_at> void view_index(index_at& index, std::string const& path) { index.view(path.c_str()).error.raise(); }
+template <typename index_at> void clear_index(index_at& index) { index.clear(); }
+template <typename index_at> std::size_t max_level(index_at const &index) { return index.max_level(); }
+template <typename index_at> typename index_at::stats_t compute_stats(index_at const &index) { return index.stats(); }
+template <typename index_at> typename index_at::stats_t compute_level_stats(index_at const &index, std::size_t level) { return index.stats(level); }
 // clang-format on
 
 template <typename scalar_at>
@@ -650,7 +688,21 @@ PYBIND11_MODULE(compiled, m) {
     i.def("save", &save_index<dense_index_py_t>, py::arg("path"), py::call_guard<py::gil_scoped_release>());
     i.def("load", &load_index<dense_index_py_t>, py::arg("path"), py::call_guard<py::gil_scoped_release>());
     i.def("view", &view_index<dense_index_py_t>, py::arg("path"), py::call_guard<py::gil_scoped_release>());
-    i.def("clear", &dense_index_py_t::clear, py::call_guard<py::gil_scoped_release>());
+    i.def("clear", &clear_index<dense_index_py_t>, py::call_guard<py::gil_scoped_release>());
+    i.def("copy", &copy_index, py::call_guard<py::gil_scoped_release>());
+    i.def("join", &join_index, py::arg("other"), py::arg("max_proposals") = 0, py::arg("exact") = false,
+          py::call_guard<py::gil_scoped_release>());
+
+    using punned_index_stats_t = typename dense_index_py_t::stats_t;
+    auto i_stats = py::class_<punned_index_stats_t>(m, "IndexStats");
+    i_stats.def_readonly("nodes", &punned_index_stats_t::nodes);
+    i_stats.def_readonly("edges", &punned_index_stats_t::edges);
+    i_stats.def_readonly("max_edges", &punned_index_stats_t::max_edges);
+    i_stats.def_readonly("allocated_bytes", &punned_index_stats_t::allocated_bytes);
+
+    i.def_property_readonly("max_level", &max_level<dense_index_py_t>);
+    i.def_property_readonly("levels_stats", &compute_stats<dense_index_py_t>);
+    i.def("level_stats", &compute_level_stats<dense_index_py_t>, py::arg("level"));
 
     auto si = py::class_<sparse_index_py_t>(m, "SparseIndex");
 
@@ -704,8 +756,8 @@ PYBIND11_MODULE(compiled, m) {
     si.def_property_readonly("connectivity", &sparse_index_py_t::connectivity);
     si.def_property_readonly("capacity", &sparse_index_py_t::capacity);
 
-    si.def("save", &sparse_index_py_t::save, py::arg("path"));
-    si.def("load", &sparse_index_py_t::load, py::arg("path"));
-    si.def("view", &sparse_index_py_t::view, py::arg("path"));
+    si.def("save", &save_index<sparse_index_py_t>, py::arg("path"));
+    si.def("load", &load_index<sparse_index_py_t>, py::arg("path"));
+    si.def("view", &view_index<sparse_index_py_t>, py::arg("path"));
     si.def("clear", &sparse_index_py_t::clear);
 }
diff --git a/python/scripts/join.py b/python/scripts/join.py
new file mode 100644
index 00000000..f37194c0
--- /dev/null
+++ b/python/scripts/join.py
@@ -0,0 +1,151 @@
+"""
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/clip_images.fbin -P datasets/cc_3M/
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/clip_texts.fbin -P datasets/cc_3M/
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/images.fbin -P datasets/cc_3M/
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/texts.fbin -P datasets/cc_3M/
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/uform_english_images.fbin -P datasets/cc_3M/
+wget -nc https://huggingface.co/datasets/unum-cloud/ann-cc-3m/resolve/main/uform_english_texts.fbin -P datasets/cc_3M/
+rm -rf datasets/cc_3M/*.usearch
+python python/scripts/join.py
+"""
+from numpy import dot
+from numpy.linalg import norm
+from tqdm import tqdm
+from simsimd import cos_f32x4_neon, to_int
+
+from usearch.index import Index, MetricKind, CompiledMetric, MetricSignature
+from usearch.io import load_matrix
+from usearch.eval import measure_seconds
+
+k = 10
+exact = False
+batch_size = 1024 * 4
+max_elements = 1000000
+
+a_name = "cc_3M/texts"
+b_name = "cc_3M/images"
+
+a_mat = load_matrix(f"datasets/{a_name}.fbin", view=True)
+b_mat = load_matrix(f"datasets/{b_name}.fbin", view=True)
+
+a_mat = a_mat[:max_elements]
+b_mat = b_mat[:max_elements]
+
+print(f"Loaded two datasets of shape: {a_mat.shape}, {b_mat.shape}")
+print("--------------------------------------")
+print("---------------Indexing---------------")
+print("--------------------------------------")
+
+metric = CompiledMetric(
+    pointer=to_int(cos_f32x4_neon),
+    kind=MetricKind.Cos,
+    signature=MetricSignature.ArrayArraySize,
+)
+
+a = Index(
+    a_mat.shape[1],
+    metric=metric,
+    path=f"datasets/{a_name}.f32.usearch",
+    dtype="f32",
+)
+b = Index(
+    b_mat.shape[1],
+    metric=metric,
+    path=f"datasets/{b_name}.f32.usearch",
+    dtype="f32",
+)
+
+if len(a) != a_mat.shape[0]:
+    a.clear()
+    a.add(None, a_mat, log=True, batch_size=batch_size)
+    a.save()
+
+if len(b) != b_mat.shape[0]:
+    b.clear()
+    b.add(None, b_mat, log=True, batch_size=batch_size)
+    b.save()
+
+
+print(
+    f"Loaded two indexes of size: {len(a):,} for {a_name} and {len(b):,} for {b_name}"
+)
+min_elements = min(len(a), len(b))
+
+run_diagnostics = input("Would you like to run diagnostics? [Y/n]: ")
+if len(run_diagnostics) == 0 or run_diagnostics.lower() == "y":
+    print("--------------------------------------")
+    print("-------------Diagnostics--------------")
+    print("--------------------------------------")
+
+    mean_similarity = 0.0
+    mean_recovered_similarity = 0.0
+
+    for i in tqdm(range(min_elements), desc="Pairwise Similarity"):
+        a_vec = a_mat[i]
+        b_vec = b_mat[i]
+        cos_similarity = dot(a_vec, b_vec) / (norm(a_vec) * norm(b_vec))
+        mean_similarity += cos_similarity
+
+        a_vec = a[i]
+        b_vec = b[i]
+        cos_similarity = dot(a_vec, b_vec) / (norm(a_vec) * norm(b_vec))
+        mean_recovered_similarity += cos_similarity
+
+    mean_similarity /= min_elements
+    mean_recovered_similarity /= min_elements
+    print(
+        f"Average vector similarity is {mean_similarity:.4f} in original dataset, "
+        f"and {mean_recovered_similarity:.4f} in recovered state in index"
+    )
+
+    dt = measure_seconds
+    args = dict(
+        k=k,
+        batch_size=batch_size,
+        log=True,
+        exact=exact,
+    )
+
+    secs, a_self_recall = dt(lambda: a.search(a.vectors, **args).recall(a.labels))
+    print(
+        "Self-recall @{} of {} index: {:.2f}%, took {:.2f}s".format(
+            k, a_name, a_self_recall * 100, secs
+        )
+    )
+
+    secs, b_self_recall = dt(lambda: b.search(b.vectors, **args).recall(b.labels))
+    print(
+        "Self-recall @{} of {} index: {:.2f}%, took {:.2f}s".format(
+            k, b_name, b_self_recall * 100, secs
+        )
+    )
+
+    secs, ab_recall = dt(lambda: b.search(a.vectors, **args).recall(b.labels))
+    print(
+        "Cross-recall @{} of {} in {}: {:.2f}%, took {:.2f}s".format(
+            k, a_name, b_name, ab_recall * 100, secs
+        )
+    )
+
+    secs, ba_recall = dt(lambda: a.search(b.vectors, **args).recall(a.labels))
+    print(
+        "Cross-recall @{} of {} in {}: {:.2f}%, took {:.2f}s".format(
+            k, b_name, a_name, ba_recall * 100, secs
+        )
+    )
+
+
+print("--------------------------------------")
+print("-----------------Join-----------------")
+print("--------------------------------------")
+
+secs, bimapping = measure_seconds(lambda: a.join(b, max_proposals=100))
+mapping_size = len(bimapping)
+recall = 0
+for i, j in bimapping.items():
+    recall += i == j
+
+recall *= 100.0 / min_elements
+print(
+    f"Took {secs:.2f}s to find {mapping_size:,} pairings with {recall:.2f}% being exact"
+)
diff --git a/python/scripts/test.py b/python/scripts/test.py
index 750d82f5..a64ce62c 100644
--- a/python/scripts/test.py
+++ b/python/scripts/test.py
@@ -115,6 +115,11 @@ def test_index(
     assert matches[0] == 42
     assert distances[0] == pytest.approx(0, abs=1e-3)
 
+    assert index.max_level >= 0
+    assert index.levels_stats.nodes >= 1
+    assert index.level_stats(0).nodes == 1
+    assert str(index).startswith("usearch.index.Index")
+
     index.save("tmp.usearch")
     index.clear()
     assert len(index) == 0
@@ -127,6 +132,10 @@ def test_index(
     assert len(index) == 1
     assert len(index[42]) == ndim
 
+    index_copy = index.copy()
+    assert len(index_copy) == 1
+    assert len(index_copy[42]) == ndim
+
     # Cleanup
     os.remove("tmp.usearch")
 
@@ -157,6 +166,10 @@ def test_index_batch(
     assert matches.counts.shape[0] == batch_size
     assert np.all(np.sort(index.labels) == np.sort(labels))
 
+    assert index.max_level >= 0  # TODO: This should be 1
+    assert index.levels_stats.nodes >= batch_size
+    assert index.level_stats(0).nodes == batch_size
+
     index.save("tmp.usearch")
     index.clear()
     assert len(index) == 0
@@ -205,6 +218,12 @@ def test_exact_recall(
     for i in range(batch_size):
         assert found_labels[i, 0] == i
 
+    # Match entries against themselves
+    index_copy: Index = index.copy()
+    mapping: dict = index.join(index_copy, exact=True)
+    for man, woman in mapping.items():
+        assert man == woman, "Stable marriage failed"
+
 
 @pytest.mark.parametrize("ndim", dimensions)
 @pytest.mark.parametrize("batch_size", batch_sizes)
diff --git a/python/usearch/eval.py b/python/usearch/eval.py
index 9a50973e..cedcbf7a 100644
--- a/python/usearch/eval.py
+++ b/python/usearch/eval.py
@@ -3,6 +3,7 @@
 from typing import Tuple, Any, Callable, Union, Optional, List
 from dataclasses import dataclass, asdict
 from collections import defaultdict
+from math import ceil
 
 import numpy as np
 
@@ -47,7 +48,7 @@ def random_vectors(
     return x
 
 
-def recall_members(index: Index, *args, **kwargs) -> float:
+def recall_members(index: Index, sample: float = 1, **kwargs) -> float:
     """Simplest benchmark for a quality of search, which queries every
     existing member of the index, to make sure approximate search finds
     the point itself.
@@ -59,9 +60,16 @@ def recall_members(index: Index, *args, **kwargs) -> float:
     """
     if len(index) == 0:
         return 0
+    if "k" not in kwargs:
+        kwargs["k"] = 1
 
-    matches: Matches = index.search(index.vectors, 1, *args, **kwargs)
-    return matches.recall_first(index.labels)
+    labels = index.labels
+    if sample != 1:
+        labels = np.random.choice(labels, int(ceil(len(labels) * sample)))
+
+    queries = index.get_vectors(labels, index.dtype)
+    matches: Matches = index.search(queries, **kwargs)
+    return matches.recall_first(labels)
 
 
 def measure_seconds(f: Callable) -> Tuple[float, Any]:
diff --git a/python/usearch/index.py b/python/usearch/index.py
index 9e28ff8d..64acde85 100644
--- a/python/usearch/index.py
+++ b/python/usearch/index.py
@@ -5,12 +5,13 @@
 # Python tooling, linters, and static analyzers. It also embeds JIT
 # into the primary `Index` class, connecting USearch with Numba.
 import os
+import math
 from typing import Optional, Union, NamedTuple, List, Iterable
 
 import numpy as np
 from tqdm import tqdm
 
-from usearch.compiled import Index as _CompiledIndex, IndexMetadata
+from usearch.compiled import Index as _CompiledIndex, IndexMetadata, IndexStats
 from usearch.compiled import SparseIndex as _CompiledSetsIndex
 from usearch.compiled import MetricKind, ScalarKind, MetricSignature
 
@@ -141,6 +142,13 @@ def recall_first(self, expected: np.ndarray) -> float:
         best_matches = self.labels if not self.is_multiple else self.labels[:, 0]
         return np.sum(best_matches == expected) / len(expected)
 
+    def recall(self, expected: np.ndarray) -> float:
+        assert len(expected) == self.batch_size
+        recall = 0
+        for i in range(self.batch_size):
+            recall += expected[i] in self.labels[i]
+        return recall / len(expected)
+
     def __repr__(self) -> str:
         return (
             f"usearch.Matches({self.total_matches})"
@@ -347,6 +355,10 @@ def add(
         count_labels = len(labels) if isinstance(labels, Iterable) else 1
         assert count_labels == count_vectors
 
+        # If logging is requested, and batch size is undefined, set it to grow 1% at a time:
+        if log and batch_size == 0:
+            batch_size = int(math.ceil(count_vectors / 100))
+
         # Split into batches and log progress, if needed
         if batch_size:
             labels = [
@@ -401,13 +413,53 @@ def search(
         :return: Approximate matches for one or more queries
         :rtype: Matches
         """
-        tuple_ = self._compiled.search(
-            vectors,
-            k,
-            exact=exact,
-            threads=threads,
-        )
-        return Matches(*tuple_)
+
+        assert isinstance(vectors, np.ndarray), "Expects a NumPy array"
+        assert vectors.ndim == 1 or vectors.ndim == 2, "Expects a matrix or vector"
+        count_vectors = vectors.shape[0] if vectors.ndim == 2 else 1
+
+        if log and batch_size == 0:
+            batch_size = int(math.ceil(count_vectors / 100))
+
+        if batch_size:
+            tasks = [
+                vectors[start_row : start_row + batch_size, :]
+                for start_row in range(0, count_vectors, batch_size)
+            ]
+            tasks_matches = []
+            name = log if isinstance(log, str) else "Search"
+            pbar = tqdm(
+                tasks,
+                desc=name,
+                total=count_vectors,
+                unit="Vector",
+                disable=log is False,
+            )
+            for vectors in tasks:
+                tuple_ = self._compiled.search(
+                    vectors,
+                    k,
+                    exact=exact,
+                    threads=threads,
+                )
+                tasks_matches.append(Matches(*tuple_))
+                pbar.update(vectors.shape[0])
+
+            pbar.close()
+            return Matches(
+                labels=np.vstack([m.labels for m in tasks_matches]),
+                distances=np.vstack([m.distances for m in tasks_matches]),
+                counts=np.concatenate([m.counts for m in tasks_matches], axis=None),
+            )
+
+        else:
+            tuple_ = self._compiled.search(
+                vectors,
+                k,
+                exact=exact,
+                threads=threads,
+            )
+            return Matches(*tuple_)
 
     @property
     def specs(self) -> dict:
@@ -442,8 +494,8 @@ def ndim(self) -> int:
         return self._compiled.ndim
 
     @property
-    def metric(self) -> MetricKind:
-        return self._metric_kind
+    def metric(self) -> Union[MetricKind, CompiledMetric]:
+        return self._metric_jit if self._metric_jit else self._metric_kind
 
     @property
     def dtype(self) -> ScalarKind:
@@ -501,6 +553,26 @@ def clear(self):
     def close(self):
         self._compiled.close()
 
+    def copy(self) -> Index:
+        result = Index(
+            ndim=self.ndim,
+            metric=self.metric,
+            dtype=self.dtype,
+            connectivity=self.connectivity,
+            expansion_add=self.expansion_add,
+            expansion_search=self.expansion_search,
+            path=self.path,
+        )
+        result._compiled = self._compiled.copy()
+        return result
+
+    def join(self, other: Index, max_proposals: int = 0, exact: bool = False) -> dict:
+        return self._compiled.join(
+            other=other._compiled,
+            max_proposals=max_proposals,
+            exact=exact,
+        )
+
     @property
     def labels(self) -> np.ndarray:
         return self._compiled.labels
@@ -530,3 +602,40 @@ def __getitem__(self, label: int) -> np.ndarray:
         vector = self._compiled.__getitem__(label, get_dtype)
         view_dtype = _to_numpy_dtype(dtype)
         return None if vector is None else vector.view(view_dtype)
+
+    def remove(self, label: int):
+        pass
+
+    @property
+    def max_level(self) -> int:
+        return self._compiled.max_level
+
+    @property
+    def levels_stats(self) -> IndexStats:
+        return self._compiled.levels_stats
+
+    def level_stats(self, level: int) -> IndexStats:
+        return self._compiled.level_stats(level)
+
+    def __repr__(self) -> str:
+        return f"usearch.index.Index({self.dtype} x {self.ndim}, {self.metric}, expansion: {self.expansion_add} & {self.expansion_search}, {len(self)} vectors across {self.max_level+1} levels)"
+
+    def _repr_pretty_(self) -> str:
+        level_stats = [
+            f"--- {i}. {self.level_stats(i)} nodes" for i in range(self.max_level)
+        ]
+        return "\n".join(
+            [
+                "usearch.index.Index",
+                "- config",
+                f"-- data type: {self.dtype}",
+                f"-- dimensions: {self.ndim}",
+                f"-- metric: {self.metric}",
+                f"-- expansion on addition: {self.expansion_add} candidates",
+                f"-- expansion on search: {self.expansion_search} candidates",
+                "- state",
+                f"-- size: {self.size} vectors",
+                f"-- memory usage: {self.memory_usage} bytes",
+                f"-- max level: {self.max_level}",
+                *level_stats,
+            ]
+        )
diff --git a/python/usearch/join.py b/python/usearch/join.py
deleted file mode 100644
index b58fec9d..00000000
--- a/python/usearch/join.py
+++ /dev/null
@@ -1,98 +0,0 @@
-from collections import deque
-
-import numpy as np
-from numba import njit
-
-from index import Index, Matches, Label
-
-
-class bidict(dict):
-    def __init__(self, *args, **kwargs):
-        super(bidict, self).__init__(*args, **kwargs)
-        self.inverse = {}
-        for key, value in self.items():
-            self.inverse.setdefault(value, []).append(key)
-
-    def __setitem__(self, key, value):
-        if key in self:
-            self.inverse[self[key]].remove(key)
-        super(bidict, self).__setitem__(key, value)
-        self.inverse.setdefault(value, []).append(key)
-
-    def __delitem__(self, key):
-        self.inverse.setdefault(self[key], []).remove(key)
-        if self[key] in self.inverse and not self.inverse[self[key]]:
-            del self.inverse[self[key]]
-        super(bidict, self).__delitem__(key)
-
-
-@njit
-def index_in_array(array: np.ndarray, item: Label) -> int:
-    for idx, val in np.ndenumerate(array):
-        if val == item:
-            return idx
-
-    return len(array)
-
-
-def semantic_join(a: Index, b: Index, resolution: int = 10) -> bidict:
-    """Performs a Semantic Join, matching one entry from `a` with
-    one entry from `b`, converging towards a Stable Marriage.
-    Assuming the collections can be different in size, classical solution
-    doesn't provide
-
-    :param resolution: Approximate matches per member to consider, defaults to 10
-    :type resolution: int, optional
-    :return: Bidirectional mapping from men to women and `.inverse`
-    :rtype: bidict
-    """
-
-    men, women = (a, b) if len(a) > len(b) else (b, a)
-    matches = bidict()
-
-    men_vectors = np.vstack(men[i] for i in range(len(men)))
-    women_vectors = np.vstack(women[i] for i in range(len(women)))
-
-    man_count_proposed = np.zeros(len(men))
-    man_to_women_preferences: Matches = men.search(women_vectors, resolution)
-    woman_to_men_preferences: Matches = women.search(men_vectors, resolution)
-
-    # A nice optimization is to prioritize free men by the quality of their
-    # best remaining variant. TODO: Replace with `heapq`
-    free_men = deque(range(len(men)))
-
-    idle_cycles: int = 0
-    hopeless_men_count = len(men) - len(women)
-    while len(free_men) > hopeless_men_count:
-        # In the worst case scenario we may need to match more candidates for every
-        # remaining man. This, however, may drastically increase the runtime.
-        if len(free_men) == idle_cycles:
-            break
-
-        man: Label = free_men.popleft()
-        count_proposals = man_count_proposed[man]
-        if count_proposals == man_to_women_preferences.counts[man]:
-            free_men.append(man)
-            idle_cycles += 1
-            continue
-
-        woman: Label = man_to_women_preferences.labels[man, count_proposals]
-        man_count_proposed[man] += 1
-
-        if woman not in matches.inverse:
-            matches[man] = woman
-
-        else:
-            her_preferences = woman_to_men_preferences.labels[woman, :]
-            husband: Label = matches.inverse[woman]
-            husband_rank = index_in_array(her_preferences, husband)
-            challenger_rank = index_in_array(her_preferences, man)
-            if challenger_rank < husband_rank:
-                del matches[husband]
-                free_men.append(husband)
-                matches[man] = woman
-
-            else:
-                free_men.append(man)
-
-    return matches
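
For readers skimming the patch, here is a minimal end-to-end sketch of the new Python joining API that this diff introduces (`Index.copy` and `Index.join`, bound in `python/lib.cpp` above). The dimensionality, metric, and random data below are illustrative assumptions, not part of the diff:

```python
import numpy as np
from usearch.index import Index, MetricKind

# Illustrative toy data - any float32 vectors would do.
ndim = 256
vectors = np.random.rand(200, ndim).astype(np.float32)
labels = np.arange(len(vectors))

index = Index(ndim=ndim, metric=MetricKind.Cos)
index.add(labels, vectors)

# `copy` clones the index together with its vectors; `join` then runs
# the stable-marriage matching implemented in `index_gt::join`, pairing
# every member of one index with at most one member of the other and
# returning a label-to-label dict.
index_copy = index.copy()
mapping: dict = index.join(index_copy, max_proposals=100)

# Joining an index with its own copy should mostly match members to themselves.
exact_pairs = sum(int(man == woman) for man, woman in mapping.items())
print(f"{exact_pairs:,} of {len(mapping):,} pairs are exact self-matches")
```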