Skip to content

Commit d85d21e

Browse files
author
SiCheng-Zheng
committed
HBASE-27523 Add BulkLoad bandwidth throttling
1 parent 7ed2cb9 commit d85d21e

File tree

7 files changed

+467
-12
lines changed

7 files changed

+467
-12
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
150150
import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
151151
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
152+
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
152153
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
153154
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
154155
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -7140,7 +7141,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
71407141
*/
71417142
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
71427143
boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
7143-
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true);
7144+
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null, true, null);
71447145
}
71457146

71467147
/**
@@ -7185,7 +7186,8 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String c
71857186
*/
71867187
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
71877188
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile,
7188-
List<String> clusterIds, boolean replicate) throws IOException {
7189+
List<String> clusterIds, boolean replicate, BulkLoadThrottler bulkLoadThrottler)
7190+
throws IOException {
71897191
long seqId = -1;
71907192
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
71917193
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -7281,7 +7283,10 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
72817283
}
72827284
Pair<Path, Path> pair = null;
72837285
if (reqTmp) {
7284-
pair = store.preBulkLoadHFile(finalPath, seqId);
7286+
pair = store.preBulkLoadHFile(finalPath, seqId,
7287+
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
7288+
? bulkLoadThrottler
7289+
: null);
72857290
} else {
72867291
Path livePath = new Path(finalPath);
72877292
pair = new Pair<>(livePath, livePath);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
5656
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
5757
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
58+
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
5859
import org.apache.hadoop.hbase.util.Bytes;
5960
import org.apache.hadoop.hbase.util.CommonFSUtils;
6061
import org.apache.hadoop.hbase.util.FSUtils;
@@ -552,8 +553,8 @@ public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFile
552553
* @param seqNum Bulk Load sequence number
553554
* @return The destination {@link Path} of the bulk loaded file
554555
*/
555-
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
556-
throws IOException {
556+
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum,
557+
BulkLoadThrottler bulkLoadThrottler) throws IOException {
557558
// Copy the file if it's on another filesystem
558559
FileSystem srcFs = srcPath.getFileSystem(conf);
559560
srcPath = srcFs.resolvePath(srcPath);
@@ -567,7 +568,11 @@ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long s
567568
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than "
568569
+ "the destination store. Copying file over to destination filesystem.");
569570
Path tmpPath = createTempName();
570-
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
571+
if (bulkLoadThrottler != null && bulkLoadThrottler.isEnabled()) {
572+
bulkLoadThrottler.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
573+
} else {
574+
FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf);
575+
}
571576
LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
572577
srcPath = tmpPath;
573578
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3402,6 +3402,17 @@ public void onConfigurationChange(Configuration newConf) {
34023402
LOG.info("Update region server coprocessors because the configuration has changed");
34033403
this.rsHost = new RegionServerCoprocessorHost(this, newConf);
34043404
}
3405+
3406+
long oldBandWidth = secureBulkLoadManager.getBulkLoadThrottler().getConf().getLong(
3407+
SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
3408+
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
3409+
long newBandWidth = newConf.getLong(SecureBulkLoadManager.HBASE_BULKLOAD_NODE_BANDWIDTH,
3410+
SecureBulkLoadManager.DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
3411+
if (oldBandWidth != newBandWidth) {
3412+
LOG.info("ConfigurationChange bulkload oldBandWidth is {} " + "newBandWidth is {}",
3413+
oldBandWidth, newBandWidth);
3414+
this.secureBulkLoadManager.getBulkLoadThrottler().setConf(newConf);
3415+
}
34053416
}
34063417

34073418
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
8686
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
8787
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
88+
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
8889
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
8990
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
9091
import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -670,9 +671,11 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
670671
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
671672
* @param seqNum sequence Id associated with the HFile
672673
*/
673-
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
674+
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum,
675+
BulkLoadThrottler bulkLoadThrottler) throws IOException {
674676
Path srcPath = new Path(srcPathStr);
675-
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
677+
return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum,
678+
bulkLoadThrottler);
676679
}
677680

678681
public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hbase.client.AsyncConnection;
4242
import org.apache.hadoop.hbase.ipc.RpcServer;
4343
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
44+
import org.apache.hadoop.hbase.regionserver.throttle.BulkLoadThrottler;
4445
import org.apache.hadoop.hbase.security.User;
4546
import org.apache.hadoop.hbase.security.UserProvider;
4647
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
@@ -106,6 +107,11 @@ public class SecureBulkLoadManager {
106107
private UserProvider userProvider;
107108
private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
108109
private AsyncConnection conn;
110+
private Long defaultBulkLoadBandwidth;
111+
public final static String HBASE_BULKLOAD_NODE_BANDWIDTH =
112+
"hbase.regionserver.bulkload.node.bandwidth";
113+
public final static Long DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH = 0L;
114+
private BulkLoadThrottler bulkLoadThrottler;
109115

110116
SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
111117
this.conf = conf;
@@ -130,6 +136,13 @@ public void start() throws IOException {
130136
fs.setPermission(baseStagingDir, PERM_HIDDEN);
131137
}
132138
}
139+
defaultBulkLoadBandwidth =
140+
conf.getLong(HBASE_BULKLOAD_NODE_BANDWIDTH, DEFAULT_HBASE_BULKLOAD_NODE_BANDWIDTH);
141+
bulkLoadThrottler = new BulkLoadThrottler(defaultBulkLoadBandwidth, conf);
142+
}
143+
144+
public BulkLoadThrottler getBulkLoadThrottler() {
145+
return bulkLoadThrottler;
133146
}
134147

135148
public void stop() throws IOException {
@@ -286,8 +299,10 @@ public Map<byte[], List<Path>> run() {
286299
// We call bulkLoadHFiles as requesting user
287300
// To enable access prior to staging
288301
return region.bulkLoadHFiles(familyPaths, true,
289-
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
290-
request.getReplicate());
302+
(bulkLoadThrottler != null && bulkLoadThrottler.isEnabled())
303+
? new SecureBulkLoadListener(fs, bulkToken, conf, bulkLoadThrottler)
304+
: new SecureBulkLoadListener(fs, bulkToken, conf),
305+
request.getCopyFile(), clusterIds, request.getReplicate(), null);
291306
} catch (Exception e) {
292307
LOG.error("Failed to complete bulk load", e);
293308
}
@@ -348,6 +363,16 @@ static class SecureBulkLoadListener implements BulkLoadListener {
348363
private FileSystem srcFs = null;
349364
private Map<String, FsPermission> origPermissions = null;
350365
private Map<String, String> origSources = null;
366+
private BulkLoadThrottler bulkLoadThrottler;
367+
368+
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf,
369+
BulkLoadThrottler bulkLoadThrottler) {
370+
this.fs = fs;
371+
this.stagingDir = stagingDir;
372+
this.conf = conf;
373+
this.origPermissions = new HashMap<>();
374+
this.bulkLoadThrottler = bulkLoadThrottler;
375+
}
351376

352377
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
353378
this.fs = fs;
@@ -389,10 +414,18 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
389414
if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
390415
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
391416
+ "the destination filesystem. Copying file over to destination staging dir.");
392-
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
417+
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
418+
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
419+
} else {
420+
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
421+
}
393422
} else if (copyFile) {
394423
LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
395-
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
424+
if (null != bulkLoadThrottler && bulkLoadThrottler.isEnabled()) {
425+
bulkLoadThrottler.copy(srcFs, p, fs, stageP, false, conf);
426+
} else {
427+
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
428+
}
396429
} else {
397430
LOG.debug("Moving " + p + " to " + stageP);
398431
FileStatus origFileStatus = fs.getFileStatus(p);

0 commit comments

Comments
 (0)