POC: Use multiple index commits to retain soft-deleted documents #40671

Closed · wants to merge 3 commits
@@ -1085,4 +1085,13 @@ public static void scanSeqNosInReader(DirectoryReader directoryReader, long from
             }
         }
     }
+
+    public static long countSoftDeletesInCommit(IndexCommit commit) throws IOException {
+        long totalSoftDeletes = 0;
+        final SegmentInfos sis = readSegmentInfos(commit);
+        for (SegmentCommitInfo si : sis) {
+            totalSoftDeletes += si.getSoftDelCount();
+        }
+        return totalSoftDeletes;
+    }
 }
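The new helper sums the per-segment soft-delete counts recorded in a commit's SegmentInfos, giving the number of soft-deleted documents that commit still retains. A minimal usage sketch, assuming the helper lands in the org.elasticsearch.common.lucene.Lucene utility class alongside scanSeqNosInReader (the SoftDeleteStats class and method name below are invented for illustration):

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.common.lucene.Lucene;

import java.io.IOException;
import java.util.List;

final class SoftDeleteStats {
    // Sums the soft-deleted documents retained across a list of index commits,
    // e.g. the commits handed to an IndexDeletionPolicy#onCommit call.
    static long totalRetainedSoftDeletes(List<? extends IndexCommit> commits) throws IOException {
        long total = 0;
        for (IndexCommit commit : commits) {
            total += Lucene.countSoftDeletesInCommit(commit);
        }
        return total;
    }
}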
@@ -78,9 +78,11 @@ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
         final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
         lastCommit = commits.get(commits.size() - 1);
         safeCommit = commits.get(keptPosition);
+        softDeletesPolicy.onCommits(commits, Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
         for (int i = 0; i < keptPosition; i++) {
-            if (snapshottedCommits.containsKey(commits.get(i)) == false) {
-                deleteCommit(commits.get(i));
+            final IndexCommit commit = commits.get(i);
+            if (snapshottedCommits.containsKey(commit) == false && softDeletesPolicy.shouldKeepCommit(commit) == false) {
+                deleteCommit(commit);
             }
         }
         updateRetentionPolicy();
@@ -104,9 +106,6 @@ private void updateRetentionPolicy() throws IOException {
         assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
         translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
         translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
-
-        softDeletesPolicy.setLocalCheckpointOfSafeCommit(
-            Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
     }
 
     /**
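The deletion policy now also asks the soft-deletes policy whether an older commit must be kept, and the safe commit's local checkpoint is pushed into that policy in onCommits rather than in updateRetentionPolicy. The SoftDeletesPolicy side of the change is not part of this diff, so the sketch below is only a guess at the shape of shouldKeepCommit (class name, field, and retention rule are all assumed):

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;

// Hypothetical sketch only; the real SoftDeletesPolicy changes are outside this diff.
final class CommitRetentionSketch {
    private final long minRetainedSeqNo; // lowest seq# history consumers may still request (assumed)

    CommitRetentionSketch(long minRetainedSeqNo) {
        this.minRetainedSeqNo = minRetainedSeqNo;
    }

    // Keep a commit whose local checkpoint reaches into the retained range, so a
    // reader opened on it can still serve operations at or above minRetainedSeqNo.
    boolean shouldKeepCommit(IndexCommit commit) throws IOException {
        final long localCheckpoint = Long.parseLong(
            commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
        return localCheckpoint >= minRetainedSeqNo;
    }
}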
@@ -2530,16 +2530,25 @@ long getNumDocUpdates() {
     @Override
     public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
                                                 long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
+        return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange,
+            SoftDeletesChangesSnapshot.DEFAULT_LIVE_READER_BATCH_SIZE, SoftDeletesChangesSnapshot.DEFAULT_COMMITTED_READER_BATCH_SIZE);
+
+    }
+
+    final Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
+                                               long fromSeqNo, long toSeqNo, boolean requiredFullRange,
+                                               int liveReaderBatchSize, int committedReaderBatchSize) throws IOException {
         if (softDeleteEnabled == false) {
             throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
         }
         ensureOpen();
         refreshIfNeeded(source, toSeqNo);
-        Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
+        Searcher liveSearcher = acquireSearcher(source, SearcherScope.INTERNAL);
         try {
-            LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
-                searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange);
-            searcher = null;
+            final Translog.Snapshot snapshot = new SoftDeletesChangesSnapshot(mapperService, softDeletesPolicy,
+                liveSearcher, commit -> openIndexCommit(source, commit),
+                fromSeqNo, toSeqNo, requiredFullRange, liveReaderBatchSize, committedReaderBatchSize);
+            liveSearcher = null;
             return snapshot;
         } catch (Exception e) {
             try {
@@ -2549,7 +2558,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
             }
             throw e;
         } finally {
-            IOUtils.close(searcher);
+            IOUtils.close(liveSearcher);
         }
     }
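The public entry point keeps the existing Translog.Snapshot contract: next() returns operations and null when the snapshot is exhausted. A small consumption sketch (ChangesSnapshotExample and countOps are invented for illustration):

import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;

final class ChangesSnapshotExample {
    // Drains every operation with seq# in [fromSeqNo, toSeqNo] and returns the count;
    // requiredFullRange=true makes the snapshot fail on gaps in the range.
    static long countOps(Engine engine, MapperService mapperService,
                         long fromSeqNo, long toSeqNo) throws IOException {
        long count = 0;
        try (Translog.Snapshot snapshot =
                 engine.newChangesSnapshot("example", mapperService, fromSeqNo, toSeqNo, true)) {
            while (snapshot.next() != null) {
                count++;
            }
        }
        return count;
    }
}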

@@ -2745,4 +2754,20 @@ public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
         final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
         advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
     }
+
+    // TODO: cache these readers
+    private Engine.Searcher openIndexCommit(String source, IndexCommit commit) throws IOException {
+        ensureOpen();
+        store.incRef();
+        Closeable onClose = store::decRef;
+        try {
+            final DirectoryReader reader = DirectoryReader.open(commit);
+            onClose = () -> IOUtils.close(reader, store::decRef);
+            final Searcher searcher = new Searcher(source, new IndexSearcher(reader), onClose);
+            onClose = () -> {};
+            return searcher;
+        } finally {
+            onClose.close();
+        }
+    }
 }
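openIndexCommit uses an ownership-transfer idiom: onClose always names exactly the resources acquired so far, and once the Searcher takes them over it is swapped to a no-op, so the finally block releases only what is still unowned when something fails part-way. The same idiom in isolation, with plain java.io resources and invented names (the production code above uses IOUtils.close, which closes the reader even if decRef throws):

import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

final class OwnershipTransfer {
    // Opens two streams; if anything fails part-way, whatever was acquired is closed.
    static Closeable openBoth(String pathA, String pathB) throws IOException {
        final InputStream a = new FileInputStream(pathA);
        Closeable onClose = a; // we own "a" only
        try {
            final InputStream b = new FileInputStream(pathB);
            onClose = () -> {
                // production code would use IOUtils.close(b, a) to close both even if one throws
                b.close();
                a.close();
            };
            final Closeable result = onClose;
            onClose = () -> {}; // ownership passes to the caller via "result"
            return result;
        } finally {
            onClose.close(); // closes only the resources we still own
        }
    }
}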
@@ -20,7 +20,7 @@
 package org.elasticsearch.index.engine;
 
 /**
- * Exception indicating that not all requested operations from {@link LuceneChangesSnapshot}
+ * Exception indicating that not all requested operations from {@link SoftDeletesChangesSnapshot}
  * are available.
  */
 public final class MissingHistoryOperationsException extends IllegalStateException {
@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine;
 
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
@@ -50,17 +51,11 @@
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-/**
- * A {@link Translog.Snapshot} from changes in a Lucene index
- */
-final class LuceneChangesSnapshot implements Translog.Snapshot {
-    static final int DEFAULT_BATCH_SIZE = 1024;
-
+final class SoftDeletesChangesReader implements Closeable {
     private final int searchBatchSize;
     private final long fromSeqNo, toSeqNo;
-    private long lastSeenSeqNo;
+    private Translog.Operation unconsumedOperation;
     private int skippedOperations;
-    private final boolean requiredFullRange;
 
     private final IndexSearcher indexSearcher;
     private final MapperService mapperService;
@@ -71,17 +66,16 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     private final Closeable onClose;
 
     /**
-     * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
+     * Creates a reader that allows scanning the history of operations sequentially for a given range of sequence numbers.
      *
     * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
     * @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
     * @param searchBatchSize the number of documents should be returned by each search
     * @param fromSeqNo the min requesting seq# - inclusive
     * @param toSeqNo the maximum requesting seq# - inclusive
-     * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
      */
-    LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
-                          long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
+    SoftDeletesChangesReader(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
+                             long fromSeqNo, long toSeqNo) throws IOException {
         if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
             throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
         }
@@ -99,12 +93,11 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize;
         this.fromSeqNo = fromSeqNo;
         this.toSeqNo = toSeqNo;
-        this.lastSeenSeqNo = fromSeqNo - 1;
-        this.requiredFullRange = requiredFullRange;
-        this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
+        // TODO: move wrapping to InternalEngine
+        this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); // TODO: wrap outside?
         this.indexSearcher.setQueryCache(null);
         this.parallelArray = new ParallelArray(this.searchBatchSize);
-        final TopDocs topDocs = searchOperations(null);
+        final TopDocs topDocs = searchOperations(null, fromSeqNo);
         this.totalHits = Math.toIntExact(topDocs.totalHits.value);
         this.scoreDocs = topDocs.scoreDocs;
         fillParallelArray(scoreDocs, parallelArray);
@@ -115,54 +108,44 @@ public void close() throws IOException {
         onClose.close();
     }
 
-    @Override
     public int totalOperations() {
         return totalHits;
     }
 
-    @Override
     public int skippedOperations() {
         return skippedOperations;
     }
 
-    @Override
-    public Translog.Operation next() throws IOException {
-        Translog.Operation op = null;
-        for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
-            op = readDocAsOp(idx);
-            if (op != null) {
-                break;
-            }
-        }
-        if (requiredFullRange) {
-            rangeCheck(op);
-        }
-        if (op != null) {
-            lastSeenSeqNo = op.seqNo();
-        }
-        return op;
+    DirectoryReader directoryReader() {
+        return (DirectoryReader) indexSearcher.getIndexReader();
     }
 
-    private void rangeCheck(Translog.Operation op) {
-        if (op == null) {
-            if (lastSeenSeqNo < toSeqNo) {
-                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
-                    "and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
-            }
-        } else {
-            final long expectedSeqNo = lastSeenSeqNo + 1;
-            if (op.seqNo() != expectedSeqNo) {
-                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
-                    "and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
+    /**
+     * Reads the next operation whose sequence number is at least the given target value in the current reader.
+     */
+    public Translog.Operation readOperation(long targetSeqNo) throws IOException {
+        assert targetSeqNo >= fromSeqNo : fromSeqNo + " > " + targetSeqNo;
+        if (unconsumedOperation != null && unconsumedOperation.seqNo() >= targetSeqNo) {
+            return unconsumedOperation;
         }
+        for (int idx = nextDocIndex(targetSeqNo); idx != -1; idx = nextDocIndex(targetSeqNo)) {
+            final Translog.Operation op = readDocAsOp(idx, targetSeqNo);
+            if (op != null) {
+                assert op.seqNo() >= targetSeqNo : "target_seq_no[" + targetSeqNo + "] op[" + op + "]";
+                if (op.seqNo() > targetSeqNo) {
+                    unconsumedOperation = op;
+                }
+                return op;
+            }
         }
+        return null;
     }
 
-    private int nextDocIndex() throws IOException {
+    private int nextDocIndex(long targetSeqNo) throws IOException {
         // we have processed all docs in the current search - fetch the next batch
         if (docIndex == scoreDocs.length && docIndex > 0) {
             final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
-            scoreDocs = searchOperations(prev).scoreDocs;
+            scoreDocs = searchOperations(prev, targetSeqNo).scoreDocs;
             fillParallelArray(scoreDocs, parallelArray);
             docIndex = 0;
         }
@@ -212,27 +195,27 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) {
             }
         }
     }
 
-    private TopDocs searchOperations(ScoreDoc after) throws IOException {
-        final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
+    private TopDocs searchOperations(ScoreDoc after, long targetSeqNo) throws IOException {
+        final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, targetSeqNo), toSeqNo);
         final Sort sortedBySeqNoThenByTerm = new Sort(
             new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
             new SortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
         );
         return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
     }
 
-    private Translog.Operation readDocAsOp(int docIndex) throws IOException {
+    private Translog.Operation readDocAsOp(int docIndex, long targetSeqNo) throws IOException {
         final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
         final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
-        final long primaryTerm = parallelArray.primaryTerm[docIndex];
-        // We don't have to read the nested child documents - those docs don't have primary terms.
-        if (primaryTerm == -1) {
+        final long seqNo = parallelArray.seqNo[docIndex];
+        // skip docs whose seq# is below the requested target
+        if (seqNo < targetSeqNo) {
             skippedOperations++;
             return null;
         }
-        final long seqNo = parallelArray.seqNo[docIndex];
-        // Only pick the first seen seq#
-        if (seqNo == lastSeenSeqNo) {
+        final long primaryTerm = parallelArray.primaryTerm[docIndex];
+        // We don't have to read the nested child documents - those docs don't have primary terms.
+        if (primaryTerm == -1) {
             skippedOperations++;
             return null;
         }
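searchOperations pages through matching documents with IndexSearcher.searchAfter instead of issuing one large search, sorting by seq# and then by primary term (descending) so the copy with the latest term sorts first for a given seq#. The same paging idiom in isolation (class name invented; "_seq_no" is assumed to be the field behind SeqNoFieldMapper.NAME, and the secondary sort is omitted for brevity):

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;

import java.io.IOException;

final class SeqNoPaging {
    // Visits all docs with "_seq_no" in [from, to], in batches of batchSize.
    static void pageAll(IndexSearcher searcher, long from, long to, int batchSize) throws IOException {
        final Query range = LongPoint.newRangeQuery("_seq_no", from, to);
        final Sort bySeqNo = new Sort(new SortField("_seq_no", SortField.Type.LONG));
        ScoreDoc after = null;
        while (true) {
            final TopDocs batch = searcher.searchAfter(after, range, batchSize, bySeqNo);
            if (batch.scoreDocs.length == 0) {
                break; // no more matches
            }
            // ... process batch.scoreDocs ...
            after = batch.scoreDocs[batch.scoreDocs.length - 1];
        }
    }
}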
@@ -261,22 +244,17 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
                 if (source == null) {
                     // TODO: Callers should ask for the range that source should be retained. Thus we should always
                     // check for the existence source once we make peer-recovery to send ops after the local checkpoint.
-                    if (requiredFullRange) {
-                        throw new IllegalStateException("source not found for seqno=" + seqNo +
-                            " from_seqno=" + fromSeqNo + " to_seqno=" + toSeqNo);
-                    } else {
-                        skippedOperations++;
-                        return null;
-                    }
+                    skippedOperations++;
+                    return null;
                 }
                 // TODO: pass the latest timestamp from engine.
                 final long autoGeneratedIdTimestamp = -1;
                 op = new Translog.Index(type, id, seqNo, primaryTerm, version,
                     source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
             }
         }
-        assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
-            "last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]";
+        assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo :
+            "Unexpected operation [" + op + "] from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]";
         return op;
     }
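readOperation is meant to be driven with a non-decreasing target: it skips documents below the target, returns the first operation at or above it, and buffers an overshooting operation for the next call. The SoftDeletesChangesSnapshot that coordinates these readers is not included in this diff, so the driver below is only a sketch of the expected call pattern (ReaderDriverSketch and drain are invented, and would need to live in the same package since the reader is package-private):

import org.elasticsearch.index.translog.Translog;

import java.io.IOException;

final class ReaderDriverSketch {
    // Walks one reader from fromSeqNo to toSeqNo; a gap means this reader does not
    // hold those seq#s (e.g. they live in another commit's reader or the live index).
    static void drain(SoftDeletesChangesReader reader, long fromSeqNo, long toSeqNo) throws IOException {
        long target = fromSeqNo;
        while (target <= toSeqNo) {
            final Translog.Operation op = reader.readOperation(target);
            if (op == null) {
                break; // reader exhausted at or above target
            }
            // op.seqNo() >= target; advance past whatever we got
            target = op.seqNo() + 1;
        }
    }
}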
