@@ -19,34 +19,123 @@
package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;

import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.segment.data.SegmentData;
import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachingSegmentArchiveReader implements SegmentArchiveReader {

    private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class);

    @NotNull
    private final PersistentCache persistentCache;

    @NotNull
    private final SegmentArchiveReader delegate;

    private final ExecutorService prefetchExecutor;
    private final Set<UUID> inFlightPrefetch =
            Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final boolean prefetchEnabled;
    private final int prefetchMaxRefs;

    public CachingSegmentArchiveReader(
            @NotNull PersistentCache persistentCache,
            @NotNull SegmentArchiveReader delegate) {
        this.persistentCache = persistentCache;
        this.delegate = delegate;
        int threads = Integer.getInteger("oak.segment.cache.threads", 10);
        this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled");
        this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20);
        this.prefetchExecutor = Executors.newFixedThreadPool(threads);
    }

    @Override
    @Nullable
    public Buffer readSegment(long msb, long lsb) throws IOException {
        Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
        if (buf != null && prefetchEnabled) {
            schedulePrefetch(msb, lsb, buf);
        }
        return buf;
    }

    private List<UUID> extractReferences(Buffer buffer) {
        var data = SegmentData.newSegmentData(buffer);
        int refs = data.getSegmentReferencesCount();
        ArrayList<UUID> out = new ArrayList<>(refs);
        for (int i = 0; i < refs; i++) {
            out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i)));
        }
        return out;
    }

    private void schedulePrefetch(long msb, long lsb, Buffer buffer) {

Contributor (author):
@nfsantos made a good remark that this method should, from the start, be executed in a separate thread context, so it does not impact execution time for the thread invoking CachingSegmentArchiveReader#readSegment.

Contributor:
Indeed, my tests with #2519, which implements a similar mechanism at a different level of abstraction, showed that dispatching from the caller thread hurts performance.

I even opted to trigger preloading only from within the load-callback, so preloading is completely off the critical path for cache hits.

Of course that's a trade-off, as it requires the caller thread to load one segment before any preloading happens.

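Taken together, the two remarks point at a variant where prefetching is triggered only from inside the load callback, so cache hits never pay for it, and where the scheduling work itself runs on the prefetch pool rather than on the loading thread. A minimal sketch of that variant, reusing the names from this class (an illustration, not the code in this PR):

@Override
@Nullable
public Buffer readSegment(long msb, long lsb) throws IOException {
    return persistentCache.readSegment(msb, lsb, () -> {
        // Runs only on a cache miss; cache hits skip all prefetch work.
        Buffer loaded = delegate.readSegment(msb, lsb);
        if (loaded != null && prefetchEnabled) {
            // Hand the scheduling itself to the pool so the loading
            // thread returns as soon as the task is submitted.
            prefetchExecutor.execute(() -> schedulePrefetch(msb, lsb, loaded));
        }
        return loaded;
    });
}

The trade-off mentioned above still applies: the caller thread loads the first segment synchronously before any preloading starts.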
        try {
            List<UUID> refs = extractReferences(buffer);
            int limit = Math.min(refs.size(), prefetchMaxRefs);
            for (int i = 0; i < limit; i++) {

Comment on lines +95 to +97 (Contributor):
You are getting a list with all the references but then potentially iterate over only a subset of them. You could save some work by extracting only the references that will be prefetched; this would also avoid allocating an array with all the refs. Using streams with limit() may be an easy way of implementing this optimization, as streams are computed lazily.

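A minimal sketch of that suggestion, assuming a hypothetical extractReferences overload that takes the limit as a parameter:

private List<UUID> extractReferences(Buffer buffer, int max) {
    SegmentData data = SegmentData.newSegmentData(buffer);
    // Materialize at most 'max' references instead of all of them.
    return IntStream.range(0, Math.min(data.getSegmentReferencesCount(), max))
            .mapToObj(i -> new UUID(data.getSegmentReferenceMsb(i),
                    data.getSegmentReferenceLsb(i)))
            .collect(Collectors.toList());
}

This needs imports for java.util.stream.IntStream and java.util.stream.Collectors; since the bound is known up front, a plain for loop over Math.min(refs, max) indices would work just as well.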
                final UUID ref = refs.get(i);
                final long rMsb = ref.getMostSignificantBits();
                final long rLsb = ref.getLeastSignificantBits();

                // Skip if already present in cache
                if (persistentCache.containsSegment(rMsb, rLsb)) {
                    continue;
                }

Comment on lines +103 to +105 (Contributor):
Maybe it's better to do this check in the worker thread, just before the segment is downloaded? The task that is scheduled to download the segment might not execute for a while if the worker pool is busy, so between this point and the actual download the segment might have been added to the cache. Or we can leave the check here and do another one before trying to download.
It would also be better to have a mechanism similar to the Guava loading cache, where a thread requesting a segment that is not in the cache but is already being downloaded blocks until the first download completes, instead of starting a new download. This would avoid duplicate downloads.


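A sketch of that loading-cache-style deduplication, built on a map of in-flight downloads keyed by segment UUID. The field and helper below are hypothetical, not part of this PR, and would need imports for java.util.concurrent.CompletableFuture and java.io.UncheckedIOException:

// Hypothetical: one download per segment id; concurrent callers join the
// pending future instead of starting a duplicate download.
private final ConcurrentHashMap<UUID, CompletableFuture<Buffer>> inFlightLoads =
        new ConcurrentHashMap<>();

private Buffer loadOnce(long msb, long lsb) {
    UUID id = new UUID(msb, lsb);
    CompletableFuture<Buffer> f = inFlightLoads.computeIfAbsent(id, k ->
            CompletableFuture.supplyAsync(() -> {
                try {
                    // Re-check just before downloading: the segment may have
                    // been cached while this task waited in the queue.
                    if (persistentCache.containsSegment(msb, lsb)) {
                        return null;
                    }
                    return delegate.readSegment(msb, lsb);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, prefetchExecutor));
    try {
        return f.join();
    } finally {
        inFlightLoads.remove(id, f);
    }
}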
                // Drop prefetch if already in progress for this segment
                boolean registered = inFlightPrefetch.add(ref);

Contributor:
Nice idea! I missed this trick so far in #2519 🙂

                if (!registered) {
                    continue;
                }

                try {
                    prefetchExecutor.execute(() -> {
                        try {
                            Buffer b = delegate.readSegment(rMsb, rLsb);
                            if (b != null) {
                                // Double-check cache before write to avoid redundant writes
                                if (!persistentCache.containsSegment(rMsb, rLsb)) {
                                    persistentCache.writeSegment(rMsb, rLsb, b);
                                }
                            }
                        } catch (Exception e) {
                            LOG.debug("Prefetch failed for segment {}", ref, e);
                        } finally {
                            inFlightPrefetch.remove(ref);
                        }
                    });
                } catch (Throwable t) {
                    // If task submission failed (e.g., executor shutting down), undo the registration
                    inFlightPrefetch.remove(ref);
                    LOG.debug("Prefetch submission failed for segment {}", ref, t);
                }
            }
        } catch (Throwable t) {
            LOG.debug("Prefetch scheduling failed for segment {}", new UUID(msb, lsb), t);
        }
    }

    @Override
@@ -88,6 +177,7 @@ public String getName() {
    @Override
    public void close() throws IOException {
        delegate.close();
        new ExecutorCloser(prefetchExecutor).close();
    }

    @Override