Skip to content

Commit 60fb4ef

Browse files
committed
Use exact numDocs in synced-flush and metadata snapshot (#30228)
Since #29458, we use a searcher to calculate the number of documents for a commit stats. Sadly, that approach is flawed. The searcher might no longer point to the last commit if it's refreshed. As synced-flush requires an exact numDocs to work correctly, we have to exclude all soft-deleted docs. This commit makes synced-flush stop using CommitStats but read an exact numDocs directly from an index commit. Relates #29458 Relates #29530
1 parent 0a7fdf8 commit 60fb4ef

File tree

6 files changed

+98
-21
lines changed

6 files changed

+98
-21
lines changed

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.lucene.index.SegmentCommitInfo;
4545
import org.apache.lucene.index.SegmentInfos;
4646
import org.apache.lucene.index.SegmentReader;
47+
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
4748
import org.apache.lucene.search.DocIdSetIterator;
4849
import org.apache.lucene.search.Explanation;
4950
import org.apache.lucene.search.FieldDoc;
@@ -149,6 +150,16 @@ public static int getNumDocs(SegmentInfos info) {
149150
return numDocs;
150151
}
151152

153+
/**
154+
* Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs.
155+
* This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
156+
*/
157+
public static int getExactNumDocs(IndexCommit commit) throws IOException {
158+
try (DirectoryReader reader = DirectoryReader.open(commit)) {
159+
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
160+
}
161+
}
162+
152163
/**
153164
* Reads the segments infos from the given commit, failing if it fails to load
154165
*/

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
862862
Map<String, String> commitUserDataBuilder = new HashMap<>();
863863
try {
864864
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
865-
numDocs = Lucene.getNumDocs(segmentCommitInfos);
865+
numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
866866
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
867867
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
868868
for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -945,6 +945,16 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
945945
assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
946946
}
947947

948+
private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
949+
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
950+
for (IndexCommit commit : commits) {
951+
if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
952+
return commit;
953+
}
954+
}
955+
throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
956+
}
957+
948958
@Override
949959
public Iterator<StoreFileMetaData> iterator() {
950960
return metadata.values().iterator();

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.indices.flush;
2020

2121
import org.apache.logging.log4j.message.ParameterizedMessage;
22+
import org.apache.lucene.index.SegmentInfos;
2223
import org.elasticsearch.Assertions;
2324
import org.elasticsearch.ElasticsearchException;
2425
import org.elasticsearch.Version;
@@ -41,13 +42,13 @@
4142
import org.elasticsearch.common.inject.Inject;
4243
import org.elasticsearch.common.io.stream.StreamInput;
4344
import org.elasticsearch.common.io.stream.StreamOutput;
45+
import org.elasticsearch.common.lucene.Lucene;
4446
import org.elasticsearch.common.settings.Settings;
4547
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4648
import org.elasticsearch.common.util.concurrent.CountDown;
4749
import org.elasticsearch.index.Index;
4850
import org.elasticsearch.index.IndexNotFoundException;
4951
import org.elasticsearch.index.IndexService;
50-
import org.elasticsearch.index.engine.CommitStats;
5152
import org.elasticsearch.index.engine.Engine;
5253
import org.elasticsearch.index.shard.IndexEventListener;
5354
import org.elasticsearch.index.shard.IndexShard;
@@ -467,15 +468,19 @@ public String executor() {
467468
}
468469
}
469470

470-
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
471+
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
471472
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
472473
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
473474
logger.trace("{} performing pre sync flush", request.shardId());
474475
indexShard.flush(flushRequest);
475-
final CommitStats commitStats = indexShard.commitStats();
476-
final Engine.CommitId commitId = commitStats.getRawCommitId();
477-
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
478-
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
476+
try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
477+
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
478+
final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
479+
final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
480+
final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
481+
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
482+
return new PreSyncedFlushResponse(commitId, numDocs, syncId);
483+
}
479484
}
480485

481486
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
289289
* @param recoveryTarget the target of the recovery
290290
* @return a snapshot of the store metadata
291291
*/
292-
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
292+
static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
293293
try {
294294
return recoveryTarget.indexShard().snapshotStoreMetadata();
295295
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
312312
final StartRecoveryRequest request;
313313
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
314314

315-
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
315+
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
316316
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
317317

318318
final long startingSeqNo;

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import org.apache.lucene.index.IndexWriter;
2525
import org.apache.lucene.index.IndexWriterConfig;
2626
import org.apache.lucene.index.NoMergePolicy;
27+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2728
import org.elasticsearch.common.UUIDs;
2829
import org.elasticsearch.index.seqno.SequenceNumbers;
2930
import org.elasticsearch.index.shard.IndexShard;
3031
import org.elasticsearch.index.shard.IndexShardTestCase;
3132
import org.elasticsearch.index.translog.Translog;
3233

3334
import java.util.HashMap;
35+
import java.util.HashSet;
3436
import java.util.List;
3537
import java.util.Map;
38+
import java.util.Set;
3639

3740
import static org.hamcrest.Matchers.equalTo;
3841

@@ -108,4 +111,33 @@ public void testGetStartingSeqNo() throws Exception {
108111
closeShards(replica);
109112
}
110113
}
114+
115+
public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
116+
final IndexShard replica = newShard(false);
117+
recoveryEmptyReplica(replica);
118+
long flushedDocs = 0;
119+
final int numDocs = scaledRandomIntBetween(1, 20);
120+
final Set<String> docIds = new HashSet<>();
121+
for (int i = 0; i < numDocs; i++) {
122+
String id = Integer.toString(i);
123+
docIds.add(id);
124+
indexDoc(replica, "_doc", id);
125+
if (randomBoolean()) {
126+
replica.flush(new FlushRequest());
127+
flushedDocs = docIds.size();
128+
}
129+
}
130+
for (String id : randomSubsetOf(docIds)) {
131+
deleteDoc(replica, "_doc", id);
132+
docIds.remove(id);
133+
if (randomBoolean()) {
134+
replica.flush(new FlushRequest());
135+
flushedDocs = docIds.size();
136+
}
137+
}
138+
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
139+
assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
140+
recoveryTarget.decRef();
141+
closeShards(replica);
142+
}
111143
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
2727
import org.apache.logging.log4j.Logger;
2828
import org.apache.lucene.store.AlreadyClosedException;
29+
import org.elasticsearch.common.collect.Tuple;
30+
import org.elasticsearch.common.lucene.Lucene;
2931
import org.elasticsearch.core.internal.io.IOUtils;
3032
import org.elasticsearch.ElasticsearchException;
3133
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -76,7 +78,9 @@
7678
import org.elasticsearch.index.engine.CommitStats;
7779
import org.elasticsearch.index.engine.Engine;
7880
import org.elasticsearch.index.engine.EngineTestCase;
81+
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
7982
import org.elasticsearch.index.shard.IndexShard;
83+
import org.elasticsearch.index.shard.IndexShardState;
8084
import org.elasticsearch.index.shard.IndexShardTestCase;
8185
import org.elasticsearch.index.shard.ShardId;
8286
import org.elasticsearch.indices.IndicesService;
@@ -1104,8 +1108,7 @@ public void beforeIndexDeletion() throws Exception {
11041108
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
11051109
assertNoPendingIndexOperations();
11061110
//check that shards that have same sync id also contain same number of documents
1107-
// norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228
1108-
// assertSameSyncIdSameDocs();
1111+
assertSameSyncIdSameDocs();
11091112
assertOpenTranslogReferences();
11101113
}
11111114

@@ -1116,23 +1119,39 @@ private void assertSameSyncIdSameDocs() {
11161119
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
11171120
for (IndexService indexService : indexServices) {
11181121
for (IndexShard indexShard : indexService) {
1119-
CommitStats commitStats = indexShard.commitStats();
1120-
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
1121-
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
1122-
if (syncId != null) {
1123-
long liveDocsOnShard = commitStats.getNumDocs();
1124-
if (docsOnShards.get(syncId) != null) {
1125-
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
1126-
} else {
1127-
docsOnShards.put(syncId, liveDocsOnShard);
1128-
}
1122+
Tuple<String, Integer> commitStats = commitStats(indexShard);
1123+
if (commitStats != null) {
1124+
String syncId = commitStats.v1();
1125+
long liveDocsOnShard = commitStats.v2();
1126+
if (docsOnShards.get(syncId) != null) {
1127+
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
1128+
". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
1129+
equalTo(liveDocsOnShard));
1130+
} else {
1131+
docsOnShards.put(syncId, liveDocsOnShard);
11291132
}
11301133
}
11311134
}
11321135
}
11331136
}
11341137
}
11351138

1139+
private Tuple<String, Integer> commitStats(IndexShard indexShard) {
1140+
try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
1141+
final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
1142+
// Only read if sync_id exists
1143+
if (Strings.hasText(syncId)) {
1144+
return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
1145+
} else {
1146+
return null;
1147+
}
1148+
} catch (IllegalIndexShardStateException ex) {
1149+
return null; // Shard is closed or not started yet.
1150+
} catch (IOException ex) {
1151+
throw new AssertionError(ex);
1152+
}
1153+
}
1154+
11361155
private void assertNoPendingIndexOperations() throws Exception {
11371156
assertBusy(() -> {
11381157
final Collection<NodeAndClient> nodesAndClients = nodes.values();

0 commit comments

Comments
 (0)