Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency optimization for graph native loading update #2441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292]
- Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331]
- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]

### Enhancements
- Introduced a writing layer in native engines which relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290) [https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
* Manages native memory allocations made by JNI.
Expand All @@ -56,6 +58,7 @@ public class NativeMemoryCacheManager implements Closeable {

private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ConcurrentHashMap<String, ReentrantLock> indexLocks = new ConcurrentHashMap<>();
private final ExecutorService executor;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;
Expand Down Expand Up @@ -297,6 +300,55 @@ public CacheStats getCacheStats() {
return cache.stats();
}

/**
 * Opens a vector index with proper locking to ensure only one thread opens the
 * underlying graph file for a given cache key at a time. A per-key
 * {@link ReentrantLock} is created on demand and removed again once no other
 * thread is waiting on it, so the lock map does not grow unboundedly.
 *
 * @param key the unique identifier for the index
 * @param nativeMemoryEntryContext the context containing vector index information
 */
private void openIndex(String key, NativeMemoryEntryContext nativeMemoryEntryContext) {
    ReentrantLock indexFileLock = indexLocks.computeIfAbsent(key, k -> new ReentrantLock());
    // Acquire the lock BEFORE entering the try block: if lock() itself failed inside
    // the try, the finally block would call unlock() on a lock this thread never held,
    // throwing IllegalMonitorStateException and masking the original failure.
    indexFileLock.lock();
    try {
        nativeMemoryEntryContext.openVectorIndex();
    } finally {
        indexFileLock.unlock();
        // Best-effort cleanup: drop the lock from the map only if no thread is queued
        // on it. remove(key, value) is conditional, so a newer lock instance created
        // concurrently under the same key is never removed by mistake.
        if (!indexFileLock.hasQueuedThreads()) {
            indexLocks.remove(key, indexFileLock);
        }
    }
}

/**
 * Looks up an entry in the cache and, on a hit, refreshes its position in the
 * LRU recency queue so eviction order stays accurate.
 *
 * @param key the unique identifier for the cached entry
 * @return the cached NativeMemoryAllocation if present, null otherwise
 */
private NativeMemoryAllocation getFromCacheAndUpdateRecency(String key) {
    final NativeMemoryAllocation allocation = cache.getIfPresent(key);
    if (allocation == null) {
        return null;
    }
    // Cache hit: mark this key as most recently used.
    updateAccessRecency(key);
    return allocation;
}

/**
 * Marks the given key as most recently used by moving it to the tail of the
 * recency queue, preserving least-recently-used ordering for eviction.
 *
 * @param key the unique identifier for the cached entry whose recency needs to be updated
 */
private void updateAccessRecency(String key) {
    final Deque<String> recencyQueue = accessRecencyQueue;
    // Remove any existing occurrence first so the key appears at most once,
    // then re-append it at the MRU end of the queue.
    recencyQueue.remove(key);
    recencyQueue.addLast(key);
}

/**
* Retrieves NativeMemoryAllocation associated with the nativeMemoryEntryContext.
*
Expand Down Expand Up @@ -329,23 +381,28 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
// In case of a cache miss, least recently accessed entries are evicted in a blocking manner
// before the new entry can be added to the cache.
String key = nativeMemoryEntryContext.getKey();
NativeMemoryAllocation result = cache.getIfPresent(key);

// Cache Hit
// In case of a cache hit, moving the item to the end of the recency queue adds
// some overhead to the get operation. This can be optimized further to make this operation
// as lightweight as possible. Multiple approaches and their outcomes were documented
// before moving forward with the current solution.
// The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
NativeMemoryAllocation result = getFromCacheAndUpdateRecency(key);
if (result != null) {
accessRecencyQueue.remove(key);
accessRecencyQueue.addLast(key);
return result;
}

// Cache Miss
// Evict before put
// open the graph file before proceeding to load the graph into memory
openIndex(key, nativeMemoryEntryContext);
synchronized (this) {
// recheck if another thread already loaded this entry into the cache
result = getFromCacheAndUpdateRecency(key);
if (result != null) {
return result;
}
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
Iterator<String> lruIterator = accessRecencyQueue.iterator();
while (lruIterator.hasNext()
Expand All @@ -367,7 +424,12 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
return result;
}
} else {
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
// open graphFile before load
try (nativeMemoryEntryContext) {
String key = nativeMemoryEntryContext.getKey();
openIndex(key, nativeMemoryEntryContext);
return cache.get(key, nativeMemoryEntryContext::load);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
package org.opensearch.knn.index.memory;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.store.IndexInputWithBuffer;

import java.io.IOException;
import java.util.Map;
Expand All @@ -26,7 +30,7 @@
/**
* Encapsulates all information needed to load a component into native memory.
*/
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> {
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> implements AutoCloseable {

protected final String key;

Expand Down Expand Up @@ -55,13 +59,27 @@ public String getKey() {
*/
public abstract Integer calculateSizeInKB();

/**
 * Opens the underlying graph file (i.e. its IndexInput) so that it is available
 * for a subsequent {@link #load()} call. The default implementation is a no-op;
 * subclasses that read from a graph file override this.
 */
public void openVectorIndex() {}

/**
 * Provides the capability to close the closable objects in the {@link NativeMemoryEntryContext}.
 * The default implementation is a no-op; subclasses that acquire resources in
 * {@link #openVectorIndex()} are expected to override this to release them.
 */
@Override
public void close() {}

/**
* Loads entry into memory.
*
* @return NativeMemoryAllocation associated with NativeMemoryEntryContext
*/
public abstract T load() throws IOException;

@Log4j2
public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.IndexAllocation> {

@Getter
Expand All @@ -75,6 +93,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
@Getter
private final String modelId;

@Getter
private boolean indexGraphFileOpened = false;
@Getter
private int indexSizeKb;

@Getter
private IndexInput readStream;

@Getter
IndexInputWithBuffer indexInputWithBuffer;

/**
* Constructor
*
Expand Down Expand Up @@ -131,10 +160,61 @@ public Integer calculateSizeInKB() {
}
}

/**
 * Opens the vector (graph) file backing this index entry so it can later be
 * handed to the native engine by {@link #load()}. Idempotent: a second call
 * after a successful open is a no-op. On success this records the file size,
 * wraps the opened IndexInput in an IndexInputWithBuffer, and marks the graph
 * file as opened.
 *
 * @throws IllegalStateException if the cache key does not encode a vector file name
 * @throws RuntimeException if the file cannot be opened (wraps the IOException)
 */
@Override
public void openVectorIndex() {
    // if graph file is already opened for index, do nothing
    if (isIndexGraphFileOpened()) {
        return;
    }
    // Extract vector file name from the given cache key.
    // Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
    final String cacheKey = this.getKey();
    final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
    if (vectorFileName == null) {
        throw new IllegalStateException(
            "Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name."
        );
    }

    // Prepare for opening index input from directory.
    final Directory directory = this.getDirectory();

    // Try to open an index input then pass it down to native engine for loading an index.
    try {
        indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
        readStream = directory.openInput(vectorFileName, IOContext.READONCE);
        readStream.seek(0);
        indexInputWithBuffer = new IndexInputWithBuffer(readStream);
        indexGraphFileOpened = true;
        log.debug("[KNN] NativeMemoryCacheManager openVectorIndex successful");
    } catch (IOException e) {
        // Preserve the cause so the underlying I/O failure is not lost.
        throw new RuntimeException("Failed to openVectorIndex the index " + openSearchIndexName, e);
    }
}

/**
 * Loads this entry into native memory via the configured load strategy.
 * Requires {@link #openVectorIndex()} to have been called first so that the
 * IndexInput backing the graph file is already open.
 *
 * @return the IndexAllocation produced by the load strategy
 * @throws IOException if the strategy fails to read the index
 * @throws IllegalStateException if the graph file has not been opened yet
 */
@Override
public NativeMemoryAllocation.IndexAllocation load() throws IOException {
    // Guard: loading without a prepared IndexInput is a programming error.
    if (isIndexGraphFileOpened() == false) {
        throw new IllegalStateException("Index graph file is not open");
    }
    return indexLoadStrategy.load(this);
}

/**
 * Closes the IndexInput opened by {@link #openVectorIndex()}.
 * Idempotent: the stream reference is cleared after a successful close, so
 * repeated close() calls (e.g. try-with-resources plus an explicit close) do
 * not attempt to close the same stream twice.
 *
 * @throws RuntimeException if closing the IndexInput fails (wraps the IOException)
 */
@Override
public void close() {
    if (readStream == null) {
        return;
    }
    try {
        readStream.close();
    } catch (IOException e) {
        throw new RuntimeException(
            "Exception while closing the indexInput index [" + openSearchIndexName + "] for loading the graph file.",
            e
        );
    }
    // Clear state only after a successful close so a failed close can be retried.
    readStream = null;
    indexGraphFileOpened = false;
}
}

public static class TrainingDataEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.TrainingDataAllocation> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.store.IndexInputWithBuffer;
import org.opensearch.knn.index.util.IndexUtil;
import org.opensearch.knn.jni.JNIService;
import org.opensearch.knn.index.engine.KNNEngine;
Expand Down Expand Up @@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);

// Try to open an index input then pass it down to native engine for loading an index.
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);

// openVectorIndex takes care of opening the indexInput file
if (!indexEntryContext.isIndexGraphFileOpened()) {
throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded");
}
try (indexEntryContext) {
final long indexAddress = JNIService.loadIndex(
indexEntryContext.indexInputWithBuffer,
indexEntryContext.getParameters(),
knnEngine
);
return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
}
}
Expand Down
Loading
Loading