
Commit b6920ef

Ensure fully deleted segments are accounted for correctly (#33757)

We can't rely on the leaf reader ordinal in a wrapped reader since it might not correspond to the ordinal in the SegmentInfos for its SegmentCommitInfo.

Relates to #32844
Closes #33689
Closes #33755
1 parent a6b2a55 commit b6920ef
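
In short: a leaf ordinal is only meaningful relative to the reader it came from. A wrapped reader (for example one that filters out fully deleted segments) may have fewer leaves than the commit, so indexing SegmentInfos with the wrapped ctx.ord can pick the wrong SegmentCommitInfo and expose deleted documents. The fix below therefore takes ordinals from the unwrapped commit reader and wraps one leaf at a time. As a minimal, hedged sketch of that lookup pattern against plain Lucene (the class and method names here are illustrative, not part of the commit):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StandardDirectoryReader;

final class SegmentInfoLookup {

    /**
     * Maps every leaf of the commit to its SegmentCommitInfo, fully deleted segments included.
     * This is safe because the unwrapped reader opened from the commit has exactly one leaf per
     * SegmentCommitInfo, in SegmentInfos order, so ctx.ord is a valid index into segmentInfos.
     */
    static List<SegmentCommitInfo> segmentInfosInLeafOrder(IndexCommit commit) throws IOException {
        List<SegmentCommitInfo> result = new ArrayList<>();
        try (StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
            SegmentInfos segmentInfos = reader.getSegmentInfos();
            for (LeafReaderContext ctx : reader.leaves()) {
                SegmentCommitInfo info = segmentInfos.info(ctx.ord);
                // cross-check against the segment the leaf actually reads, mirroring the assert added below
                SegmentReader segmentReader = (SegmentReader) ctx.reader();
                assert info.info.equals(segmentReader.getSegmentInfo().info);
                result.add(info);
            }
        }
        return result;
    }
}

Any leaf-dropping wrapper is then applied per segment in the diff below, so a dropped fully deleted segment can never shift the ordinals of the segments that remain.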

5 files changed: 141 additions, 25 deletions

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java

Lines changed: 64 additions & 7 deletions
@@ -81,15 +81,25 @@ public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
         try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
              StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
             SegmentInfos segmentInfos = reader.getSegmentInfos();
-            DirectoryReader wrapper = wrapReader(reader);
             List<SegmentCommitInfo> newInfos = new ArrayList<>();
-            for (LeafReaderContext ctx : wrapper.leaves()) {
-                SegmentCommitInfo info = segmentInfos.info(ctx.ord);
+            for (LeafReaderContext ctx : reader.leaves()) {
                 LeafReader leafReader = ctx.reader();
-                LiveDocs liveDocs = getLiveDocs(leafReader);
-                if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
-                    SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
-                    newInfos.add(newInfo);
+                SegmentCommitInfo info = reader.getSegmentInfos().info(ctx.ord);
+                assert info.info.equals(Lucene.segmentReader(ctx.reader()).getSegmentInfo().info);
+                /* We could do this totally different without wrapping this dummy directory reader if FilterCodecReader would have a
+                 * getDelegate method. This is fixed in LUCENE-8502 but we need to wait for it to come in 7.5.1 or 7.6.
+                 * The reason here is that the ctx.ord is not guaranteed to be equivalent to the SegmentCommitInfo ord in the SegmentInfo
+                 * object since we might drop fully deleted segments. if that happens we are using the wrong reader for the SI and
+                 * might almost certainly expose deleted documents.
+                 */
+                DirectoryReader wrappedReader = wrapReader(new DummyDirectoryReader(reader.directory(), leafReader));
+                if (wrappedReader.leaves().isEmpty() == false) {
+                    leafReader = wrappedReader.leaves().get(0).reader();
+                    LiveDocs liveDocs = getLiveDocs(leafReader);
+                    if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
+                        SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
+                        newInfos.add(newInfo);
+                    }
                 }
             }
             segmentInfos.clear();
@@ -257,4 +267,51 @@ private static class LiveDocs {
             this.bits = bits;
         }
     }
+
+    private static class DummyDirectoryReader extends DirectoryReader {
+
+        protected DummyDirectoryReader(Directory directory, LeafReader... segmentReaders) throws IOException {
+            super(directory, segmentReaders);
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
+            return null;
+        }
+
+        @Override
+        protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
+            return null;
+        }
+
+        @Override
+        public long getVersion() {
+            return 0;
+        }
+
+        @Override
+        public boolean isCurrent() throws IOException {
+            return false;
+        }
+
+        @Override
+        public IndexCommit getIndexCommit() throws IOException {
+            return null;
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            return null;
+        }
+    }
 }

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ protected void closeInternal() {
             SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
             snapshot.syncSnapshot(snapshotIndexCommit);
             // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
-            SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+            SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
             tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
             store.incRef();
             try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java

Lines changed: 22 additions & 16 deletions
@@ -49,7 +49,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -97,7 +97,10 @@ public void testSnapshotAndRestore() throws Exception {
         boolean requireRouting = randomBoolean();
         boolean useNested = randomBoolean();
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
-        assertHits(sourceIdx, builders.length);
+        IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(sourceIdx).clear().setDocs(true).get();
+        long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
+        boolean sourceHadDeletions = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
         assertMappings(sourceIdx, requireRouting, useNested);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
@@ -116,7 +119,7 @@ public void testSnapshotAndRestore() throws Exception {
         client().admin().indices().prepareUpdateSettings(sourceIdx)
             .setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, sourceHadDeletions);
     }
 
     public void testSnapshotAndRestoreWithNested() throws Exception {
@@ -125,7 +128,7 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
         IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, true);
         IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
         assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
         assertMappings(sourceIdx, requireRouting, true);
         SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
             client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
@@ -141,7 +144,7 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
         client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
             .get();
         ensureGreen(sourceIdx);
-        assertHits(sourceIdx, builders.length);
+        assertHits(sourceIdx, builders.length, true);
     }
 
     private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
@@ -165,15 +168,12 @@ private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
         }
     }
 
-    private void assertHits(String index, int numDocsExpected) {
+    private void assertHits(String index, int numDocsExpected, boolean sourceHadDeletions) {
         SearchResponse searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
             .setSize(numDocsExpected).get();
-        Consumer<SearchResponse> assertConsumer = res -> {
+        BiConsumer<SearchResponse, Boolean> assertConsumer = (res, allowHoles) -> {
             SearchHits hits = res.getHits();
-            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
-            long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
-            boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
             long i = 0;
             for (SearchHit hit : hits) {
                 String id = hit.getId();
@@ -190,18 +190,24 @@ private void assertHits(String index, int numDocsExpected) {
                 assertEquals("r" + id, hit.field("_routing").getValue());
             }
         };
-        assertConsumer.accept(searchResponse);
+        assertConsumer.accept(searchResponse, sourceHadDeletions);
         assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
         searchResponse = client().prepareSearch(index)
             .addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
             .setScroll("1m")
             .slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0,1), 2))
             .setSize(randomIntBetween(1, 10)).get();
-        do {
-            // now do a scroll with a slice
-            assertConsumer.accept(searchResponse);
-            searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
-        } while (searchResponse.getHits().getHits().length > 0);
+        try {
+            do {
+                // now do a scroll with a slice
+                assertConsumer.accept(searchResponse, true);
+                searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
+            } while (searchResponse.getHits().getHits().length > 0);
+        } finally {
+            if (searchResponse.getScrollId() != null) {
+                client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
+            }
+        }
 
     }
 
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

Lines changed: 0 additions & 1 deletion
@@ -162,7 +162,6 @@ private String randomDoc() {
         return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33689")
     public void testRestoreMinmal() throws IOException {
         IndexShard shard = newStartedShard(true);
         int numInitialDocs = randomIntBetween(10, 100);

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotTests.java

Lines changed: 54 additions & 0 deletions
@@ -12,10 +12,12 @@
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterMergePolicy;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
@@ -34,6 +36,7 @@
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOSupplier;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.test.ESTestCase;
 
@@ -242,4 +245,55 @@ public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
             reader.close();
         }
     }
+
+    public void testFullyDeletedSegments() throws IOException {
+        try (Directory dir = newDirectory()) {
+            SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+            IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
+                .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
+                .setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
+                    @Override
+                    public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
+                        return randomBoolean();
+                    }
+
+                    @Override
+                    public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) throws IOException {
+                        return true;
+                    }
+                }));
+            Document doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 1));
+            doc.add(new StoredField("rank", 1));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.addDocument(doc);
+            writer.commit();
+            doc = new Document();
+            doc.add(new StringField("id", "1", Field.Store.YES));
+            doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
+            doc.add(new NumericDocValuesField("rank", 3));
+            doc.add(new StoredField("rank", 3));
+            doc.add(new StoredField("src", "the quick brown fox"));
+            writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
+            writer.commit();
+            try (Directory targetDir = newDirectory()) {
+                IndexCommit snapshot = deletionPolicy.snapshot();
+                SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
+                snapshoter.syncSnapshot(snapshot);
+
+                try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
+                    assertEquals(snapReader.maxDoc(), 1);
+                    assertEquals(snapReader.numDocs(), 1);
+                    assertEquals("3", snapReader.document(0).getField("rank").stringValue());
+                }
+                try (IndexReader writerReader = DirectoryReader.open(writer)) {
+                    assertEquals(writerReader.maxDoc(), 2);
+                    assertEquals(writerReader.numDocs(), 1);
+                }
+            }
+            writer.close();
+        }
+    }
 }
