31
31
import org .apache .hadoop .fs .Path ;
32
32
import org .apache .hadoop .hbase .HDFSBlocksDistribution ;
33
33
import org .apache .hadoop .hbase .HDFSBlocksDistribution .HostAndWeight ;
34
+ import org .apache .hadoop .hbase .HRegionLocation ;
34
35
import org .apache .hadoop .hbase .PrivateCellUtil ;
35
36
import org .apache .hadoop .hbase .client .ClientSideRegionScanner ;
37
+ import org .apache .hadoop .hbase .client .Connection ;
38
+ import org .apache .hadoop .hbase .client .ConnectionFactory ;
36
39
import org .apache .hadoop .hbase .client .IsolationLevel ;
37
40
import org .apache .hadoop .hbase .client .RegionInfo ;
41
+ import org .apache .hadoop .hbase .client .RegionLocator ;
38
42
import org .apache .hadoop .hbase .client .Result ;
39
43
import org .apache .hadoop .hbase .client .Scan ;
40
44
import org .apache .hadoop .hbase .client .TableDescriptor ;
@@ -101,6 +105,15 @@ public class TableSnapshotInputFormatImpl {
101
105
"hbase.TableSnapshotInputFormat.locality.enabled" ;
102
106
public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true ;
103
107
108
+ /**
109
+ * Whether to take the locality of each snapshot region from its region location in meta
110
+ * instead of computing HDFS block locations. It is much faster than computing block locations for splits.
111
+ */
112
+ public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =
113
+ "hbase.TableSnapshotInputFormat.locality.by.region.location" ;
114
+
115
+ public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false ;
116
+
104
117
/**
105
118
* In some scenarios, scan a limited number of rows on each InputSplit for sampling data extraction
106
119
*/
@@ -392,17 +405,49 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
392
405
SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT );
393
406
scan .setScanMetricsEnabled (scanMetricsEnabled );
394
407
408
+ boolean useRegionLoc = conf .getBoolean (SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION ,
409
+ SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT );
410
+
411
+ Connection connection = null ;
412
+ RegionLocator regionLocator = null ;
413
+ if (localityEnabled && useRegionLoc ) {
414
+ Configuration newConf = new Configuration (conf );
415
+ newConf .setInt ("hbase.hconnection.threads.max" , 1 );
416
+ try {
417
+ connection = ConnectionFactory .createConnection (newConf );
418
+ regionLocator = connection .getRegionLocator (htd .getTableName ());
419
+
420
+ /* Get all locations for the table and cache them */
421
+ regionLocator .getAllRegionLocations ();
422
+ } finally {
423
+ if (connection != null ) {
424
+ connection .close ();
425
+ }
426
+ }
427
+ }
428
+
395
429
List <InputSplit > splits = new ArrayList <>();
396
430
for (RegionInfo hri : regionManifests ) {
397
431
// load region descriptor
432
+ List <String > hosts = null ;
433
+ if (localityEnabled ) {
434
+ if (regionLocator != null ) {
435
+ /* Get Location from the local cache */
436
+ HRegionLocation
437
+ location = regionLocator .getRegionLocation (hri .getStartKey (), false );
438
+
439
+ hosts = new ArrayList <>(1 );
440
+ hosts .add (location .getHostname ());
441
+ } else {
442
+ hosts = calculateLocationsForInputSplit (conf , htd , hri , tableDir );
443
+ }
444
+ }
398
445
399
446
if (numSplits > 1 ) {
400
447
byte [][] sp = sa .split (hri .getStartKey (), hri .getEndKey (), numSplits , true );
401
448
for (int i = 0 ; i < sp .length - 1 ; i ++) {
402
449
if (PrivateCellUtil .overlappingKeys (scan .getStartRow (), scan .getStopRow (), sp [i ],
403
450
sp [i + 1 ])) {
404
- List <String > hosts =
405
- calculateLocationsForInputSplit (conf , htd , hri , tableDir , localityEnabled );
406
451
407
452
Scan boundedScan = new Scan (scan );
408
453
if (scan .getStartRow ().length == 0 ) {
@@ -425,8 +470,7 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
425
470
} else {
426
471
if (PrivateCellUtil .overlappingKeys (scan .getStartRow (), scan .getStopRow (),
427
472
hri .getStartKey (), hri .getEndKey ())) {
428
- List <String > hosts =
429
- calculateLocationsForInputSplit (conf , htd , hri , tableDir , localityEnabled );
473
+
430
474
splits .add (new InputSplit (htd , hri , hosts , scan , restoreDir ));
431
475
}
432
476
}
@@ -440,14 +484,9 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
440
484
* only when localityEnabled is true.
441
485
*/
442
486
private static List <String > calculateLocationsForInputSplit (Configuration conf ,
443
- TableDescriptor htd , RegionInfo hri , Path tableDir , boolean localityEnabled )
444
- throws IOException {
445
- if (localityEnabled ) { // care block locality
446
- return getBestLocations (conf ,
447
- HRegion .computeHDFSBlocksDistribution (conf , htd , hri , tableDir ));
448
- } else { // do not care block locality
449
- return null ;
450
- }
487
+ TableDescriptor htd , RegionInfo hri , Path tableDir )
488
+ throws IOException {
489
+ return getBestLocations (conf , HRegion .computeHDFSBlocksDistribution (conf , htd , hri , tableDir ));
451
490
}
452
491
453
492
/**
0 commit comments