Fix Snapshots Recording Incorrect Max. Segment Counts #74291

Merged
merged 4 commits on Jun 18, 2021
@@ -13,27 +13,45 @@
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), InternalSettingsPlugin.class);
    }

    public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException {
        internalCluster().startMasterOnlyNode();
        final String primaryNode = internalCluster().startDataOnlyNode();
@@ -166,6 +184,73 @@ public void testForceMergeCausesFullSnapshot() throws Exception {
        assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0));
    }

    public void testRecordCorrectSegmentCountsWithBackgroundMerges() throws Exception {
        final String repoName = "test-repo";
        createRepository(repoName, "fs");

        final String indexName = "test";
        // disable merges
        assertAcked(prepareCreate(indexName).setSettings(indexSettingsNoReplicas(1).put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")));

        // create an empty snapshot so that later snapshots run as quickly as possible
        createFullSnapshot(repoName, "empty");

        // create a situation where we temporarily have a bunch of segments until the merges can catch up
        long id = 0;
        final int rounds = scaledRandomIntBetween(3, 5);
        for (int i = 0; i < rounds; ++i) {
            final int numDocs = scaledRandomIntBetween(100, 1000);
            BulkRequestBuilder request = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (int j = 0; j < numDocs; ++j) {
                request.add(
                    Requests.indexRequest(indexName)
                        .id(Long.toString(id++))
                        .source(jsonBuilder().startObject().field("l", randomLong()).endObject())
                );
            }
            assertNoFailures(request.get());
        }

        // snapshot with a bunch of unmerged segments
        final SnapshotInfo before = createFullSnapshot(repoName, "snapshot-before");
        final SnapshotInfo.IndexSnapshotDetails beforeIndexDetails = before.indexSnapshotDetails().get(indexName);
        final long beforeSegmentCount = beforeIndexDetails.getMaxSegmentsPerShard();

        // reactivate merges
        assertAcked(admin().indices().prepareClose(indexName).get());
        assertAcked(
            admin().indices()
                .prepareUpdateSettings(indexName)
                .setSettings(
                    Settings.builder()
                        .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
Contributor:
👍 we know we did ≥3 rounds, each of which made ≥1 segment, and they're all pretty small (≤2MB), so this guarantees a merge IIUC.

Contributor Author:
++ that was the assumption here. Also ran a few hundred rounds of this to verify and it seems to hold 🤞

(A short sketch of this merge-policy argument follows after this file's diff.)

                        .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "true")
                )
        );
        assertAcked(admin().indices().prepareOpen(indexName).get());
        assertEquals(0, admin().indices().prepareForceMerge(indexName).setFlush(true).get().getFailedShards());

        // wait for merges to reduce segment count
        assertBusy(() -> {
            IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).setSegments(true).get();
            assertThat(stats.getIndex(indexName).getPrimaries().getSegments().getCount(), lessThan(beforeSegmentCount));
        }, 30L, TimeUnit.SECONDS);

        final SnapshotInfo after = createFullSnapshot(repoName, "snapshot-after");
        final int incrementalFileCount = clusterAdmin().prepareSnapshotStatus()
            .setRepository(repoName)
            .setSnapshots(after.snapshotId().getName())
            .get()
            .getSnapshots()
            .get(0)
            .getStats()
            .getIncrementalFileCount();
        assertEquals(0, incrementalFileCount);
        logger.info("--> no files have changed between snapshots, asserting that segment counts are constant as well");
        final SnapshotInfo.IndexSnapshotDetails afterIndexDetails = after.indexSnapshotDetails().get(indexName);
        assertEquals(beforeSegmentCount, afterIndexDetails.getMaxSegmentsPerShard());
    }

    private void assertCountInIndexThenDelete(String index, long expectedCount) {
        logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount);
        assertDocCount(index, expectedCount);
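The inline review comments above argue that, with `index.merge.policy.segments_per_tier` lowered to 2 and at least three small segments produced by the refreshed bulk rounds, a merge is bound to happen once merging is re-enabled. A minimal, self-contained sketch of that argument in terms of Lucene's `TieredMergePolicy` knobs; the class name and printed values are illustrative assumptions, not part of the change:

```java
import org.apache.lucene.index.TieredMergePolicy;

// Illustrative sketch only (not part of the PR): the test's index settings correspond
// roughly to this merge-policy configuration. All segments from the bulk rounds are far
// below the floor segment size, so they land in the same tier, and holding three or more
// of them exceeds the two-segments-per-tier budget, making a merge eligible.
public class MergeExpectationSketch {
    public static void main(String[] args) {
        TieredMergePolicy policy = new TieredMergePolicy();
        policy.setSegmentsPerTier(2.0); // index.merge.policy.segments_per_tier: 2, as set in the test

        int minSegmentsFromBulkRounds = 3; // lower bound of scaledRandomIntBetween(3, 5), one segment per refreshed round
        boolean mergeExpected = minSegmentsFromBulkRounds > policy.getSegmentsPerTier();
        System.out.println("merge expected once merging is re-enabled: " + mergeExpected);
    }
}
```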
@@ -2773,7 +2773,7 @@ public void snapshotShard(SnapshotShardContext context) {
            final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(
                indexGeneration,
                ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
-               snapshotIndexCommit.getSegmentCount()
+               getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
            );
            snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
            context.onResponse(shardSnapshotResult);
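The new argument above calls a `getSegmentInfoFileCount` helper that is not shown in this hunk. A minimal, self-contained sketch of the idea, assuming the helper simply counts Lucene segment-info (`.si`) files among the physical file names referenced by the shard snapshot; the simplified `List<String>` signature and class name are illustrative, while the real helper presumably works on `blobStoreIndexShardSnapshot.indexFiles()`:

```java
import java.util.List;

// Hypothetical, simplified sketch (not the PR's exact code): derive the segment count from
// the files the shard snapshot actually references by counting Lucene segment-info files,
// since every Lucene segment has exactly one *.si file.
public final class SegmentInfoFileCountSketch {

    static int getSegmentInfoFileCount(List<String> physicalFileNames) {
        long segmentInfoFiles = physicalFileNames.stream()
            .filter(name -> name.endsWith(".si"))
            .count();
        return Math.toIntExact(segmentInfoFiles);
    }

    public static void main(String[] args) {
        // A commit with two segments (compound files) plus the commit point file.
        List<String> files = List.of("_0.si", "_0.cfs", "_0.cfe", "_1.si", "_1.cfs", "_1.cfe", "segments_3");
        System.out.println(getSegmentInfoFileCount(files)); // prints 2
    }
}
```

Counting `.si` files in the snapshot's own file list ties the recorded per-shard segment count to the data the snapshot actually references, which is what the new test above asserts when no files have changed between the two snapshots.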