 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -165,54 +170,60 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
         LOG.debug("copying archive {} to {}", archive, tgt);
         archiveFiles.add(archive.toString());
       }
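+      // Split the bulk-loaded files along the table's region boundaries, then copy
+      // the resulting HFiles into this backup image.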
+      mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
+      incrementalCopyBulkloadHFiles(tgtFs, srcTable);
     }
-
-    copyBulkLoadedFiles(activeFiles, archiveFiles);
     return bulkLoads;
   }
 
-  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
-    throws IOException {
-    try {
-      // Enable special mode of BackupDistCp
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
-      // Copy active files
-      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
-      int attempt = 1;
-      while (activeFiles.size() > 0) {
-        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
-        String[] toCopy = new String[activeFiles.size()];
-        activeFiles.toArray(toCopy);
-        // Active file can be archived during copy operation,
-        // we need to handle this properly
-        try {
-          incrementalCopyHFiles(toCopy, tgtDest);
-          break;
-        } catch (IOException e) {
-          // Check if some files got archived
-          // Update active and archived lists
-          // When file is being moved from active to archive
-          // directory, the number of active files decreases
-          int numOfActive = activeFiles.size();
-          updateFileLists(activeFiles, archiveFiles);
-          if (activeFiles.size() < numOfActive) {
-            continue;
-          }
-          // if not - throw exception
-          throw e;
+  private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
+    TableName tn) throws IOException {
+    int attempt = 1;
+
+    while (!activeFiles.isEmpty()) {
+      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
+      // Active file can be archived during copy operation,
+      // we need to handle this properly
+      try {
+        mergeSplitBulkloads(activeFiles, tn);
+        break;
+      } catch (IOException e) {
+        int numActiveFiles = activeFiles.size();
+        updateFileLists(activeFiles, archiveFiles);
+        if (activeFiles.size() < numActiveFiles) {
+          continue;
         }
+
+        throw e;
       }
-      // If incremental copy will fail for archived files
-      // we will have partially loaded files in backup destination (only files from active data
-      // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up
-      if (archiveFiles.size() > 0) {
-        String[] toCopy = new String[archiveFiles.size()];
-        archiveFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
-      }
-    } finally {
-      // Disable special mode of BackupDistCp
-      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+    if (!archiveFiles.isEmpty()) {
+      mergeSplitBulkloads(archiveFiles, tn);
+    }
+  }
+
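+  // Runs MapReduceHFileSplitterJob over the given bulk-loaded HFiles, splitting them
+  // along the table's region boundaries and writing the output to the table's bulk
+  // output directory.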
+  private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
+      getBulkOutputDirForTable(tn).toString());
+    player.setConf(conf);
+
+    String inputDirs = StringUtils.join(files, ",");
+    String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };
+
+    int result;
+
+    try {
+      result = player.run(args);
+    } catch (Exception e) {
+      LOG.error("Failed to run MapReduceHFileSplitterJob", e);
+      throw new IOException(e);
+    }
+
+    if (result != 0) {
+      throw new IOException(
+        "Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
     }
   }
 
@@ -263,6 +274,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+      setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
       convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
@@ -405,6 +417,29 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     }
   }
 
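+  // Copies the merge-split bulkload HFiles for the given table from the bulk output
+  // directory into the backup image, preserving the last two levels of each source
+  // path during the copy.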
+  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
+    Path bulkOutDir = getBulkOutputDirForTable(tn);
+    FileSystem fs = FileSystem.get(conf);
+
+    if (fs.exists(bulkOutDir)) {
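+      // Have the backup copy job keep the trailing two path levels of each source
+      // file (the column family directory and the file name) at the destination.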
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      Path tgtPath = getTargetDirForTable(tn);
+      try {
+        RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
+        List<String> files = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+          LocatedFileStatus file = locatedFiles.next();
+          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
+            files.add(file.getPath().toString());
+          }
+        }
+        incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
+      } finally {
+        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+      }
+    }
+  }
+
   protected Path getBulkOutputDirForTable(TableName table) {
     Path tablePath = getBulkOutputDir();
     tablePath = new Path(tablePath, table.getNamespaceAsString());
@@ -420,6 +455,30 @@ protected Path getBulkOutputDir() {
     return path;
   }
 
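+  // Returns the per-table destination inside the current backup image:
+  // <backupRootDir>/<backupId>/<namespace>/<table>.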
+  private Path getTargetDirForTable(TableName table) {
+    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
+    path = new Path(path, table.getNamespaceAsString());
+    path = new Path(path, table.getNameAsString());
+    return path;
+  }
+
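+  // Registers each table's full-backup snapshot manifest directory with
+  // SnapshotRegionLocator, so region locations can be resolved from the table's
+  // most recent full backup snapshot.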
+  private void setupRegionLocator() throws IOException {
+    Map<TableName, String> fullBackupIds = getFullBackupIds();
+    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
+
+      for (TableName tableName : backupInfo.getTables()) {
+        String fullBackupId = fullBackupIds.get(tableName);
+        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
+        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
+        Path root = HBackupFileSystem.getTableBackupPath(tableName,
+          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
+        String manifestDir =
+          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
+        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
+      }
+    }
+  }
+
   private Map<TableName, String> getFullBackupIds() throws IOException {
     // Ancestors are stored from newest to oldest, so we can iterate backwards
     // in order to populate our backupId map with the most recent full backup