Skip to content

Commit

Permalink
Use java.lang.ref.Cleaner to close cloned IndexInputs
Browse files Browse the repository at this point in the history
As detailed [in this issue][1], Lucene does not close the cloned
IndexInput instances, so we are using the Cleaner mechanism from the JDK
to close any unclosed clones.

A single static Cleaner instance to ensure any unclosed clone of an
IndexInput is closed. This instance creates a single daemon thread on
which it performs the cleaning actions. For an already-closed
IndexInput, the cleaning action is a no-op. For an open IndexInput, the
close action will decrement a reference count.

[1]: opensearch-project#5243 (comment)

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Feb 17, 2023
1 parent 1cdff3b commit c55f61e
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.snapshots;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand All @@ -27,21 +28,23 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.repositories.fs.FsRepository;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@

package org.opensearch.index.store.remote.file;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RandomAccessInput;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;

import java.io.EOFException;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.Objects;

/**
* Class acts as a virtual file mechanism for the accessed files and only fetches the required blocks of the actual file.
Expand All @@ -25,6 +30,19 @@
* @opensearch.internal
*/
abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class);

public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner";

/**
* A single static Cleaner instance to ensure any unclosed clone of an
* IndexInput is closed. This instance creates a single daemon thread on
* which it performs the cleaning actions. For an already-closed IndexInput,
* the cleaning action is a no-op. For an open IndexInput, the close action
* will decrement a reference count.
*/
private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX));

/**
* Start offset of the virtual file : non-zero in the slice case
*/
Expand Down Expand Up @@ -55,16 +73,12 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces
*/
protected final int blockMask;

// Variables for actual held open block
/**
* Current block for read, it should be a cloned block always. In current implementation this will be a FileCachedIndexInput
*/
protected IndexInput currentBlock;

/**
* ID of the current block
*/
protected int currentBlockId;
private int currentBlockId;

private final BlockHolder blockHolder = new BlockHolder();

OnDemandBlockIndexInput(Builder builder) {
super(builder.resourceDescription);
Expand All @@ -74,6 +88,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces
this.blockSizeShift = builder.blockSizeShift;
this.blockSize = builder.blockSize;
this.blockMask = builder.blockMask;
CLEANER.register(this, blockHolder::closeQuietly);
}

/**
Expand All @@ -89,16 +104,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces
protected abstract IndexInput fetchBlock(int blockId) throws IOException;

@Override
public OnDemandBlockIndexInput clone() {
OnDemandBlockIndexInput clone = buildSlice("clone", offset, length());
// Ensures that clones may be positioned at the same point as the blocked file they were cloned from
if (currentBlock != null) {
clone.currentBlock = currentBlock.clone();
clone.currentBlockId = currentBlockId;
}

return clone;
}
public abstract OnDemandBlockIndexInput clone();

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
Expand All @@ -123,17 +129,13 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw

@Override
public void close() throws IOException {
// current block
if (currentBlock != null) {
currentBlock.close();
currentBlock = null;
currentBlockId = 0;
}
blockHolder.close();
currentBlockId = 0;
}

@Override
public long getFilePointer() {
if (currentBlock == null) return 0L;
if (blockHolder.block == null) return 0L;
return currentBlockStart() + currentBlockPosition() - offset;
}

Expand All @@ -144,56 +146,56 @@ public long length() {

@Override
public byte readByte() throws IOException {
if (currentBlock == null) {
if (blockHolder.block == null) {
// seek to the beginning
seek(0);
} else if (currentBlockPosition() >= blockSize) {
int blockId = currentBlockId + 1;
demandBlock(blockId);
}
return currentBlock.readByte();
return blockHolder.block.readByte();
}

@Override
public short readShort() throws IOException {
if (currentBlock != null && Short.BYTES <= (blockSize - currentBlockPosition())) {
return currentBlock.readShort();
if (blockHolder.block != null && Short.BYTES <= (blockSize - currentBlockPosition())) {
return blockHolder.block.readShort();
} else {
return super.readShort();
}
}

@Override
public int readInt() throws IOException {
if (currentBlock != null && Integer.BYTES <= (blockSize - currentBlockPosition())) {
return currentBlock.readInt();
if (blockHolder.block != null && Integer.BYTES <= (blockSize - currentBlockPosition())) {
return blockHolder.block.readInt();
} else {
return super.readInt();
}
}

@Override
public long readLong() throws IOException {
if (currentBlock != null && Long.BYTES <= (blockSize - currentBlockPosition())) {
return currentBlock.readLong();
if (blockHolder.block != null && Long.BYTES <= (blockSize - currentBlockPosition())) {
return blockHolder.block.readLong();
} else {
return super.readLong();
}
}

@Override
public final int readVInt() throws IOException {
if (currentBlock != null && 5 <= (blockSize - currentBlockPosition())) {
return currentBlock.readVInt();
if (blockHolder.block != null && 5 <= (blockSize - currentBlockPosition())) {
return blockHolder.block.readVInt();
} else {
return super.readVInt();
}
}

@Override
public final long readVLong() throws IOException {
if (currentBlock != null && 9 <= (blockSize - currentBlockPosition())) {
return currentBlock.readVLong();
if (blockHolder.block != null && 9 <= (blockSize - currentBlockPosition())) {
return blockHolder.block.readVLong();
} else {
return super.readVLong();
}
Expand All @@ -212,24 +214,24 @@ public void seek(long pos) throws IOException {
public final byte readByte(long pos) throws IOException {
// adjust the pos if it's sliced
pos = pos + offset;
if (currentBlock != null && isInCurrentBlockRange(pos)) {
if (blockHolder.block != null && isInCurrentBlockRange(pos)) {
// the block contains the byte
return ((RandomAccessInput) currentBlock).readByte(getBlockOffset(pos));
return ((RandomAccessInput) blockHolder.block).readByte(getBlockOffset(pos));
} else {
// the block does not have the byte, seek to the pos first
seekInternal(pos);
// then read the byte
return currentBlock.readByte();
return blockHolder.block.readByte();
}
}

@Override
public short readShort(long pos) throws IOException {
// adjust the pos if it's sliced
pos = pos + offset;
if (currentBlock != null && isInCurrentBlockRange(pos, Short.BYTES)) {
if (blockHolder.block != null && isInCurrentBlockRange(pos, Short.BYTES)) {
// the block contains enough data to satisfy this request
return ((RandomAccessInput) currentBlock).readShort(getBlockOffset(pos));
return ((RandomAccessInput) blockHolder.block).readShort(getBlockOffset(pos));
} else {
// the block does not have enough data, seek to the pos first
seekInternal(pos);
Expand All @@ -242,9 +244,9 @@ public short readShort(long pos) throws IOException {
public int readInt(long pos) throws IOException {
// adjust the pos if it's sliced
pos = pos + offset;
if (currentBlock != null && isInCurrentBlockRange(pos, Integer.BYTES)) {
if (blockHolder.block != null && isInCurrentBlockRange(pos, Integer.BYTES)) {
// the block contains enough data to satisfy this request
return ((RandomAccessInput) currentBlock).readInt(getBlockOffset(pos));
return ((RandomAccessInput) blockHolder.block).readInt(getBlockOffset(pos));
} else {
// the block does not have enough data, seek to the pos first
seekInternal(pos);
Expand All @@ -257,9 +259,9 @@ public int readInt(long pos) throws IOException {
public long readLong(long pos) throws IOException {
// adjust the pos if it's sliced
pos = pos + offset;
if (currentBlock != null && isInCurrentBlockRange(pos, Long.BYTES)) {
if (blockHolder.block != null && isInCurrentBlockRange(pos, Long.BYTES)) {
// the block contains enough data to satisfy this request
return ((RandomAccessInput) currentBlock).readLong(getBlockOffset(pos));
return ((RandomAccessInput) blockHolder.block).readLong(getBlockOffset(pos));
} else {
// the block does not have enough data, seek to the pos first
seekInternal(pos);
Expand All @@ -270,19 +272,19 @@ public long readLong(long pos) throws IOException {

@Override
public final void readBytes(byte[] b, int offset, int len) throws IOException {
if (currentBlock == null) {
if (blockHolder.block == null) {
// lazy seek to the beginning
seek(0);
}

int available = blockSize - currentBlockPosition();
if (len <= available) {
// the block contains enough data to satisfy this request
currentBlock.readBytes(b, offset, len);
blockHolder.block.readBytes(b, offset, len);
} else {
// the block does not have enough data. First serve all we've got.
if (available > 0) {
currentBlock.readBytes(b, offset, available);
blockHolder.block.readBytes(b, offset, available);
offset += available;
len -= available;
}
Expand All @@ -293,7 +295,7 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {
int blockId = currentBlockId + 1;
int toRead = Math.min(len, blockSize);
demandBlock(blockId);
currentBlock.readBytes(b, offset, toRead);
blockHolder.block.readBytes(b, offset, toRead);
offset += toRead;
len -= toRead;
}
Expand All @@ -306,10 +308,10 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {
* NOTE: the pos should be an adjusted position for slices
*/
private void seekInternal(long pos) throws IOException {
if (currentBlock == null || !isInCurrentBlockRange(pos)) {
if (blockHolder.block == null || !isInCurrentBlockRange(pos)) {
demandBlock(getBlock(pos));
}
currentBlock.seek(getBlockOffset(pos));
blockHolder.block.seek(getBlockOffset(pos));
}

/**
Expand All @@ -331,17 +333,22 @@ private boolean isInCurrentBlockRange(long pos, int len) {
}

private void demandBlock(int blockId) throws IOException {
if (currentBlock != null && currentBlockId == blockId) return;
if (blockHolder.block != null && currentBlockId == blockId) return;

// close the current block before jumping to the new block
if (currentBlock != null) {
currentBlock.close();
}
blockHolder.close();

currentBlock = fetchBlock(blockId);
blockHolder.set(fetchBlock(blockId));
currentBlockId = blockId;
}

protected void cloneBlock(OnDemandBlockIndexInput other) {
if (other.blockHolder.block != null) {
this.blockHolder.set(other.blockHolder.block.clone());
this.currentBlockId = other.currentBlockId;
}
}

protected int getBlock(long pos) {
return (int) (pos >>> blockSizeShift);
}
Expand All @@ -359,7 +366,7 @@ protected long currentBlockStart() {
}

protected int currentBlockPosition() {
return (int) currentBlock.getFilePointer();
return (int) blockHolder.block.getFilePointer();
}

public static Builder builder() {
Expand Down Expand Up @@ -409,4 +416,47 @@ Builder blockSizeShift(int blockSizeShift) {
return this;
}
}

/**
* Simple class to hold the currently open IndexInput backing an instance
* of an {@link OnDemandBlockIndexInput}. Lucene may clone one of these
* instances, and per the contract[1], the clones will never be closed.
* However, closing the instances is critical for our reference counting.
* Therefore, we are using the {@link Cleaner} mechanism from the JDK to
* close these clones when they become phantom reachable. The clean action
* must not hold a reference to the {@link OnDemandBlockIndexInput} itself
* (otherwise it would never become phantom reachable!) so we need a wrapper
* instance to hold the current underlying IndexInput, while allowing it to
* be changed out with different instances as {@link OnDemandBlockIndexInput}
* reads through the data.
*
* [1]: https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33
*/
private static class BlockHolder {
private IndexInput block;

private void set(IndexInput block) {
if (this.block != null) {
throw new IllegalStateException("Previous block was not closed!");
}
this.block = Objects.requireNonNull(block);
}

private void close() throws IOException {
if (block != null) {
block.close();
block = null;
}
}

private void closeQuietly() {
try {
close();
} catch (IOException e) {
// Exceptions thrown in the cleaning action are ignored,
// so log and swallow the exception here
logger.info("Exception thrown while closing block owned by phantom reachable instance", e);
}
}
}
}
Loading

0 comments on commit c55f61e

Please sign in to comment.