 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;

 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.RepositoriesService;

 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertRequestBuilderThrows;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;

 public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
@@ -347,6 +356,286 @@ public void testRepairBrokenShardGenerations() throws Exception {
         createFullSnapshot(repoName, "snapshot-2");
     }
 
+    /**
+     * Tests that a shard snapshot with a corrupted shard index file can still be used for restore and incremental snapshots.
+     */
+    public void testSnapshotWithCorruptedShardIndexFile() throws Exception {
+        final Client client = client();
+        final Path repo = randomRepoPath();
+        final String indexName = "test-idx";
+        final int nDocs = randomIntBetween(1, 10);
+
+        logger.info("--> creating index [{}] with [{}] documents in it", indexName, nDocs);
+        assertAcked(prepareCreate(indexName).setSettings(indexSettingsNoReplicas(1)));
+
+        final IndexRequestBuilder[] documents = new IndexRequestBuilder[nDocs];
+        for (int j = 0; j < nDocs; j++) {
+            documents[j] = client.prepareIndex(indexName).setSource("foo", "bar");
+        }
+        indexRandom(true, documents);
+        flushAndRefresh();
+
+        createRepository("test-repo", "fs", repo);
+
+        final String snapshot1 = "test-snap-1";
+        final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", snapshot1);
+        assertThat(snapshotInfo.indices(), hasSize(1));
+
+        final RepositoryData repositoryData = getRepositoryData("test-repo");
+        final Map<String, IndexId> indexIds = repositoryData.getIndices();
+        assertThat(indexIds.size(), equalTo(1));
+
+        final IndexId corruptedIndex = indexIds.get(indexName);
+        final Path shardIndexFile = repo.resolve("indices")
+            .resolve(corruptedIndex.getId()).resolve("0")
+            .resolve("index-" + repositoryData.shardGenerations().getShardGen(corruptedIndex, 0));
+
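+        // the shard-level index-${N} blob under indices/<index-uuid>/0/ lists the files of the current shard
+        // generation; truncating it corrupts that blob while leaving the data and snap-*.dat blobs intact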
+        logger.info("--> truncating shard index file [{}]", shardIndexFile);
+        try (SeekableByteChannel outChan = Files.newByteChannel(shardIndexFile, StandardOpenOption.WRITE)) {
+            outChan.truncate(randomInt(10));
+        }
+
+        logger.info("--> verifying snapshot state for [{}]", snapshot1);
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots("test-repo");
+        assertThat(snapshotInfos.size(), equalTo(1));
+        assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo(snapshot1));
+
+        logger.info("--> deleting index [{}]", indexName);
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        logger.info("--> restoring snapshot [{}]", snapshot1);
+        clusterAdmin().prepareRestoreSnapshot("test-repo", snapshot1)
+            .setRestoreGlobalState(randomBoolean())
+            .setWaitForCompletion(true)
+            .get();
+        ensureGreen();
+
+        assertDocCount(indexName, nDocs);
+
+        logger.info("--> indexing [{}] more documents into [{}]", nDocs, indexName);
+        for (int j = 0; j < nDocs; j++) {
+            documents[j] = client.prepareIndex(indexName).setSource("foo2", "bar2");
+        }
+        indexRandom(true, documents);
+
+        final String snapshot2 = "test-snap-2";
+        logger.info("--> creating snapshot [{}]", snapshot2);
+        final SnapshotInfo snapshotInfo2 = clusterAdmin().prepareCreateSnapshot("test-repo", snapshot2)
+            .setWaitForCompletion(true)
+            .get()
+            .getSnapshotInfo();
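+        // the corrupted shard index blob makes the incremental snapshot of that shard fail, so the second snapshot
+        // is expected to end up PARTIAL with exactly one failed shard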
+        assertThat(snapshotInfo2.state(), equalTo(SnapshotState.PARTIAL));
+        assertThat(snapshotInfo2.failedShards(), equalTo(1));
+        assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo.totalShards() - 1));
+        assertThat(snapshotInfo2.indices(), hasSize(1));
+    }
+
+    public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exception {
+        Client client = client();
+
+        Path repo = randomRepoPath();
+        createRepository("test-repo", "fs", repo);
+
+        final String[] indices = {"test-idx-1", "test-idx-2"};
+        createIndex(indices);
+        logger.info("--> indexing some data");
+        indexRandom(true,
+            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
+
+        logger.info("--> creating snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
+            .setWaitForCompletion(true).setIndices(indices).get();
+        final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
+        assertThat(snapshotInfo.successfulShards(), greaterThan(0));
+        assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
+
+        final Map<String, IndexId> indexIds = getRepositoryData("test-repo").getIndices();
+        final Path indicesPath = repo.resolve("indices");
+
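+        // each shard directory indices/<index-uuid>/<shard-id>/ holds an index-<generation> blob plus one
+        // snap-<snapshot-uuid>.dat blob per snapshot; delete one or both to simulate missing shard metadata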
+        logger.info("--> delete index metadata and shard metadata");
+        for (String index : indices) {
+            Path shardZero = indicesPath.resolve(indexIds.get(index).getId()).resolve("0");
+            if (randomBoolean()) {
+                Files.delete(shardZero.resolve("index-" + getRepositoryData("test-repo").shardGenerations()
+                    .getShardGen(indexIds.get(index), 0)));
+            }
+            Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat"));
+        }
+
+        startDeleteSnapshot("test-repo", "test-snap-1").get();
+
+        logger.info("--> make sure snapshot doesn't exist");
+
+        expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots("test-repo")
+            .addSnapshots("test-snap-1").get().getSnapshots("test-repo"));
+
+        for (String index : indices) {
+            assertTrue(Files.notExists(indicesPath.resolve(indexIds.get(index).getId())));
+        }
+    }
+
+    public void testDeleteSnapshotWithMissingMetadata() throws Exception {
+        Client client = client();
+
+        Path repo = randomRepoPath();
+        createRepository("test-repo", "fs", repo);
+
+        createIndex("test-idx-1", "test-idx-2");
+        logger.info("--> indexing some data");
+        indexRandom(true,
+            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
+
+        logger.info("--> creating snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
+            .setWaitForCompletion(true).setIndices("test-idx-*").get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
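+        // meta-<snapshot-uuid>.dat at the repository root is the snapshot's global cluster state blob;
+        // deleting it must not block deletion of the snapshot itself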
+        logger.info("--> delete global state metadata");
+        Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
+        Files.delete(metadata);
+
+        startDeleteSnapshot("test-repo", "test-snap-1").get();
+
+        logger.info("--> make sure snapshot doesn't exist");
+        expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots("test-repo")
+            .addSnapshots("test-snap-1").get().getSnapshots("test-repo"));
+    }
+
+    public void testDeleteSnapshotWithCorruptedSnapshotFile() throws Exception {
+        Client client = client();
+
+        Path repo = randomRepoPath();
+        createRepository("test-repo", "fs", Settings.builder()
+            .put("location", repo).put("compress", false)
+            .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
+
+        createIndex("test-idx-1", "test-idx-2");
+        logger.info("--> indexing some data");
+        indexRandom(true,
+            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
+
+        logger.info("--> creating snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
+            .setWaitForCompletion(true).setIndices("test-idx-*").get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
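+        // snap-<snapshot-uuid>.dat at the repository root holds the SnapshotInfo; truncating it makes the snapshot
+        // unreadable, yet deleting it and snapshotting again under the same name must still succeed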
+        logger.info("--> truncate snapshot file to make it unreadable");
+        Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
+        try (SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
+            outChan.truncate(randomInt(10));
+        }
+        startDeleteSnapshot("test-repo", "test-snap-1").get();
+
+        logger.info("--> make sure snapshot doesn't exist");
+        expectThrows(SnapshotMissingException.class,
+            () -> client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap-1").get()
+                .getSnapshots("test-repo"));
+
+        logger.info("--> make sure that we can create the snapshot again");
+        createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
+            .setWaitForCompletion(true).setIndices("test-idx-*").get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+    }
+
+    /** Tests that a snapshot with a corrupted global state file can still be deleted */
+    public void testDeleteSnapshotWithCorruptedGlobalState() throws Exception {
+        final Path repo = randomRepoPath();
+
+        createRepository("test-repo", "fs", Settings.builder()
+            .put("location", repo)
+            .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
+
+        createIndex("test-idx-1", "test-idx-2");
+        indexRandom(true,
+            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
+        flushAndRefresh("test-idx-1", "test-idx-2");
+
+        SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");
+
+        final Path globalStatePath = repo.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
+        if (randomBoolean()) {
+            // Delete the global state metadata file
+            IOUtils.deleteFilesIgnoringExceptions(globalStatePath);
+        } else {
+            // Truncate the global state metadata file
+            try (SeekableByteChannel outChan = Files.newByteChannel(globalStatePath, StandardOpenOption.WRITE)) {
+                outChan.truncate(randomInt(10));
+            }
+        }
+
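+        // listing the snapshot and fetching its status should still work with the global state blob gone or truncated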
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots("test-repo");
+        assertThat(snapshotInfos.size(), equalTo(1));
+        assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
+
+        SnapshotsStatusResponse snapshotStatusResponse =
+            clusterAdmin().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get();
+        assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
+        assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo("test-snap"));
+
+        assertAcked(startDeleteSnapshot("test-repo", "test-snap").get());
+        expectThrows(SnapshotMissingException.class, () -> clusterAdmin()
+            .prepareGetSnapshots("test-repo").addSnapshots("test-snap").get().getSnapshots("test-repo"));
+        assertRequestBuilderThrows(clusterAdmin().prepareSnapshotStatus("test-repo").addSnapshots("test-snap"),
+            SnapshotMissingException.class);
+
+        createFullSnapshot("test-repo", "test-snap");
+    }
+
+    public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
+        disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
+
+        Path repo = randomRepoPath();
+        createRepository("test-repo", "fs", repo);
+
+        createIndex("test-idx-1", "test-idx-2");
+        logger.info("--> indexing some data");
+        indexRandom(true,
+            client().prepareIndex("test-idx-1").setSource("foo", "bar"),
+            client().prepareIndex("test-idx-2").setSource("foo", "bar"));
+
+        logger.info("--> creating snapshot");
+        clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-1")
+            .setWaitForCompletion(true).setIndices("test-idx-*").get();
+
+        logger.info("--> deleting shard level index file");
+        final Path indicesPath = repo.resolve("indices");
+        for (IndexId indexId : getRepositoryData("test-repo").getIndices().values()) {
+            final Path shardGen;
+            try (Stream<Path> shardFiles = Files.list(indicesPath.resolve(indexId.getId()).resolve("0"))) {
+                shardGen = shardFiles.filter(file -> file.getFileName().toString().startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))
+                    .findFirst().orElseThrow(() -> new AssertionError("Failed to find shard index blob"));
+            }
+            Files.delete(shardGen);
+        }
+
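+        // with the shard index blob gone, that shard's previous generation can no longer be loaded, so the
+        // follow-up snapshot below is expected to fail for exactly one shard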
+        logger.info("--> creating another snapshot");
+        CreateSnapshotResponse createSnapshotResponse =
+            clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-2")
+                .setWaitForCompletion(true).setIndices("test-idx-1").get();
+        assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            createSnapshotResponse.getSnapshotInfo().totalShards() - 1);
+
+        logger.info("--> restoring the first snapshot, the repository should not have lost any shard data despite deleting index-N, " +
+            "because it uses snap-*.dat files and not the index-N to determine what files to restore");
+        client().admin().indices().prepareDelete("test-idx-1", "test-idx-2").get();
+        RestoreSnapshotResponse restoreSnapshotResponse =
+            clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).get();
+        assertEquals(0, restoreSnapshotResponse.getRestoreInfo().failedShards());
+    }
+
     private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
         logger.info("--> try to delete snapshot");
         final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,