OAK-11131 - indexing-job: AOT Blob downloader may download blobs that are not needed for the indexes (#1738)
nfsantos authored Sep 26, 2024
1 parent 2398540 commit 9d00dc8
Showing 10 changed files with 366 additions and 66 deletions.
@@ -23,12 +23,17 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;

import static java.util.Objects.requireNonNull;

/**
* Many methods in this class call themselves recursively and are susceptible to infinite recursion if a composite
* indexer contains itself, directly or indirectly. In that case, the methods will throw a StackOverflowError.
*/
public class CompositeIndexer implements NodeStateIndexer {

private final List<NodeStateIndexer> indexers;
@@ -86,6 +91,11 @@ public Set<String> getRelativeIndexedNodeNames() {
return result;
}

@Override
public String getIndexName() {
return indexers.stream().map(NodeStateIndexer::getIndexName).collect(Collectors.joining(",", "CompositeIndexer[", "]"));
}

@Override
public void close() throws IOException {

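For illustration only (not part of the commit): a minimal standalone Java sketch of the name-joining behavior introduced by the new CompositeIndexer getIndexName override, using plain strings in place of child indexers. The class name below is made up.

import java.util.List;
import java.util.stream.Collectors;

public class CompositeIndexerNameSketch {
    public static void main(String[] args) {
        // Stand-ins for the names returned by each child indexer's getIndexName().
        List<String> childIndexNames = List.of("/oak:index/fooA-34", "/oak:index/fooB-34");

        // Same joining logic as the new override: comma-separated child names wrapped in "CompositeIndexer[...]".
        String compositeName = childIndexNames.stream()
                .collect(Collectors.joining(",", "CompositeIndexer[", "]"));

        // Prints: CompositeIndexer[/oak:index/fooA-34,/oak:index/fooB-34]
        System.out.println(compositeName);
    }
}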
@@ -19,16 +19,21 @@

package org.apache.jackrabbit.oak.index.indexer.document;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
public interface NodeStateIndexer extends Closeable {

public interface NodeStateIndexer extends Closeable{
default void onIndexingStarting() {
}

default void onIndexingStarting() {}
default String getIndexName() {
return "";
}

boolean shouldInclude(String path);

@@ -27,6 +27,7 @@
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import org.jetbrains.annotations.NotNull;
@@ -63,44 +64,60 @@ private AheadOfTimeBlobDownloadingFlatFileStore(FlatFileStore ffs, CompositeInde
}

private @NotNull AheadOfTimeBlobDownloader createAheadOfTimeBlobDownloader(CompositeIndexer indexer, IndexHelper indexHelper) {
if (blobPrefetchBinaryNodeSuffix == null || blobPrefetchBinaryNodeSuffix.isEmpty()) {
if (blobPrefetchBinaryNodeSuffix == null || blobPrefetchBinaryNodeSuffix.isBlank()) {
log.info("Ahead of time blob downloader is disabled, no binary node suffix provided");
return AheadOfTimeBlobDownloader.NOOP;
} else if (!isEnabledForIndexes(blobPrefetchEnableForIndexes, indexHelper.getIndexPaths())) {
log.info("Ahead of time blob downloader is disabled, not enabled for indexes: {}", indexHelper.getIndexPaths());
return AheadOfTimeBlobDownloader.NOOP;
} else {
return new DefaultAheadOfTimeBlobDownloader(
blobPrefetchBinaryNodeSuffix,
ffs.getStoreFile(),
ffs.getAlgorithm(),
indexHelper.getGCBlobStore(),
indexer,
nDownloadThreads,
maxPrefetchWindowSize,
maxPrefetchWindowMB);
List<String> enableIndexesPrefix = splitAndTrim(blobPrefetchEnableForIndexes);
List<NodeStateIndexer> enabledIndexers = filterEnabledIndexes(enableIndexesPrefix, indexer.getIndexers());
if (enabledIndexers.isEmpty()) {
log.info("Ahead of time blob downloader is disabled, not enabled for any indexes: {}", indexHelper.getIndexPaths());
return AheadOfTimeBlobDownloader.NOOP;
} else {
return new DefaultAheadOfTimeBlobDownloader(
blobPrefetchBinaryNodeSuffix,
ffs.getStoreFile(),
ffs.getAlgorithm(),
indexHelper.getGCBlobStore(),
enabledIndexers,
nDownloadThreads,
maxPrefetchWindowSize,
maxPrefetchWindowMB);
}
}
}

/**
* Whether blob downloading is needed for the given indexes.
* Returns the indexes for which AOT blob downloading is enabled, that is,
* for which the index name starts with any of the prefixes in the enabledIndexesPrefixes list.
*
* @param enabledIndexesPrefixes list of prefixes of the index definitions that benefit from the download
* @param indexers the indexers to filter
* @return the indexers for which AOT blob download is enabled, or an empty list if it is not enabled for any of them
*/
public static <T extends NodeStateIndexer> List<T> filterEnabledIndexes(List<String> enabledIndexesPrefixes, List<T> indexers) {
return indexers.stream()
.filter(indexer -> enabledIndexesPrefixes.stream().anyMatch(prefix -> indexer.getIndexName().startsWith(prefix)))
.collect(Collectors.toList());
}

/**
* Whether blob downloading is needed for any of the given indexes.
*
* @param indexesEnabledPrefix the comma-separated list of prefixes of the index
* definitions that benefit from the download
* @param indexPaths the index paths
* @param enableIndexesPrefix list of prefixes of the index definitions that benefit from the download
* @param indexPaths the index paths
* @return true if any of the indexes start with any of the prefixes
*/
public static boolean isEnabledForIndexes(String indexesEnabledPrefix, List<String> indexPaths) {
List<String> enableForIndexes = splitAndTrim(indexesEnabledPrefix);
public static boolean isEnabledForAnyOfIndexes(List<String> enableIndexesPrefix, List<String> indexPaths) {
for (String indexPath : indexPaths) {
if (enableForIndexes.stream().anyMatch(indexPath::startsWith)) {
if (enableIndexesPrefix.stream().anyMatch(indexPath::startsWith)) {
return true;
}
}
return false;
}

private static List<String> splitAndTrim(String str) {
public static List<String> splitAndTrim(String str) {
if (str == null || str.isBlank()) {
return List.of();
} else {
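For illustration only, a standalone sketch of the prefix-based selection that splitAndTrim and filterEnabledIndexes implement: the comma-separated configuration value is split and trimmed, and an index is enabled when its name starts with any of the resulting prefixes. The class and method bodies below are hypothetical stand-ins operating on plain strings, not code from the commit.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrefixFilterSketch {
    // Rough equivalent of splitAndTrim: split on commas, trim, drop empty entries.
    static List<String> splitAndTrim(String commaSeparated) {
        if (commaSeparated == null || commaSeparated.isBlank()) {
            return List.of();
        }
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(p -> !p.isEmpty())
                .collect(Collectors.toList());
    }

    // Rough equivalent of filterEnabledIndexes, using index names instead of NodeStateIndexer instances.
    static List<String> filterEnabled(List<String> prefixes, List<String> indexNames) {
        return indexNames.stream()
                .filter(name -> prefixes.stream().anyMatch(name::startsWith))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> prefixes = splitAndTrim("/oak:index/fooA-, /oak:index/fooB-");
        // Keeps only /oak:index/fooA-34; /oak:index/anotherIndex matches no prefix.
        System.out.println(filterEnabled(prefixes, List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")));
    }
}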
@@ -26,7 +26,7 @@
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.json.JsonDeserializer;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
@@ -43,13 +43,16 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

/**
* Scans a FlatFileStore for non-inlined blobs in nodes matching a given pattern and downloads them from the blob store.
@@ -105,7 +108,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
private final File ffsPath;
private final Compression algorithm;
private final GarbageCollectableBlobStore blobStore;
private final CompositeIndexer indexer;
private final List<NodeStateIndexer> indexers;

// Statistics
private final LongAdder totalBytesDownloaded = new LongAdder();
@@ -121,22 +124,22 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
private ArrayList<Future<?>> downloadFutures;
private final int nDownloadThreads;
private final AheadOfTimeBlobDownloaderThrottler throttler;
private volatile long indexerLastKnownPosition;
private volatile long indexerLastKnownPosition = -1;

/**
* @param binaryBlobsPathSuffix Suffix of nodes that are to be considered for AOT download. Any node that does not match this suffix is ignored.
* @param ffsPath Flat file store path.
* @param algorithm Compression algorithm of the flat file store.
* @param blobStore The blob store. This should be the same blob store used by the indexer and its cache should be
* large enough to hold <code>maxPrefetchWindowMB</code> of data.
* @param indexer The indexer, needed to check if a given path should be indexed.
* @param indexers The indexers for which AOT blob download is enabled.
* @param nDownloadThreads Number of download threads.
* @param maxPrefetchWindowMB Size of the prefetch window, that is, how much data the downloader will retrieve ahead of the indexer.
*/
public DefaultAheadOfTimeBlobDownloader(@NotNull String binaryBlobsPathSuffix,
@NotNull File ffsPath, @NotNull Compression algorithm,
@NotNull GarbageCollectableBlobStore blobStore,
@NotNull CompositeIndexer indexer,
@NotNull List<NodeStateIndexer> indexers,
int nDownloadThreads, int maxPrefetchWindowSize, int maxPrefetchWindowMB) {
if (nDownloadThreads < 1) {
throw new IllegalArgumentException("nDownloadThreads must be greater than 0. Was: " + nDownloadThreads);
@@ -148,10 +151,11 @@ public DefaultAheadOfTimeBlobDownloader(@NotNull String binaryBlobsPathSuffix,
this.ffsPath = ffsPath;
this.algorithm = algorithm;
this.blobStore = blobStore;
this.indexer = indexer;
this.indexers = indexers;
this.nDownloadThreads = nDownloadThreads;
this.throttler = new AheadOfTimeBlobDownloaderThrottler(maxPrefetchWindowSize, maxPrefetchWindowMB * FileUtils.ONE_MB);
LOG.info("Created AheadOfTimeBlobDownloader. downloadThreads: {}, prefetchMB: {}", nDownloadThreads, maxPrefetchWindowMB);
LOG.info("Created AheadOfTimeBlobDownloader. downloadThreads: {}, prefetchMB: {}, enabledIndexes: {}",
nDownloadThreads, maxPrefetchWindowMB, indexers.stream().map(NodeStateIndexer::getIndexName).collect(Collectors.toList()));
}

public void start() {
@@ -167,6 +171,13 @@ public void start() {
scanFuture = executor.submit(scanTask);
}

public void join() throws ExecutionException, InterruptedException {
scanFuture.get();
for (Future<?> downloadFuture : downloadFutures) {
downloadFuture.get();
}
}

public void updateIndexed(long positionIndexed) {
this.indexerLastKnownPosition = positionIndexed;
throttler.advanceIndexer(positionIndexed);
@@ -237,7 +248,7 @@ public void run() {
String entryPath = ffsLine.substring(0, pipeIndex);
if (!isCandidatePath(entryPath)) {
doesNotMatchPattern++;
} else if (!indexer.shouldInclude(entryPath)) {
} else if (indexers.stream().noneMatch(indexer -> indexer.shouldInclude(entryPath))) {
notIncludedInIndex++;
} else if (isBehindIndexer(linesScanned)) {
LOG.debug("Skipping blob at position {} because it was already indexed", linesScanned);
@@ -393,4 +404,22 @@ private String formatDownloaderStats() {
FormattingUtils.formatNanosToSeconds(timeDownloadingNanos));
}
}


public long getBlobsEnqueuedForDownload() {
return blobsEnqueuedForDownload;
}

public long getTotalBlobsDownloaded() {
return totalBlobsDownloaded.sum();
}


public long getLinesScanned() {
return scanTask.linesScanned;
}

public long getNotIncludedInIndex() {
return scanTask.notIncludedInIndex;
}
}
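For illustration only, a simplified sketch of the per-entry decision the scan task makes after this change: an entry is a download candidate only if its path matches the configured binary-node suffix and at least one of the enabled indexers would include it. The suffix match, the example paths, and the class name below are assumptions; throttling and the behind-the-indexer check are omitted.

import java.util.List;
import java.util.function.Predicate;

public class AotDownloadDecisionSketch {
    // Decide whether the blob referenced by a flat file store entry should be downloaded ahead of time.
    static boolean shouldDownload(String entryPath,
                                  String binaryNodeSuffix,
                                  List<Predicate<String>> enabledIndexerIncludes) {
        if (!entryPath.endsWith(binaryNodeSuffix)) {
            return false; // not a binary node of interest
        }
        // New behavior in this commit: download only if at least one enabled indexer would include the path.
        return enabledIndexerIncludes.stream().anyMatch(include -> include.test(entryPath));
    }

    public static void main(String[] args) {
        List<Predicate<String>> includes = List.of(path -> path.startsWith("/content/dam/"));
        String suffix = "/renditions/original/jcr:content";
        System.out.println(shouldDownload("/content/dam/asset.jpg/jcr:content/renditions/original/jcr:content", suffix, includes)); // true
        System.out.println(shouldDownload("/var/something/renditions/original/jcr:content", suffix, includes)); // false: no indexer includes it
    }
}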
@@ -271,8 +271,9 @@ public IndexStore buildTreeStoreForIndexing(IndexHelper indexHelper, File file)
AheadOfTimeBlobDownloadingFlatFileStore.BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES, "");
Prefetcher prefetcher = new Prefetcher(prefetchTreeStore, indexingTreeStore);
String blobSuffix = "";
if (AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
blobPrefetchEnableForIndexes, indexHelper.getIndexPaths())) {
List<String> enabledIndexesPrefix = AheadOfTimeBlobDownloadingFlatFileStore.splitAndTrim(blobPrefetchEnableForIndexes);
if (AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForAnyOfIndexes(
enabledIndexesPrefix, indexHelper.getIndexPaths())) {
blobSuffix = ConfigHelper.getSystemPropertyAsString(
AheadOfTimeBlobDownloadingFlatFileStore.BLOB_PREFETCH_BINARY_NODES_SUFFIX, "");
}
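For illustration only, a sketch of the configuration gating used in the hunk above: a comma-separated list of index prefixes is read from a system property, and the binary-node suffix is only applied when some index path matches one of the prefixes. The property keys and default values below are hypothetical; the real constants are BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES and BLOB_PREFETCH_BINARY_NODES_SUFFIX in AheadOfTimeBlobDownloadingFlatFileStore, whose string values are not shown in this diff.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrefetchEnableSketch {
    public static void main(String[] args) {
        // Hypothetical property keys and defaults.
        String prefixesProp = System.getProperty("oak.example.blobPrefetchEnableForIndexes", "/oak:index/foo");
        String suffixProp = System.getProperty("oak.example.blobPrefetchBinaryNodesSuffix", "/renditions/original/jcr:content");

        List<String> prefixes = Arrays.stream(prefixesProp.split(","))
                .map(String::trim)
                .filter(p -> !p.isEmpty())
                .collect(Collectors.toList());
        List<String> indexPaths = List.of("/oak:index/fooA-34", "/oak:index/anotherIndex");

        // Mirrors the gating in buildTreeStoreForIndexing: the binary-node suffix is only looked up
        // when at least one index path starts with one of the configured prefixes.
        boolean enabled = indexPaths.stream()
                .anyMatch(path -> prefixes.stream().anyMatch(path::startsWith));
        String blobSuffix = enabled ? suffixProp : "";
        System.out.println("AOT blob prefetch enabled: " + enabled + ", blob suffix: " + blobSuffix);
    }
}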
@@ -18,51 +18,81 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import org.apache.jackrabbit.oak.index.indexer.document.NodeStateIndexer;
import org.junit.Test;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloadingFlatFileStore.filterEnabledIndexes;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForAnyOfIndexes;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloadingFlatFileStore.splitAndTrim;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;


public class AheadOfTimeBlobDownloadingFlatFileStoreTest {

@Test
public void isEnabledForIndexes() {
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"",
List.of("/oak:index/fooA-34")
));

assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/foo",
List.of("/oak:index/fooA-34")
));

assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/foo",
List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
));
public void testIsEnabledForAnyOfIndexes() {
assertFalse(isEnabledForAnyOfIndexes(List.of(), List.of("/oak:index/fooA-34")));
assertTrue(isEnabledForAnyOfIndexes(List.of("/oak:index/foo"), List.of("/oak:index/fooA-34")));
assertTrue(isEnabledForAnyOfIndexes(List.of("/oak:index/foo"), List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")));
assertFalse(isEnabledForAnyOfIndexes(List.of("/oak:index/foo"), List.of("/oak:index/anotherIndex")));
assertTrue(isEnabledForAnyOfIndexes(List.of("/oak:index/fooA-", "/oak:index/fooB-"), List.of("/oak:index/fooA-34")));
assertTrue(isEnabledForAnyOfIndexes(List.of("/oak:index/fooA-", "/oak:index/fooB-"), List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")));
assertFalse(isEnabledForAnyOfIndexes(List.of("/oak:index/fooA-"), List.of()));
}

assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/foo",
List.of("/oak:index/anotherIndex")
));
@Test
public void testFilterEnabledIndexes() {
doFilterEnabledIndexesTest(List.of(), // expected enabled indexes
List.of(), // prefix of indexes for which it is enabled
List.of("/oak:index/fooA-34")); // all indexes
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34"),
List.of("/oak:index"), List.of("/oak:index/fooA-34"));
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34", "/oak:index/fooB-34"),
List.of("/oak:index"), List.of("/oak:index/fooA-34", "/oak:index/fooB-34"));
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34", "/oak:index/fooB-34"),
List.of("/oak:index/foo"), List.of("/oak:index/fooA-34", "/oak:index/fooB-34"));
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34"),
List.of("/oak:index/foo"), List.of("/oak:index/anotherIndex", "/oak:index/fooA-34"));
doFilterEnabledIndexesTest(List.of(),
List.of("/oak:index/foo"), List.of("/oak:index/anotherIndex"));
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34"),
List.of("/oak:index/fooA-", "/oak:index/fooB-"), List.of("/oak:index/fooA-34"));
doFilterEnabledIndexesTest(List.of("/oak:index/fooA-34"),
List.of("/oak:index/fooA-", "/oak:index/fooB-"), List.of("/oak:index/anotherIndex", "/oak:index/fooA-34"));
doFilterEnabledIndexesTest(List.of(),
List.of("/oak:index/fooA-"), List.of());
}

assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/fooA-,/oak:index/fooB-",
List.of("/oak:index/fooA-34")
));
@Test
public void testSplitAndTrim() {
assertEquals(List.of(), splitAndTrim(""));
assertEquals(List.of(), splitAndTrim(" "));
assertEquals(List.of("/oak:index/fooA-34"), splitAndTrim("/oak:index/fooA-34"));
assertEquals(List.of("/oak:index/fooA-34"), splitAndTrim("/oak:index/fooA-34,"));
assertEquals(List.of("/oak:index/fooA", "/oak:index/fooA"), splitAndTrim("/oak:index/fooA,/oak:index/fooA"));
assertEquals(List.of("/oak:index/fooA-34", "/oak:index/fooB-34"), splitAndTrim("/oak:index/fooA-34, /oak:index/fooB-34"));
assertEquals(List.of("/oak:index/fooA-34", "/oak:index/fooB-34"), splitAndTrim("/oak:index/fooA-34 , /oak:index/fooB-34"));
}

assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/fooA-, /oak:index/fooB-",
List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
));
private void doFilterEnabledIndexesTest(List<String> expectedIndexes, List<String> enabledForIndexes, List<String> indexNames) {
List<TestNodeStateIndexer> indexers = createIndexersWithName(indexNames);
List<TestNodeStateIndexer> enabledIndexers = filterEnabledIndexes(enabledForIndexes, indexers);
assertEquals(
Set.copyOf(expectedIndexes),
enabledIndexers.stream().map(NodeStateIndexer::getIndexName).collect(Collectors.toSet())
);
}

assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
"/oak:index/fooA-",
List.of()
));
private List<TestNodeStateIndexer> createIndexersWithName(List<String> indexNames) {
return indexNames.stream()
.map(name -> new TestNodeStateIndexer(name, List.of()))
.collect(Collectors.toList());
}
}
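The tests above use a TestNodeStateIndexer helper that is not visible in these hunks. The following standalone sketch is only a guess at its general shape, consistent with how it is constructed here (a name plus a list of included paths); it is not the class from the commit and uses a minimal stand-in interface instead of the real NodeStateIndexer.

import java.util.List;

public class TestNodeStateIndexerSketch {

    // Minimal stand-in for the two NodeStateIndexer methods exercised by the filtering logic.
    interface MinimalIndexer {
        String getIndexName();
        boolean shouldInclude(String path);
    }

    static class TestIndexer implements MinimalIndexer {
        private final String name;
        private final List<String> includedPaths;

        TestIndexer(String name, List<String> includedPaths) {
            this.name = name;
            this.includedPaths = includedPaths;
        }

        @Override
        public String getIndexName() {
            return name;
        }

        @Override
        public boolean shouldInclude(String path) {
            // Include the path if it falls under any of the configured roots.
            return includedPaths.stream().anyMatch(path::startsWith);
        }
    }

    public static void main(String[] args) {
        TestIndexer indexer = new TestIndexer("/oak:index/fooA-34", List.of("/content"));
        System.out.println(indexer.getIndexName() + " includes /content/a: " + indexer.shouldInclude("/content/a"));
    }
}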