Skip to content

Commit 79408b5

Browse files
author
eddy.cao
committed
Add config to enable parallel upload fsimage to multiple other namenodes
1 parent 49266a6 commit 79408b5

File tree

4 files changed

+79
-20
lines changed

4 files changed

+79
-20
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
259259
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
260260
public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
261261
public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
262+
public static final String DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY = "dfs.namenode.checkpoint.parallel.upload.enabled";
263+
public static final boolean DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
262264
public static final String DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY = "dfs.namenode.missing.checkpoint.periods.before.shutdown";
263265
public static final int DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
264266
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public class CheckpointConf {
5454
*/
5555
private double quietMultiplier;
5656

57+
/**
58+
* Whether enable the standby namenode to upload fsiamge to multiple other namenodes in
59+
* parallel, in the cluster with observer namenodes.
60+
*/
61+
private final boolean parallelUploadEnabled;
62+
5763
public CheckpointConf(Configuration conf) {
5864
checkpointCheckPeriod = conf.getTimeDuration(
5965
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
6874
legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
6975
quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
7076
DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
77+
parallelUploadEnabled = conf.getBoolean(
78+
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
79+
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
7180
warnForDeprecatedConfigs(conf);
7281
}
7382

@@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
106115
public double getQuietPeriod() {
107116
return this.checkpointPeriod * this.quietMultiplier;
108117
}
118+
119+
public boolean isParallelUploadEnabled() {
120+
return parallelUploadEnabled;
121+
}
109122
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,10 @@ private void doCheckpoint() throws InterruptedException, IOException {
248248
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
249249
// than the expected number of tasks to run or queue up
250250
// See HDFS-4816
251-
ExecutorService executor =
252-
new ThreadPoolExecutor(activeNNAddresses.size(), activeNNAddresses.size(), 100,
253-
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
254-
uploadThreadFactory);
251+
int poolSize = checkpointConf.isParallelUploadEnabled() ? activeNNAddresses.size() : 0;
252+
ExecutorService executor = new ThreadPoolExecutor(poolSize, activeNNAddresses.size(), 100,
253+
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
254+
uploadThreadFactory);
255255
// for right now, just match the upload to the nn address by convention. There is no need to
256256
// directly tie them together by adding a pair class.
257257
HashMap<String, Future<TransferFsImage.TransferResult>> uploads =

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -451,21 +451,6 @@ public void testCheckpointCancellationDuringUpload() throws Exception {
451451
cluster.transitionToStandby(0);
452452
cluster.transitionToActive(1);
453453

454-
GenericTestUtils.waitFor(new Supplier<Boolean>() {
455-
@Override
456-
public Boolean get() {
457-
int transferThreadCount = 0;
458-
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
459-
ThreadInfo[] threads = threadBean.getThreadInfo(
460-
threadBean.getAllThreadIds(), 1);
461-
for (ThreadInfo thread: threads) {
462-
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
463-
transferThreadCount++;
464-
}
465-
}
466-
return transferThreadCount == NUM_NNS - 1;
467-
}
468-
}, 1000, 30000);
469454

470455
// Wait to make sure background TransferFsImageUpload thread was cancelled.
471456
// This needs to be done before the next test in the suite starts, so that a
@@ -491,7 +476,66 @@ public Boolean get() {
491476
// Assert that former active did not accept the canceled checkpoint file.
492477
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
493478
}
494-
479+
480+
/**
481+
* Test standby namenode upload fsiamge to multiple other namenodes in parallel, in the
482+
* cluster with observer namenodes.
483+
*/
484+
@Test(timeout=60000)
485+
public void testCheckpointParallelUpload() throws Exception {
486+
// Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
487+
// doing checkpoint when it becomes a standby
488+
cluster.getConfiguration(0).setInt(
489+
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
490+
491+
// don't compress, we want a big image
492+
for (int i = 0; i < NUM_NNS; i++) {
493+
cluster.getConfiguration(i).setBoolean(
494+
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
495+
}
496+
497+
// Throttle SBN upload to make it hang during upload to ANN, and enable parallel upload fsimage.
498+
for (int i = 1; i < NUM_NNS; i++) {
499+
cluster.getConfiguration(i).setLong(
500+
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
501+
cluster.getConfiguration(i).setBoolean(
502+
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY, true);
503+
}
504+
for (int i = 0; i < NUM_NNS; i++) {
505+
cluster.restartNameNode(i);
506+
}
507+
508+
// update references to each of the nns
509+
setNNs();
510+
511+
cluster.transitionToActive(0);
512+
513+
doEdits(0, 100);
514+
515+
for (int i = 1; i < NUM_NNS; i++) {
516+
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
517+
HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
518+
}
519+
cluster.transitionToStandby(0);
520+
cluster.transitionToActive(1);
521+
522+
GenericTestUtils.waitFor(new Supplier<Boolean>() {
523+
@Override
524+
public Boolean get() {
525+
int transferThreadCount = 0;
526+
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
527+
ThreadInfo[] threads = threadBean.getThreadInfo(
528+
threadBean.getAllThreadIds(), 1);
529+
for (ThreadInfo thread: threads) {
530+
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
531+
transferThreadCount++;
532+
}
533+
}
534+
return transferThreadCount == NUM_NNS - 1;
535+
}
536+
}, 1000, 30000);
537+
}
538+
495539
/**
496540
* Make sure that clients will receive StandbyExceptions even when a
497541
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer

0 commit comments

Comments
 (0)