Skip to content

Commit 7558379

Browse files
committed
[ENH]: Load HNSW index without disk intermediary
1 parent 046c2cc commit 7558379

File tree

5 files changed

+105
-25
lines changed

5 files changed

+105
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ bytemuck = "1.21.0"
9292
rayon = "1.10.0"
9393
validator = { version = "0.19", features = ["derive"] }
9494
rust-embed = { version = "8.5.0", features = ["include-exclude", "debug-embed"] }
95-
hnswlib = { version = "0.8.1", git = "https://github.com/chroma-core/hnswlib.git" }
95+
hnswlib = { git = "https://github.com/chroma-core/hnswlib.git", rev = "736bfd15d05843e6cc6ac3b806edd21566821be" }
9696
reqwest = { version = "0.12.9", features = ["rustls-tls-native-roots", "http2"], default-features = false }
9797
random-port = "0.1.1"
9898
ndarray = { version = "0.16.1", features = ["approx"] }

rust/index/src/hnsw.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,29 @@ impl PersistentIndex<HnswIndexConfig> for HnswIndex {
230230
dimensionality: index_config.dimensionality,
231231
persist_path: path.into(),
232232
ef_search,
233+
hnsw_data: hnswlib::HnswData::default(),
234+
})
235+
.map_err(|e| WrappedHnswInitError::Other(e).boxed())?;
236+
237+
Ok(HnswIndex {
238+
index,
239+
id,
240+
distance_function: index_config.distance_function.clone(),
241+
})
242+
}
243+
244+
fn load_from_hnsw_data(
245+
hnsw_data: hnswlib::HnswData,
246+
index_config: &IndexConfig,
247+
ef_search: usize,
248+
id: IndexUuid,
249+
) -> Result<Self, Box<dyn ChromaError>> {
250+
let index = hnswlib::HnswIndex::load_from_hnsw_data(hnswlib::HnswIndexLoadConfig {
251+
distance_function: map_distance_function(index_config.distance_function.clone()),
252+
dimensionality: index_config.dimensionality,
253+
persist_path: "".into(),
254+
ef_search,
255+
hnsw_data: hnsw_data,
233256
})
234257
.map_err(|e| WrappedHnswInitError::Other(e).boxed())?;
235258

rust/index/src/hnsw_provider.rs

Lines changed: 71 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -207,20 +207,27 @@ impl HnswIndexProvider {
207207

208208
let index_config = IndexConfig::new(dimensionality, distance_function);
209209

210-
let storage_path_str = match new_storage_path.to_str() {
211-
Some(storage_path_str) => storage_path_str,
212-
None => {
213-
return Err(Box::new(HnswIndexProviderForkError::PathToStringError(
214-
new_storage_path,
215-
)));
216-
}
217-
};
210+
// let storage_path_str = match new_storage_path.to_str() {
211+
// Some(storage_path_str) => storage_path_str,
212+
// None => {
213+
// return Err(Box::new(HnswIndexProviderForkError::PathToStringError(
214+
// new_storage_path,
215+
// )));
216+
// }
217+
// };
218218

219219
// Check if the entry is in the cache, if it is, we assume
220220
// another thread has loaded the index and we return it.
221221
match self.get(&new_id, cache_key).await {
222222
Some(index) => Ok(index.clone()),
223-
None => match HnswIndex::load(storage_path_str, &index_config, ef_search, new_id) {
223+
None => match HnswIndex::load_from_hnsw_data(
224+
self.fetch_hnsw_segment(&new_id, prefix_path)
225+
.await
226+
.map_err(|e| Box::new(HnswIndexProviderForkError::FileError(*e)))?,
227+
&index_config,
228+
ef_search,
229+
new_id,
230+
) {
224231
Ok(index) => {
225232
let index = HnswIndexRef {
226233
inner: Arc::new(RwLock::new(DistributedHnswInner {
@@ -277,10 +284,33 @@ impl HnswIndexProvider {
277284
prefix_path: &str,
278285
) -> Result<(), Box<HnswIndexProviderFileError>> {
279286
// Fetch the files from storage and put them in the index storage path.
287+
let hnsw_data = self.fetch_hnsw_segment(source_id, prefix_path).await?;
288+
let getters = [
289+
|hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.header_buffer())),
290+
|hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.data_level0_buffer())),
291+
|hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.length_buffer())),
292+
|hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.link_list_buffer())),
293+
];
294+
295+
for (file, getter) in FILES.iter().zip(getters) {
296+
let file_path = index_storage_path.join(file);
297+
self.copy_bytes_to_local_file(&file_path, getter(&hnsw_data))
298+
.await?;
299+
}
300+
Ok(())
301+
}
302+
303+
async fn fetch_hnsw_segment(
304+
&self,
305+
source_id: &IndexUuid,
306+
prefix_path: &str,
307+
) -> Result<hnswlib::HnswData, Box<HnswIndexProviderFileError>> {
308+
let mut buffers = Vec::new();
309+
280310
for file in FILES.iter() {
281311
let s3_fetch_span =
282312
tracing::trace_span!(parent: Span::current(), "Read bytes from s3", file = file);
283-
let buf = s3_fetch_span
313+
let _ = s3_fetch_span
284314
.in_scope(|| async {
285315
let key = Self::format_key(prefix_path, source_id, file);
286316
tracing::info!("Loading hnsw index file: {} into directory", key);
@@ -304,13 +334,24 @@ impl HnswIndexProvider {
304334
bytes_read,
305335
key,
306336
);
307-
Ok(buf)
337+
buffers.push(buf);
338+
Ok(())
308339
})
309340
.await?;
310-
let file_path = index_storage_path.join(file);
311-
self.copy_bytes_to_local_file(&file_path, buf).await?;
312341
}
313-
Ok(())
342+
match hnswlib::HnswData::new_from_buffers(
343+
buffers[0].clone(),
344+
buffers[1].clone(),
345+
buffers[2].clone(),
346+
buffers[3].clone(),
347+
) {
348+
Ok(hnsw_data) => Ok(hnsw_data),
349+
Err(e) => Err(Box::new(HnswIndexProviderFileError::StorageError(
350+
chroma_storage::StorageError::Message {
351+
message: e.to_string(),
352+
},
353+
))),
354+
}
314355
}
315356

316357
pub async fn open(
@@ -356,20 +397,27 @@ impl HnswIndexProvider {
356397

357398
let index_config = IndexConfig::new(dimensionality, distance_function);
358399

359-
let index_storage_path_str = match index_storage_path.to_str() {
360-
Some(index_storage_path_str) => index_storage_path_str,
361-
None => {
362-
return Err(Box::new(HnswIndexProviderOpenError::PathToStringError(
363-
index_storage_path,
364-
)));
365-
}
366-
};
400+
// let index_storage_path_str = match index_storage_path.to_str() {
401+
// Some(index_storage_path_str) => index_storage_path_str,
402+
// None => {
403+
// return Err(Box::new(HnswIndexProviderOpenError::PathToStringError(
404+
// index_storage_path,
405+
// )));
406+
// }
407+
// };
367408

368409
// Check if the entry is in the cache, if it is, we assume
369410
// another thread has loaded the index and we return it.
370411
let index = match self.get(id, cache_key).await {
371412
Some(index) => Ok(index.clone()),
372-
None => match HnswIndex::load(index_storage_path_str, &index_config, ef_search, *id) {
413+
None => match HnswIndex::load_from_hnsw_data(
414+
self.fetch_hnsw_segment(id, prefix_path)
415+
.await
416+
.map_err(|e| Box::new(HnswIndexProviderOpenError::FileError(*e)))?,
417+
&index_config,
418+
ef_search,
419+
*id,
420+
) {
373421
Ok(index) => {
374422
let index = HnswIndexRef {
375423
inner: Arc::new(RwLock::new(DistributedHnswInner {

rust/index/src/types.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ pub trait PersistentIndex<C>: Index<C> {
6767
) -> Result<Self, Box<dyn ChromaError>>
6868
where
6969
Self: Sized;
70+
71+
fn load_from_hnsw_data(
72+
hnsw_data: hnswlib::HnswData,
73+
index_config: &IndexConfig,
74+
ef_search: usize,
75+
id: IndexUuid,
76+
) -> Result<Self, Box<dyn ChromaError>>
77+
where
78+
Self: Sized;
7079
}
7180

7281
/// IndexUuid is a wrapper around Uuid to provide a type for the index id.

0 commit comments

Comments
 (0)