@@ -30,6 +30,7 @@
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -182,7 +183,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String SNAPSHOT_INDEX_CODEC = "snapshots";
 
-    private static final String DATA_BLOB_PREFIX = "__";
+    private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
+
+    /**
+     * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
+     * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
+     * {@link BlobStoreIndexShardSnapshots}. This is the case for files for which {@link StoreFileMetaData#hashEqualsContents()} is
+     * {@code true}.
+     */
+    private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
 
     /**
      * When set to true metadata files are stored in compressed format. This setting doesn't affect index
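
The two prefixes above split the data-blob namespace by whether a physical object ever gets written. A minimal, self-contained sketch of the naming convention (plain JDK only; blobNameFor is a hypothetical helper, and the real code uses UUIDs.randomBase64UUID rather than java.util.UUID):

import java.util.UUID;

// Minimal sketch of the naming convention, not the Elasticsearch API.
public class BlobNamingSketch {
    static final String UPLOADED_DATA_BLOB_PREFIX = "__";
    static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

    // Files whose metadata hash already holds their full contents get a
    // "v__" name that is recorded in the shard metadata but never uploaded.
    static String blobNameFor(boolean contentsStoredInMetadata) {
        return (contentsStoredInMetadata ? VIRTUAL_DATA_BLOB_PREFIX : UPLOADED_DATA_BLOB_PREFIX)
            + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(blobNameFor(false)); // e.g. __1b4e28ba-... (uploaded)
        System.out.println(blobNameFor(true));  // e.g. v__1b4e28ba-... (virtual)
    }
}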
@@ -1529,6 +1538,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
                         }
                     }
 
+                    // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
+                    // directly in the shard level metadata in this case
+                    final boolean needsWrite = md.hashEqualsContents() == false;
                     indexTotalFileCount += md.length();
                     indexTotalNumberOfFiles++;
 
@@ -1537,9 +1549,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
                         indexIncrementalSize += md.length();
                         // create a new FileInfo
                         BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
-                            new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
+                            new BlobStoreIndexShardSnapshot.FileInfo(
+                                (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
+                                md, chunkSize());
                         indexCommitPointFiles.add(snapshotFileInfo);
-                        filesToSnapshot.add(snapshotFileInfo);
+                        if (needsWrite) {
+                            filesToSnapshot.add(snapshotFileInfo);
+                        }
+                        assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
                     } else {
                         indexCommitPointFiles.add(existingFileInfo);
                     }
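
The change in this hunk comes down to one extra branch: every file is still recorded in the commit-point metadata, but only files for which hashEqualsContents() is false are queued for upload. A condensed sketch of that decision, using hypothetical record types in place of StoreFileMetaData and FileInfo:

import java.util.ArrayList;
import java.util.List;

// Condensed sketch of the snapshot-side decision; the record types are
// hypothetical stand-ins, not the Elasticsearch classes.
public class SnapshotDecisionSketch {
    record FileMeta(String name, byte[] hashOrContents, boolean hashEqualsContents) {}
    record FileInfo(String blobName, FileMeta metadata) {}

    public static void main(String[] args) {
        List<FileInfo> commitPointFiles = new ArrayList<>(); // always recorded in metadata
        List<FileInfo> filesToUpload = new ArrayList<>();    // physically written to the repo

        // A tiny file whose hash field holds its complete contents.
        FileMeta md = new FileMeta("segments_5", new byte[] {1, 2, 3}, true);
        boolean needsWrite = md.hashEqualsContents() == false;
        FileInfo info = new FileInfo((needsWrite ? "__" : "v__") + "someUUID", md);

        commitPointFiles.add(info);  // every file is referenced by the snapshot
        if (needsWrite) {
            filesToUpload.add(info); // but only real blobs are uploaded
        }
        System.out.println(info.blobName() + " needs upload: " + needsWrite);
    }
}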
@@ -1548,8 +1565,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
             snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
 
-            assert indexIncrementalFileCount == filesToSnapshot.size();
-
             final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
             allFilesUploadedListener.whenComplete(v -> {
                 final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
@@ -1638,6 +1653,17 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
         }
     }
 
+    private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
+        try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
+            final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
+            indexInput.readBytes(tmp, 0, tmp.length);
+            assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp));
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+        return true;
+    }
+
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
                              RecoveryState recoveryState, ActionListener<Void> listener) {
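
The new assertion helper reads the entire file back and checks it is byte-identical to the hash stored in its metadata, which is exactly the invariant that makes skipping the upload safe. The same check sketched with plain JDK I/O (unlike the real method, this skips Lucene's checksum verification via Store#openVerifyingInput):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// JDK-only sketch of the contents-equal-hash assertion.
public class HashCheckSketch {
    static boolean fileContentsMatchHash(Path file, byte[] expectedHash) {
        try {
            // For virtual files the stored "hash" is the complete file body,
            // so a byte-for-byte comparison proves the blob can be skipped.
            assert Arrays.equals(Files.readAllBytes(file), expectedHash);
        } catch (IOException e) {
            throw new AssertionError(e);
        }
        return true; // always true, so callers can use it inside an assert
    }
}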
@@ -1681,38 +1707,42 @@ protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRe
 
             private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
                 boolean success = false;
-
-                try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
-                    @Override
-                    protected InputStream openSlice(long slice) throws IOException {
-                        return container.readBlob(fileInfo.partName(slice));
-                    }
-                },
-                restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
-                    try (IndexOutput indexOutput =
-                             store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
-                        final byte[] buffer = new byte[BUFFER_SIZE];
-                        int length;
-                        while ((length = stream.read(buffer)) > 0) {
-                            indexOutput.writeBytes(buffer, 0, length);
-                            recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
-                        }
-                        Store.verify(indexOutput);
-                        indexOutput.close();
-                        store.directory().sync(Collections.singleton(fileInfo.physicalName()));
-                        success = true;
-                    } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
-                        try {
-                            store.markStoreCorrupted(ex);
-                        } catch (IOException e) {
-                            logger.warn("store cannot be marked as corrupted", e);
-                        }
-                        throw ex;
-                    } finally {
-                        if (success == false) {
-                            store.deleteQuiet(fileInfo.physicalName());
+                try (IndexOutput indexOutput =
+                         store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
+                    if (fileInfo.name().startsWith(VIRTUAL_DATA_BLOB_PREFIX)) {
+                        final BytesRef hash = fileInfo.metadata().hash();
+                        indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
+                        recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length);
+                    } else {
+                        try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
+                            @Override
+                            protected InputStream openSlice(long slice) throws IOException {
+                                return container.readBlob(fileInfo.partName(slice));
+                            }
+                        }, restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
+                            final byte[] buffer = new byte[BUFFER_SIZE];
+                            int length;
+                            while ((length = stream.read(buffer)) > 0) {
+                                indexOutput.writeBytes(buffer, 0, length);
+                                recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
+                            }
+                        }
                     }
                 }
+                    Store.verify(indexOutput);
+                    indexOutput.close();
+                    store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+                    success = true;
+                } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                    try {
+                        store.markStoreCorrupted(ex);
+                    } catch (IOException e) {
+                        logger.warn("store cannot be marked as corrupted", e);
+                    }
+                    throw ex;
+                } finally {
+                    if (success == false) {
+                        store.deleteQuiet(fileInfo.physicalName());
+                    }
                 }
             }
         }.restore(snapshotFiles, store, l);
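
Restore now branches on the blob-name prefix: a v__ file is materialized directly from the hash bytes carried in the shard metadata, while a __ file is streamed down from the repository as before. A condensed sketch with hypothetical stream parameters in place of the blob container and verifying output:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Condensed sketch of the two restore paths; parameters are hypothetical
// stand-ins for the FileInfo metadata and the repository/blob streams.
public class RestoreSketch {
    static void restoreFile(String blobName, byte[] metadataHash,
                            InputStream blobStream, OutputStream out) throws IOException {
        if (blobName.startsWith("v__")) {
            // Virtual blob: nothing was uploaded; the metadata hash field
            // carries the full file contents, so write it out directly.
            out.write(metadataHash);
        } else {
            // Uploaded blob: stream the bytes down from the repository.
            byte[] buffer = new byte[8192];
            int length;
            while ((length = blobStream.read(buffer)) > 0) {
                out.write(buffer, 0, length);
            }
        }
    }
}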
@@ -1843,7 +1873,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
                 || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
                     && survivingSnapshotUUIDs.contains(
                         blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
-                || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
+                || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
                 || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
     }
 
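
Note that the stale-blob scan keeps matching only the uploaded prefix. Virtual blobs never correspond to a stored object, so there is nothing in the repository to garbage-collect for them. A minimal sketch of that predicate (hypothetical names; the real method also matches stale index-N, snap-*.dat, and temp blobs):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the data-blob part of the cleanup predicate.
public class CleanupSketch {
    static List<String> unusedDataBlobs(Set<String> blobNames, Set<String> stillReferenced) {
        return blobNames.stream()
            // Only "__" blobs exist as stored objects; "v__" names live purely
            // in metadata and can never be stale in the repository.
            .filter(name -> name.startsWith("__") && stillReferenced.contains(name) == false)
            .collect(Collectors.toList());
    }
}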
@@ -1897,7 +1927,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
             final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
             return new Tuple<>(shardSnapshots, latest);
         } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
-            || b.startsWith(DATA_BLOB_PREFIX))) {
+            || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
             throw new IllegalStateException(
                 "Could not find a readable index-N file in a non-empty shard snapshot directory [" + shardContainer.path() + "]");
         }