Skip to content

Commit f47174f

Browse files
authored
Do not use soft-deletes to resolve indexing strategy (#43336)
This PR reverts #35230. Previously, we reply on soft-deletes to fill the mismatch between the version map and the Lucene index. This is no longer needed after #43202 where we rebuild the version map when opening an engine. Moreover, PrunePostingsMergePolicy can prune _id of soft-deleted documents out of order; thus the lookup result including soft-deletes sometimes does not return the latest version (although it's okay as we only use a valid result in an engine). With this change, we use only live documents in Lucene to resolve the indexing strategy. This is perfectly safe since we keep all deleted documents after the local checkpoint in the version map. Closes #42979
1 parent 680d6ed commit f47174f

File tree

6 files changed

+55
-87
lines changed

6 files changed

+55
-87
lines changed

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 20 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -102,38 +102,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
102102
throws IOException {
103103
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
104104
"context's reader is not the same as the reader class was initialized on.";
105-
int docID = getDocID(id, context.reader().getLiveDocs());
105+
int docID = getDocID(id, context);
106106

107107
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
108-
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
109-
if (versions == null) {
110-
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
111-
}
112-
if (versions.advanceExact(docID) == false) {
113-
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
114-
}
115108
final long seqNo;
116109
final long term;
117110
if (loadSeqNo) {
118-
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
119-
// remove the null check in 7.0 once we can't read indices with no seq#
120-
if (seqNos != null && seqNos.advanceExact(docID)) {
121-
seqNo = seqNos.longValue();
122-
} else {
123-
seqNo = UNASSIGNED_SEQ_NO;
124-
}
125-
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
126-
if (terms != null && terms.advanceExact(docID)) {
127-
term = terms.longValue();
128-
} else {
129-
term = UNASSIGNED_PRIMARY_TERM;
130-
}
131-
111+
seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
112+
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
132113
} else {
133114
seqNo = UNASSIGNED_SEQ_NO;
134115
term = UNASSIGNED_PRIMARY_TERM;
135116
}
136-
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
117+
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
118+
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
137119
} else {
138120
return null;
139121
}
@@ -143,9 +125,10 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
143125
* returns the internal lucene doc id for the given id bytes.
144126
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
145127
* */
146-
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
128+
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
147129
// termsEnum can possibly be null here if this leaf contains only no-ops.
148130
if (termsEnum != null && termsEnum.seekExact(id)) {
131+
final Bits liveDocs = context.reader().getLiveDocs();
149132
int docID = DocIdSetIterator.NO_MORE_DOCS;
150133
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
151134
docsEnum = termsEnum.postings(docsEnum, 0);
@@ -161,41 +144,23 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
161144
}
162145
}
163146

147+
private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
148+
final NumericDocValues dv = reader.getNumericDocValues(field);
149+
if (dv == null || dv.advanceExact(docId) == false) {
150+
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
151+
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
152+
}
153+
return dv.longValue();
154+
}
155+
164156
/** Return null if id is not found. */
165157
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
166158
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
167159
"context's reader is not the same as the reader class was initialized on.";
168-
// termsEnum can possibly be null here if this leaf contains only no-ops.
169-
if (termsEnum != null && termsEnum.seekExact(id)) {
170-
docsEnum = termsEnum.postings(docsEnum, 0);
171-
final Bits liveDocs = context.reader().getLiveDocs();
172-
DocIdAndSeqNo result = null;
173-
int docID = docsEnum.nextDoc();
174-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
175-
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
176-
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
177-
final long seqNo;
178-
// remove the null check in 7.0 once we can't read indices with no seq#
179-
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
180-
seqNo = seqNoDV.longValue();
181-
} else {
182-
seqNo = UNASSIGNED_SEQ_NO;
183-
}
184-
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
185-
if (isLive) {
186-
// The live document must always be the latest copy, thus we can early terminate here.
187-
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
188-
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
189-
assert result == null || result.seqNo <= seqNo :
190-
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
191-
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
192-
}
193-
if (result == null || result.seqNo < seqNo) {
194-
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
195-
}
196-
}
197-
}
198-
return result;
160+
final int docID = getDocID(id, context);
161+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
162+
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
163+
return new DocIdAndSeqNo(docID, seqNo, context);
199164
} else {
200165
return null;
201166
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,11 @@ public static class DocIdAndSeqNo {
114114
public final int docId;
115115
public final long seqNo;
116116
public final LeafReaderContext context;
117-
public final boolean isLive;
118117

119-
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
118+
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
120119
this.docId = docId;
121120
this.seqNo = seqNo;
122121
this.context = context;
123-
this.isLive = isLive;
124122
}
125123
}
126124

@@ -149,32 +147,21 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,
149147

150148
/**
151149
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
152-
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
153-
* This returns {@code null} if no such document matching the given term uid.
150+
* The result is either null or the live and latest version of the given uid.
154151
*/
155152
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
156153
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
157154
final List<LeafReaderContext> leaves = reader.leaves();
158-
DocIdAndSeqNo latest = null;
159155
// iterate backwards to optimize for the frequently updated documents
160156
// which are likely to be in the last segments
161157
for (int i = leaves.size() - 1; i >= 0; i--) {
162158
final LeafReaderContext leaf = leaves.get(i);
163159
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
164160
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
165-
if (result == null) {
166-
continue;
167-
}
168-
if (result.isLive) {
169-
// The live document must always be the latest copy, thus we can early terminate here.
170-
assert latest == null || latest.seqNo <= result.seqNo :
171-
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
161+
if (result != null) {
172162
return result;
173163
}
174-
if (latest == null || latest.seqNo < result.seqNo) {
175-
latest = result;
176-
}
177164
}
178-
return latest;
165+
return null;
179166
}
180167
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -705,11 +705,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
705705
if (docAndSeqNo == null) {
706706
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
707707
} else if (op.seqNo() > docAndSeqNo.seqNo) {
708-
if (docAndSeqNo.isLive) {
709-
status = OpVsLuceneDocStatus.OP_NEWER;
710-
} else {
711-
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
712-
}
708+
status = OpVsLuceneDocStatus.OP_NEWER;
713709
} else if (op.seqNo() == docAndSeqNo.seqNo) {
714710
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
715711
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();

server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.lucene.Lucene;
3434
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
3535
import org.elasticsearch.index.mapper.IdFieldMapper;
36+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3637
import org.elasticsearch.index.mapper.VersionFieldMapper;
3738
import org.elasticsearch.test.ESTestCase;
3839

@@ -52,6 +53,8 @@ public void testSimple() throws Exception {
5253
Document doc = new Document();
5354
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
5455
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
56+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
57+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
5558
writer.addDocument(doc);
5659
writer.addDocument(new Document());
5760
DirectoryReader reader = DirectoryReader.open(writer);
@@ -86,6 +89,8 @@ public void testTwoDocuments() throws Exception {
8689
Document doc = new Document();
8790
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
8891
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
92+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
93+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
8994
writer.addDocument(doc);
9095
writer.addDocument(doc);
9196
writer.addDocument(new Document());

server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.lucene.Lucene;
3131
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
3232
import org.elasticsearch.index.mapper.IdFieldMapper;
33+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3334
import org.elasticsearch.index.mapper.VersionFieldMapper;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.test.ESTestCase;
@@ -68,6 +69,8 @@ public void testVersions() throws Exception {
6869
Document doc = new Document();
6970
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
7071
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
72+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
73+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
7174
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
7275
directoryReader = reopen(directoryReader);
7376
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
@@ -77,6 +80,8 @@ public void testVersions() throws Exception {
7780
Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
7881
doc.add(uid);
7982
doc.add(version);
83+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
84+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
8085
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
8186
directoryReader = reopen(directoryReader);
8287
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
@@ -86,6 +91,8 @@ public void testVersions() throws Exception {
8691
version.setLongValue(3);
8792
doc.add(uid);
8893
doc.add(version);
94+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
95+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
8996
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
9097

9198
directoryReader = reopen(directoryReader);
@@ -115,6 +122,8 @@ public void testNestedDocuments() throws IOException {
115122
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
116123
NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
117124
doc.add(version);
125+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
126+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
118127
docs.add(doc);
119128

120129
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
@@ -145,6 +154,8 @@ public void testCache() throws Exception {
145154
Document doc = new Document();
146155
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
147156
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
157+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
158+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
148159
writer.addDocument(doc);
149160
DirectoryReader reader = DirectoryReader.open(writer);
150161
// should increase cache size by 1
@@ -170,6 +181,8 @@ public void testCacheFilterReader() throws Exception {
170181
Document doc = new Document();
171182
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
172183
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
184+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
185+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
173186
writer.addDocument(doc);
174187
DirectoryReader reader = DirectoryReader.open(writer);
175188
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4027,7 +4027,6 @@ public void testSequenceIDs() throws Exception {
40274027
searchResult.close();
40284028
}
40294029

4030-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
40314030
public void testLookupSeqNoByIdInLucene() throws Exception {
40324031
int numOps = between(10, 100);
40334032
long seqNo = 0;
@@ -4062,20 +4061,23 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
40624061
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
40634062
CheckedRunnable<IOException> lookupAndCheck = () -> {
40644063
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
4065-
for (String id : latestOps.keySet()) {
4066-
String msg = "latestOps=" + latestOps + " op=" + id;
4067-
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
4068-
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
4069-
assertThat(msg, docIdAndSeqNo.isLive,
4070-
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
4071-
}
4072-
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
4073-
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
40744064
Map<String, Long> liveOps = latestOps.entrySet().stream()
40754065
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
40764066
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
40774067
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
40784068
equalTo(liveOps));
4069+
for (String id : latestOps.keySet()) {
4070+
String msg = "latestOps=" + latestOps + " op=" + id;
4071+
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
4072+
if (liveOps.containsKey(id) == false) {
4073+
assertNull(msg, docIdAndSeqNo);
4074+
} else {
4075+
assertNotNull(msg, docIdAndSeqNo);
4076+
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
4077+
}
4078+
}
4079+
String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
4080+
assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
40794081
}
40804082
};
40814083
for (Engine.Operation op : operations) {

0 commit comments

Comments
 (0)