@@ -1887,63 +1887,78 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
             snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);
 
-            final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
-            allFilesUploadedListener.whenComplete(v -> {
-                final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
-                    snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
-
-                // now create and write the commit point
-                final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
-                    lastSnapshotStatus.getIndexVersion(),
-                    indexCommitPointFiles,
-                    lastSnapshotStatus.getStartTime(),
-                    threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
-                    lastSnapshotStatus.getIncrementalFileCount(),
-                    lastSnapshotStatus.getIncrementalSize()
-                );
-
-                logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
+            final String indexGeneration;
+            final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
+            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
+            List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
+            newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier));
+            for (SnapshotFiles point : snapshots) {
+                newSnapshotsList.add(point);
+            }
+            final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
+            final Runnable afterWriteSnapBlob;
+            if (writeShardGens) {
+                // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
+                // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never
+                // reference a generation that has not had all its files fully upload.
+                indexGeneration = UUIDs.randomBase64UUID();
                 try {
-                    INDEX_SHARD_SNAPSHOT_FORMAT.write(snapshot, shardContainer, snapshotId.getUUID(), compress);
+                    writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
                 } catch (IOException e) {
-                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
-                }
-                // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
-                List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-                newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
-                for (SnapshotFiles point : snapshots) {
-                    newSnapshotsList.add(point);
+                    throw new IndexShardSnapshotFailedException(shardId,
+                        "Failed to write shard level snapshot metadata for [" + snapshotId + "] to ["
+                            + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
                 }
-                final List<String> blobsToDelete;
-                final String indexGeneration;
-                final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
-                if (writeShardGens) {
-                    indexGeneration = UUIDs.randomBase64UUID();
-                    blobsToDelete = Collections.emptyList();
-                } else {
-                    indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
-                    // Delete all previous index-N blobs
-                    blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
-                    assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
+                afterWriteSnapBlob = () -> {};
+            } else {
+                // When not using shard generations we can only write the index-${N} blob after all other work for this shard has
+                // completed.
+                // Also, in case of numeric shard generations the data node has to take care of deleting old shard generations.
+                indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
+                // Delete all previous index-N blobs
+                final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
+                    .collect(Collectors.toList());
+                assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
                         .max().orElse(-1L) < Long.parseLong(indexGeneration)
                         : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
                         + "] when deleting index-N blobs " + blobsToDelete;
-                }
-                try {
-                    writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList));
-                } catch (IOException e) {
-                    throw new IndexShardSnapshotFailedException(shardId,
-                        "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
-                            + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
-                }
-                if (writeShardGens == false) {
+                afterWriteSnapBlob = () -> {
+                    try {
+                        writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
+                    } catch (IOException e) {
+                        throw new IndexShardSnapshotFailedException(shardId,
+                            "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+                                + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
+                    }
                     try {
                         deleteFromContainer(shardContainer, blobsToDelete);
                     } catch (IOException e) {
                         logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
-                            snapshotId, shardId), e);
+                                snapshotId, shardId), e);
                     }
-                }
+                };
+            }
+
+            final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
+            allFilesUploadedListener.whenComplete(v -> {
+                final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
+                    snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
+
+                // now create and write the commit point
+                logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
+                try {
+                    INDEX_SHARD_SNAPSHOT_FORMAT.write(new BlobStoreIndexShardSnapshot(snapshotId.getName(),
+                        lastSnapshotStatus.getIndexVersion(),
+                        indexCommitPointFiles,
+                        lastSnapshotStatus.getStartTime(),
+                        threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
+                        lastSnapshotStatus.getIncrementalFileCount(),
+                        lastSnapshotStatus.getIncrementalSize()
+                    ), shardContainer, snapshotId.getUUID(), compress);
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
                 }
+                afterWriteSnapBlob.run();
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
                 listener.onResponse(indexGeneration);
             }, listener::onFailure);
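
For readers skimming the diff, here is a minimal, self-contained sketch of the control flow the change introduces: pick the shard index generation up front, and either write the UUID-named blob immediately (shard generations enabled) or defer writing `index-N` and deleting older `index-*` blobs until after the data upload. This is not the `BlobStoreRepository` implementation; `ShardGenerationSketch`, `FakeBlobStore`, and `finalizeShard` are hypothetical stand-ins, and the real upload is asynchronous via listeners rather than the single synchronous call shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

public class ShardGenerationSketch {

    /** Stand-in for a shard's blob container (the real code talks to a BlobContainer). */
    static final class FakeBlobStore {
        final List<String> blobs = new ArrayList<>();
        void write(String name) { blobs.add(name); }
        void delete(List<String> names) { blobs.removeAll(names); }
    }

    /**
     * Mirrors the branching above: choose the next shard index generation and decide what,
     * if anything, remains to be done after all data blobs have been uploaded.
     */
    static String finalizeShard(FakeBlobStore store, boolean writeShardGens, long currentGen) {
        final String indexGeneration;
        final Runnable afterWriteSnapBlob;
        if (writeShardGens) {
            // UUID-named blob: safe to write up front, an unreferenced blob is harmless on failure.
            indexGeneration = UUID.randomUUID().toString();
            store.write("index-" + indexGeneration);
            afterWriteSnapBlob = () -> {};
        } else {
            // Numeric generation: write index-N and delete older index-* blobs only after the upload.
            indexGeneration = Long.toString(currentGen + 1);
            final List<String> blobsToDelete =
                store.blobs.stream().filter(b -> b.startsWith("index-")).collect(Collectors.toList());
            afterWriteSnapBlob = () -> {
                store.write("index-" + indexGeneration);
                store.delete(blobsToDelete);
            };
        }

        store.write("data-blob");     // stand-in for uploading the shard's segment files
        afterWriteSnapBlob.run();     // no-op when shard generations are enabled
        return indexGeneration;
    }

    public static void main(String[] args) {
        FakeBlobStore legacy = new FakeBlobStore();
        legacy.write("index-4");
        System.out.println(finalizeShard(legacy, false, 4) + " -> " + legacy.blobs); // 5 -> [data-blob, index-5]

        FakeBlobStore modern = new FakeBlobStore();
        System.out.println(finalizeShard(modern, true, 4) + " -> " + modern.blobs);  // <uuid> -> [index-<uuid>, data-blob]
    }
}
```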