Skip to content

Allow Parallel Snapshot Restore And Delete (#51608) #51666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ public ClusterState execute(ClusterState currentState) {
}
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
if (deletionsInProgress != null
&& deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshot().equals(snapshot))) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1293,9 +1293,12 @@ public ClusterState execute(ClusterState currentState) {
// don't allow snapshot deletions while a restore is taking place,
// otherwise we could end up deleting a snapshot that is being restored
// and the files the restore depends on would all be gone
if (restoreInProgress.isEmpty() == false) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]");

for (RestoreInProgress.Entry entry : restoreInProgress) {
if (entry.snapshot().equals(snapshot)) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]");
}
}
}
ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,58 +152,4 @@ public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}

public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)).get());

logger.info("--> snapshot");
final String index = "test-idx";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
final String index2 = "test-idx2";
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot2 = "test-snap2";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
client().admin().indices().prepareClose(index, index2).get();

String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));

logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress");
try {
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
fail("should not be able to restore a snapshot while another is being deleted");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress"));
}

logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);

logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());

logger.info("--> restoring snapshot, which should now work");
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2616,13 +2616,6 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception {
assertEquals(repoName, e.getRepositoryName());
assertEquals(snapshotName, e.getSnapshotName());
assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));

logger.info("-- try deleting another snapshot while the restore is in progress (should throw an error)");
e = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName2).get());
assertEquals(repoName, e.getRepositoryName());
assertEquals(snapshotName2, e.getSnapshotName());
assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));
} finally {
// unblock even if the try block fails otherwise we will get bogus failures when we delete all indices in test teardown.
logger.info("--> unblocking all data nodes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -520,6 +521,92 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testConcurrentSnapshotRestoreAndDeleteOther() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();

final int documentsFirstSnapshot = randomIntBetween(0, 100);

continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> indexNDocuments(
documentsFirstSnapshot, index, () -> client().admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createSnapshotResponseStepListener)));

final int documentsSecondSnapshot = randomIntBetween(0, 100);

final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();

final String secondSnapshotName = "snapshot-2";
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> indexNDocuments(
documentsSecondSnapshot, index, () -> client().admin().cluster().prepareCreateSnapshot(repoName, secondSnapshotName)
.setWaitForCompletion(true).execute(createOtherSnapshotResponseStepListener)));

final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();

continueOrDie(createOtherSnapshotResponseStepListener,
createSnapshotResponse -> {
scheduleNow(
() -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener));
scheduleNow(() -> client().admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, secondSnapshotName).waitForCompletion(true)
.renamePattern("(.+)").renameReplacement("restored_$1"),
restoreSnapshotResponseListener));
});

final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
client().search(new SearchRequest("restored_" + index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)),
searchResponseListener);
});

deterministicTaskQueue.runAllRunnableTasks();

assertEquals(documentsFirstSnapshot + documentsSecondSnapshot,
Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value);
assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true));
assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0));

final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId()));

for (SnapshotId snapshotId : snapshotIds) {
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
assertEquals(shards, snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
}
}

private void indexNDocuments(int documents, String index, Runnable afterIndexing) {
if (documents == 0) {
afterIndexing.run();
return;
}
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < documents; ++i) {
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
}
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
client().bulk(bulkRequest, bulkResponseStepListener);
continueOrDie(bulkResponseStepListener, bulkResponse -> {
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
assertEquals(documents, bulkResponse.getItems().length);
afterIndexing.run();
});
}

public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down