Dry up Snapshot IT Infrastructure #62578

Merged

@@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -111,8 +112,7 @@ public void testConcurrentlyChangeRepositoryContents() throws Exception {
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
startDeleteSnapshot(repoName, snapshot).get();

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
@@ -180,8 +180,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
startDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 2));
@@ -241,7 +240,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));

logger.info("--> verify that snapshot with missing root level metadata can be deleted");
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
assertAcked(startDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());

logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
@@ -291,7 +290,7 @@ public void testMountCorruptedRepositoryData() throws Exception {
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
}

public void testHandleSnapshotErrorWithBwCFormat() throws IOException {
public void testHandleSnapshotErrorWithBwCFormat() throws IOException, ExecutionException, InterruptedException {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
Expand All @@ -315,13 +314,12 @@ public void testHandleSnapshotErrorWithBwCFormat() throws IOException {
assertFileExists(initialShardMetaPath);
Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1"));

logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
startDeleteSnapshot(repoName, oldVersionSnapshot).get();

createFullSnapshot(repoName, "snapshot-2");
}

public void testRepairBrokenShardGenerations() throws IOException {
public void testRepairBrokenShardGenerations() throws Exception {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
Expand All @@ -336,8 +334,7 @@ public void testRepairBrokenShardGenerations() throws IOException {

createFullSnapshot(repoName, "snapshot-1");

logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
startDeleteSnapshot(repoName, oldVersionSnapshot).get();

logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation");
final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName);
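Note: the call sites above now go through a startDeleteSnapshot helper that is not part of this diff (it presumably lives in the shared snapshot test base class). A minimal sketch of what such a helper might look like, assuming it simply wraps the existing delete-snapshot admin request in an ActionFuture and logs the step:

```java
// Hedged sketch only: the real helper is defined in the shared snapshot test base class
// and may differ in naming and logging; shown here to clarify what the refactored tests call.
protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
    logger.info("--> deleting snapshot [{}] from repository [{}]", snapshotName, repoName);
    return client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
}
```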
@@ -32,14 +32,11 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.SnapshotsInProgress;
@@ -111,6 +108,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
@@ -340,13 +338,13 @@ public void testRestoreCustomMetadata() throws Exception {
equalTo("before_snapshot_s_gw_noapi"));
}

private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
private void updateClusterState(final Function<ClusterState, ClusterState> updater) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return updater.execute(currentState);
public ClusterState execute(ClusterState currentState) {
return updater.apply(currentState);
}

@Override
Expand All @@ -362,10 +360,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
countDownLatch.await();
}

private interface ClusterStateUpdater {
ClusterState execute(ClusterState currentState) throws Exception;
}

public void testSnapshotDuringNodeShutdown() throws Exception {
logger.info("--> start 2 nodes");
Client client = client();
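With updateClusterState now taking a plain Function<ClusterState, ClusterState>, the private ClusterStateUpdater interface above becomes redundant and callers can pass a lambda directly. An illustrative call site (the state mutation below is a placeholder, not code from this change):

```java
// Placeholder example: after the refactor a lambda replaces the old anonymous
// ClusterStateUpdater implementation; the builder chain shown here is illustrative only.
updateClusterState(currentState -> ClusterState.builder(currentState)
    .metadata(Metadata.builder(currentState.metadata()))
    .build());
```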
@@ -857,10 +851,9 @@ public void testRestoreShrinkIndex() throws Exception {
assertAcked(client.admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get());

logger.info("--> snapshot the shrunk index");
CreateSnapshotResponse createResponse = client.admin().cluster()
assertSuccessful(client.admin().cluster()
.prepareCreateSnapshot(repo, snapshot)
.setWaitForCompletion(true).setIndices(shrunkIdx).get();
assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state());
.setWaitForCompletion(true).setIndices(shrunkIdx).execute());

logger.info("--> delete index and stop the data node");
assertAcked(client.admin().indices().prepareDelete(sourceIdx).get());
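assertSuccessful, used above, is another shared helper not defined in this file; presumably it resolves the snapshot future and asserts that the resulting SnapshotInfo reached SUCCESS, replacing the removed assertEquals on createResponse. A hedged sketch of its assumed shape:

```java
// Hedged sketch: assumed shape of the assertSuccessful helper used above.
protected SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
    final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
    assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
    return snapshotInfo;
}
```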
@@ -912,7 +905,7 @@ public void testSnapshotWithDateMath() {
assertThat(snapshots.get(0).getState().completed(), equalTo(true));
}

public void testSnapshotTotalAndIncrementalSizes() throws IOException {
public void testSnapshotTotalAndIncrementalSizes() throws Exception {
Client client = client();
final String indexName = "test-blocks-1";
final String repositoryName = "repo-" + indexName;
@@ -966,7 +959,7 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
createFullSnapshot(repositoryName, snapshot1);

// drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot
assertAcked(client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0).get());
assertAcked(startDeleteSnapshot(repositoryName, snapshot0).get());

response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1)
@@ -1209,24 +1202,9 @@ public void testAbortWaitsOnDataNode() throws Exception {
createRepository(repoName, "mock");
blockAllDataNodes(repoName);
final String snapshotName = "test-snap";
final ActionFuture<CreateSnapshotResponse> snapshotResponse =
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute();
final ActionFuture<CreateSnapshotResponse> snapshotResponse = startFullSnapshot(repoName, snapshotName);
waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L));

final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, otherDataNode);
final PlainActionFuture<Void> abortVisibleFuture = PlainActionFuture.newFuture();
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().stream()
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)) {
abortVisibleFuture.onResponse(null);
clusterService.removeListener(this);
}
}
});

final AtomicBoolean blocked = new AtomicBoolean(true);

final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
Expand All @@ -1241,10 +1219,10 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra
});

logger.info("--> abort snapshot");
final ActionFuture<AcknowledgedResponse> deleteResponse =
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
final ActionFuture<AcknowledgedResponse> deleteResponse = startDeleteSnapshot(repoName, snapshotName);

abortVisibleFuture.get(30L, TimeUnit.SECONDS);
awaitClusterState(otherDataNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries().stream().anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED));

assertFalse("delete should not be able to finish until data node is unblocked", deleteResponse.isDone());
blocked.set(false);
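The hand-rolled ClusterStateListener plus PlainActionFuture that previously detected the ABORTED snapshot entry is replaced by an awaitClusterState(node, predicate) helper. Its implementation is not shown in this diff; a minimal sketch under the assumption that it registers a listener on the target node's ClusterService and waits for the predicate to match:

```java
// Hedged sketch of an awaitClusterState helper matching the call above; the real
// implementation lives in the shared test infrastructure and may differ in detail.
protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
    final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
    final CountDownLatch latch = new CountDownLatch(1);
    final ClusterStateListener listener = event -> {
        if (statePredicate.test(event.state())) {
            latch.countDown();
        }
    };
    clusterService.addListener(listener);
    try {
        // also check the current state in case it already matched before the listener was added
        if (statePredicate.test(clusterService.state()) == false) {
            assertTrue("cluster state predicate was not satisfied in time", latch.await(30L, TimeUnit.SECONDS));
        }
    } finally {
        clusterService.removeListener(listener);
    }
}
```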
Expand All @@ -1261,8 +1239,7 @@ public void testPartialSnapshotAllShardsMissing() throws Exception {
createIndex("some-index");
stopNode(dataNode);
ensureStableCluster(1);
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
.setPartial(true).setWaitForCompletion(true).get();
final CreateSnapshotResponse createSnapshotResponse = startFullSnapshot(repoName, "test-snap", true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
}

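startFullSnapshot, used above both with and without a partial flag, is likewise a shared helper not shown in this diff. A sketch of what it might look like, assuming it wraps prepareCreateSnapshot with wait-for-completion enabled:

```java
// Hedged sketch of the assumed startFullSnapshot helper; the real method lives in the
// shared snapshot test base class and may differ in signature and logging.
protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
    return startFullSnapshot(repoName, snapshotName, false);
}

protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
    logger.info("--> creating full snapshot [{}] in repository [{}]", snapshotName, repoName);
    return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
        .setWaitForCompletion(true)
        .setPartial(partial)
        .execute();
}
```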
@@ -20,7 +20,6 @@
package org.elasticsearch.snapshots;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@@ -101,6 +100,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -140,7 +140,6 @@
import static org.hamcrest.Matchers.nullValue;

// The tests in here do a lot of state updates and other writes to disk and are slowed down too much by WindowsFS
@LuceneTestCase.SuppressFileSystems(value = "WindowsFS")
public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {

@Override
@@ -423,8 +422,7 @@ public void testEmptySnapshot() throws Exception {
createRepository("test-repo", "fs");

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).get();
CreateSnapshotResponse createSnapshotResponse = startFullSnapshot("test-repo", "test-snap").get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));

@@ -1186,8 +1184,7 @@ public void testDeleteSnapshot() throws Exception {

assertDocCount("test-idx", 10L * numberOfSnapshots);

logger.info("--> delete the last snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
startDeleteSnapshot("test-repo", lastSnapshot).get();
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repo, numberOfFiles[0]);
}
@@ -1318,8 +1315,7 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio
Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat"));
}

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();
startDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> make sure snapshot doesn't exist");

@@ -1354,8 +1350,7 @@ public void testDeleteSnapshotWithMissingMetadata() throws Exception {
Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
Files.delete(metadata);

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();
startDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots("test-repo")
@@ -1388,8 +1383,7 @@ public void testDeleteSnapshotWithCorruptedSnapshotFile() throws Exception {
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10));
}
logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();
startDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class,
@@ -1442,7 +1436,7 @@ public void testDeleteSnapshotWithCorruptedGlobalState() throws Exception {
assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo("test-snap"));

assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get());
assertAcked(startDeleteSnapshot("test-repo", "test-snap").get());
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster()
.prepareGetSnapshots("test-repo").addSnapshots("test-snap").get().getSnapshots("test-repo"));
assertRequestBuilderThrows(client().admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap"),
@@ -2637,7 +2631,7 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
}
}

assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
assertAcked(startDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
}

/**
@@ -2756,8 +2750,7 @@ public void testCannotCreateSnapshotsWithSameName() throws Exception {
assertThat(e.getMessage(), containsString("snapshot with the same name already exists"));
}

logger.info("--> delete the first snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).get();
startDeleteSnapshot(repositoryName, snapshotName).get();

logger.info("--> try creating a snapshot with the same name, now it should work because the first one was deleted");
createSnapshotResponse = client.admin()
@@ -2816,7 +2809,7 @@ public void testGetSnapshotsRequest() throws Exception {
assertEquals(1, getSnapshotsResponse.getSnapshots("test-repo").size());
assertEquals("snap-on-empty-repo", getSnapshotsResponse.getSnapshots("test-repo").get(0).snapshotId().getName());
unblockNode(repositoryName, initialBlockedNode); // unblock node
client.admin().cluster().prepareDeleteSnapshot(repositoryName, "snap-on-empty-repo").get();
startDeleteSnapshot(repositoryName, "snap-on-empty-repo").get();

final int numSnapshots = randomIntBetween(1, 3) + 1;
logger.info("--> take {} snapshot(s)", numSnapshots - 1);
@@ -3319,7 +3312,7 @@ public void testRestoreIncreasesPrimaryTerms() {
assertThat(restoredIndexMetadata.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID), notNullValue());
}

public void testSnapshotDifferentIndicesBySameName() throws InterruptedException {
public void testSnapshotDifferentIndicesBySameName() throws InterruptedException, ExecutionException {
String indexName = "testindex";
String repoName = "test-repo";
Path absolutePath = randomRepoPath().toAbsolutePath();
@@ -3376,8 +3369,7 @@ public void testSnapshotDifferentIndicesBySameName() throws InterruptedException
snapshotToRestore = "snap-1";
expectedCount = docCount;
}
logger.info("--> deleting snapshot [{}]", snapshotToDelete);
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get());
assertAcked(startDeleteSnapshot(repoName, snapshotToDelete).get());
logger.info("--> restoring snapshot [{}]", snapshotToRestore);
client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
.setRenameReplacement("restored-3").setWaitForCompletion(true).get();