Use exact numDocs in synced-flush and metadata snapshot #30228

Merged · 20 commits · May 15, 2018
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -44,6 +44,7 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@@ -149,6 +150,16 @@ public static int getNumDocs(SegmentInfos info) {
return numDocs;
}

+/**
+ * Unlike {@link #getNumDocs(SegmentInfos)}, this method returns a numDocs that always excludes soft-deleted docs.
+ * This method is expensive; prefer {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
+ */
+public static int getExactNumDocs(IndexCommit commit) throws IOException {
+try (DirectoryReader reader = DirectoryReader.open(commit)) {
+return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
+}
+}
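A quick sketch (mine, not part of this diff) contrasting the two counters; the index path and the wrapper class are assumptions:

    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.List;

    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexCommit;
    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.elasticsearch.common.lucene.Lucene;

    class DocCountSketch {
        static void printDocCounts(Path indexPath) throws IOException {
            try (Directory dir = FSDirectory.open(indexPath)) {
                // Cheap: sums maxDoc - delCount from the segments_N metadata;
                // soft-deleted documents still count as live here.
                SegmentInfos sis = SegmentInfos.readLatestCommit(dir);
                int approximate = Lucene.getNumDocs(sis);

                // Expensive: opens a reader on the commit and applies the
                // soft-deletes filter before counting.
                List<IndexCommit> commits = DirectoryReader.listCommits(dir);
                IndexCommit latest = commits.get(commits.size() - 1);
                int exact = Lucene.getExactNumDocs(latest);

                // With soft deletes enabled, approximate >= exact.
                System.out.println(approximate + " >= " + exact);
            }
        }
    }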

/**
* Reads the segments infos from the given commit, failing if it fails to load
*/
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -862,7 +862,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
-numDocs = Lucene.getNumDocs(segmentCommitInfos);
+numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -945,6 +945,16 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
}

+private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
+List<IndexCommit> commits = DirectoryReader.listCommits(directory);
+for (IndexCommit commit : commits) {
+if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
+return commit;
+}
+}
+throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
+}
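For context (my summary of this hunk): loadMetadata can be called with a null commit, meaning "the latest commit", but getExactNumDocs needs a concrete IndexCommit, so findIndexCommit resolves one by matching the segments file name of the SegmentInfos that were just read. The visible effect is that a store metadata snapshot's doc count now excludes soft deletes; a sketch, where the store variable is an assumption:

    Store.MetadataSnapshot snapshot = store.getMetadata(null); // null = latest commit
    long numDocs = snapshot.getNumDocs(); // exact: soft-deleted docs excluded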

@Override
public Iterator<StoreFileMetaData> iterator() {
return metadata.values().iterator();
server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices.flush;

import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -41,13 +42,13 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -467,15 +468,19 @@ public String executor() {
}
}

-private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
+private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
indexShard.flush(flushRequest);
-final CommitStats commitStats = indexShard.commitStats();
-final Engine.CommitId commitId = commitStats.getRawCommitId();
-logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
-return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
+try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
+final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
+final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
+final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
+final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
+logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
+return new PreSyncedFlushResponse(commitId, numDocs, syncId);
+}
}
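A gloss on this hunk (mine, not from the PR): the old code derived the commit id, sync id, and numDocs from CommitStats, which is recomputed from whatever commit the engine made last and counts soft-deleted documents as live; acquiring the last commit instead pins one commit point, so all three values are read consistently and exactly. The acquire pattern, with indexShard assumed:

    try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
        // false = do not force a new flush; pin the existing last commit.
        final IndexCommit commit = commitRef.getIndexCommit();
        // The commit's files cannot be deleted until the ref is closed, so
        // user data (e.g. the sync id) and doc counts refer to the same commit.
        final Map<String, String> userData = commit.getUserData();
    }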

private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
-private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

-final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long startingSeqNo;
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -24,15 +24,18 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;

import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

@@ -108,4 +111,33 @@ public void testGetStartingSeqNo() throws Exception {
closeShards(replica);
}
}

+public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
+final IndexShard replica = newShard(false);
+recoveryEmptyReplica(replica);
+long flushedDocs = 0;
+final int numDocs = scaledRandomIntBetween(1, 20);
+final Set<String> docIds = new HashSet<>();
+for (int i = 0; i < numDocs; i++) {
+String id = Integer.toString(i);
+docIds.add(id);
+indexDoc(replica, "_doc", id);
+if (randomBoolean()) {
+replica.flush(new FlushRequest());
+flushedDocs = docIds.size();
+}
+}
+for (String id : randomSubsetOf(docIds)) {
+deleteDoc(replica, "_doc", id);
+docIds.remove(id);
+if (randomBoolean()) {
+replica.flush(new FlushRequest());
+flushedDocs = docIds.size();
+}
+}
+final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
+assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
+recoveryTarget.decRef();
+closeShards(replica);
+}
}
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -26,6 +26,8 @@
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -76,7 +78,9 @@
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
+import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -1104,8 +1108,7 @@ public void beforeIndexDeletion() throws Exception {
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
assertNoPendingIndexOperations();
//check that shards that have same sync id also contain same number of documents
-// norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228
-// assertSameSyncIdSameDocs();
+assertSameSyncIdSameDocs();
assertOpenTranslogReferences();
}

@@ -1116,23 +1119,39 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
-CommitStats commitStats = indexShard.commitStats();
-if (commitStats != null) { // null if the engine is closed or if the shard is recovering
-String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
-if (syncId != null) {
-long liveDocsOnShard = commitStats.getNumDocs();
-if (docsOnShards.get(syncId) != null) {
-assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
-} else {
-docsOnShards.put(syncId, liveDocsOnShard);
-}
+Tuple<String, Integer> commitStats = commitStats(indexShard);
+if (commitStats != null) {
+String syncId = commitStats.v1();
+long liveDocsOnShard = commitStats.v2();
+if (docsOnShards.get(syncId) != null) {
+assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
+". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
+equalTo(liveDocsOnShard));
+} else {
+docsOnShards.put(syncId, liveDocsOnShard);
+}
+}
}
}
}
}

+private Tuple<String, Integer> commitStats(IndexShard indexShard) {
+try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
+final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
+// Only read if sync_id exists
+if (Strings.hasText(syncId)) {
+return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
+} else {
+return null;
+}
+} catch (IllegalIndexShardStateException ex) {
+return null; // Shard is closed or not started yet.
+} catch (IOException ex) {
+throw new AssertionError(ex);
+}
+}
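One more note (mine): the old guard was `commitStats != null`, since indexShard.commitStats() returned null for a closed or recovering shard; with acquireLastIndexCommit the same condition surfaces as an IllegalIndexShardStateException, hence the catch clause above. Intended use, with names taken from this diff:

    Tuple<String, Integer> stats = commitStats(indexShard);
    if (stats != null) {
        String syncId = stats.v1();   // sync_id from the commit's user data
        int exactDocs = stats.v2();   // exact numDocs, soft deletes excluded
    }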

private void assertNoPendingIndexOperations() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();