-
Couldn't load subscription status.
- Fork 425
OAK-11932: Segment prefetching for CachingSegmentReader #2513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,34 +19,123 @@ | |
| package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; | ||
|
|
||
| import org.apache.jackrabbit.oak.commons.Buffer; | ||
| import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; | ||
| import org.apache.jackrabbit.oak.segment.data.SegmentData; | ||
| import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; | ||
| import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; | ||
| import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
|
|
||
| public class CachingSegmentArchiveReader implements SegmentArchiveReader { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class); | ||
|
|
||
|
|
||
| @NotNull | ||
| private final PersistentCache persistentCache; | ||
|
|
||
| @NotNull | ||
| private final SegmentArchiveReader delegate; | ||
|
|
||
| private final ExecutorService prefetchExecutor; | ||
| private final Set<UUID> inFlightPrefetch = | ||
| Collections.newSetFromMap( | ||
| new ConcurrentHashMap<>()); | ||
| private final boolean prefetchEnabled; | ||
| private final int prefetchMaxRefs; | ||
|
|
||
|
|
||
|
|
||
| public CachingSegmentArchiveReader( | ||
| @NotNull PersistentCache persistentCache, | ||
| @NotNull SegmentArchiveReader delegate) { | ||
| this.persistentCache = persistentCache; | ||
| this.delegate = delegate; | ||
| int threads = Integer.getInteger("oak.segment.cache.threads", 10); | ||
| this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled"); | ||
| this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20); | ||
| this.prefetchExecutor = Executors.newFixedThreadPool(threads); | ||
| } | ||
|
|
||
| @Override | ||
| @Nullable | ||
| public Buffer readSegment(long msb, long lsb) throws IOException { | ||
| return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); | ||
| Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); | ||
| if (buf != null && prefetchEnabled) { | ||
| schedulePrefetch(msb, lsb, buf); | ||
| } | ||
| return buf; | ||
| } | ||
|
|
||
| private List<UUID> extractReferences(Buffer buffer) { | ||
| var data = SegmentData.newSegmentData(buffer); | ||
| int refs = data.getSegmentReferencesCount(); | ||
| ArrayList<UUID> out = new ArrayList<>(refs); | ||
| for (int i = 0; i < refs; i++) { | ||
| out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); | ||
| } | ||
| return out; | ||
| } | ||
|
|
||
| private void schedulePrefetch(long msb, long lsb, Buffer buffer) { | ||
| try { | ||
| List<UUID> refs = extractReferences(buffer); | ||
| int limit = Math.min(refs.size(), prefetchMaxRefs); | ||
| for (int i = 0; i < limit; i++) { | ||
|
Comment on lines
+95
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are getting a list with all the references but then potentially iterate over only a subset of them. You could save some work by extracting only the references that will be prefetched; this could also avoid allocating an array with all the refs. Using streams with `limit()` would express this concisely. |
||
| final UUID ref = refs.get(i); | ||
| final long rMsb = ref.getMostSignificantBits(); | ||
| final long rLsb = ref.getLeastSignificantBits(); | ||
|
|
||
| // Skip if already present in cache | ||
| if (persistentCache.containsSegment(rMsb, rLsb)) { | ||
| continue; | ||
| } | ||
|
Comment on lines
+103
to
+105
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it's better to do this check in the worker thread, just before the segment is downloaded? The task that is scheduled to download the segment might not execute for a while, if the worker pool is busy, so from this point until the actual download, the segment might have been added to the cache. Or we can leave the check here and do another one before trying to download. |
||
|
|
||
| // Drop prefetch if already in progress for this segment | ||
| boolean registered = inFlightPrefetch.add(ref); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice idea! I missed this trick so far in #2519 🙂 |
||
| if (!registered) { | ||
| continue; | ||
| } | ||
|
|
||
| try { | ||
| prefetchExecutor.execute(() -> { | ||
| try { | ||
| Buffer b = delegate.readSegment(rMsb, rLsb); | ||
| if (b != null) { | ||
| // Double-check cache before write to avoid redundant writes | ||
| if (!persistentCache.containsSegment(rMsb, rLsb)) { | ||
| persistentCache.writeSegment(rMsb, rLsb, b); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.debug("Prefetch failed for segment {}", new java.util.UUID(rMsb, rLsb), e); | ||
| } finally { | ||
| inFlightPrefetch.remove(ref); | ||
| } | ||
| }); | ||
| } catch (Throwable t) { | ||
| // If task submission failed (e.g., executor shutting down), undo the registration | ||
| inFlightPrefetch.remove(ref); | ||
| LOG.debug("Prefetch submission failed for segment {}", new java.util.UUID(rMsb, rLsb), t); | ||
|
|
||
| } | ||
| } | ||
| } catch (Throwable t) { | ||
| LOG.debug("Prefetch scheduling failed for segment {}", new java.util.UUID(msb, lsb), t); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -88,6 +177,7 @@ public String getName() { | |
| @Override | ||
| public void close() throws IOException { | ||
| delegate.close(); | ||
| new ExecutorCloser(prefetchExecutor).close(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nfsantos made a good remark that this method, from the start, should be executed in a separate thread context, so it does not impact execution time for the thread invoking CachingSegmentArchiveReader#readSegment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, my tests with #2519, which implements a similar mechanism, but on a different level of abstraction, showed that dispatching from the caller thread hurts performance.
I even opted to trigger preloading only from within the load-callback, so preloading is completely off the critical path for cache hits.
Of course that's a trade-off, as it requires the caller thread to load one segment before any preloading happens.