Skip to content

HBASE-26255 Add an option to use region location from meta table in T… #3670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
Expand Down Expand Up @@ -101,6 +105,15 @@ public class TableSnapshotInputFormatImpl {
"hbase.TableSnapshotInputFormat.locality.enabled";
public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

/**
* Whether to calculate the Snapshot region location by region location from meta.
* It is much faster than computing block locations for splits.
*/
public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =
"hbase.TableSnapshotInputFormat.locality.by.region.location";

public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false;

/**
* In some scenario, scan limited rows on each InputSplit for sampling data extraction
*/
Expand Down Expand Up @@ -392,17 +405,49 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT);
scan.setScanMetricsEnabled(scanMetricsEnabled);

boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);

Connection connection = null;
RegionLocator regionLocator = null;
if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);
try {
connection = ConnectionFactory.createConnection(newConf);
regionLocator = connection.getRegionLocator(htd.getTableName());

/* Get all locations for the table and cache it */
regionLocator.getAllRegionLocations();
} finally {
if (connection != null) {
connection.close();
}
}
}

List<InputSplit> splits = new ArrayList<>();
for (HRegionInfo hri : regionManifests) {
// load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation
location = regionLocator.getRegionLocation(hri.getStartKey(), false);

hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
}
}

if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
sp[i + 1])) {
List<String> hosts =
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);

Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) {
Expand All @@ -425,8 +470,7 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
} else {
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())) {
List<String> hosts =
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);

splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
Expand All @@ -440,14 +484,9 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
* only when localityEnabled is true.
*/
private static List<String> calculateLocationsForInputSplit(Configuration conf,
TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
TableDescriptor htd, HRegionInfo hri, Path tableDir)
throws IOException {
if (localityEnabled) { // care block locality
return getBestLocations(conf,
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
} else { // do not care block locality
return null;
}
return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -198,6 +200,18 @@ public void testInitTableSnapshotMapperJobConfig() throws Exception {
}
}

@Test
public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
try {
testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1,
true);
} finally {
conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
}
}

@Override
public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
String snapshotName, Path tmpTableDir) throws Exception {
Expand All @@ -218,6 +232,8 @@ public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotNam

Configuration conf = util.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
Job job = new Job(conf);
Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
Expand Down Expand Up @@ -406,13 +422,26 @@ private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumS
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

boolean byRegionLoc =
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
for (int i = 0; i < splits.size(); i++) {
// validate input split
InputSplit split = splits.get(i);
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
if (localityEnabled) {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
if (byRegionLoc) {
// When it uses region location from meta, the hostname will be "localhost",
// the location from hdfs block location is "127.0.0.1".
Assert.assertEquals(1, split.getLocations().length);
Assert.assertTrue("Not using region location!",
split.getLocations()[0].equals("localhost"));
} else {
Assert.assertTrue("Not using region location!",
split.getLocations()[0].equals("127.0.0.1"));
}
} else {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
}
Expand Down