Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
// Key and default for the checkpoint parallel-upload switch; the default leaves it disabled.
public static final String DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY =
"dfs.namenode.checkpoint.parallel.upload.enabled";
public static final boolean DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
public static final String DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY = "dfs.namenode.missing.checkpoint.periods.before.shutdown";
public static final int DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class CheckpointConf {
*/
private double quietMultiplier;

/**
 * Whether to enable the standby namenode to upload the fsimage to multiple other namenodes in
 * parallel, in a cluster with observer namenodes.
 */
private final boolean parallelUploadEnabled;

public CheckpointConf(Configuration conf) {
checkpointCheckPeriod = conf.getTimeDuration(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
Expand All @@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
parallelUploadEnabled = conf.getBoolean(
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
warnForDeprecatedConfigs(conf);
}

Expand Down Expand Up @@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
/**
 * Returns the quiet period, computed as the checkpoint period multiplied by the
 * quiet multiplier.
 *
 * @return the product of {@code checkpointPeriod} and {@code quietMultiplier}
 */
public double getQuietPeriod() {
  return checkpointPeriod * quietMultiplier;
}

/**
 * Reports the parallel-upload setting that was read from the configuration when this
 * object was constructed.
 *
 * @return the value of {@code parallelUploadEnabled}
 */
public boolean isParallelUploadEnabled() {
  return this.parallelUploadEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private void doCheckpoint() throws InterruptedException, IOException {
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up
// See HDFS-4816
ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
int poolSize = checkpointConf.isParallelUploadEnabled() ? activeNNAddresses.size() : 0;
ExecutorService executor = new ThreadPoolExecutor(poolSize, activeNNAddresses.size(), 100,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,66 @@ public Boolean get() {
// Assert that former active did not accept the canceled checkpoint file.
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
}


/**
 * Test that the standby namenode uploads the fsimage to multiple other namenodes in
 * parallel, in a cluster with observer namenodes.
 */
@Test(timeout=60000)
public void testCheckpointParallelUpload() throws Exception {
  // Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
  // doing checkpoint when it becomes a standby
  cluster.getConfiguration(0).setInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);

  // don't compress, we want a big image
  for (int i = 0; i < NUM_NNS; i++) {
    cluster.getConfiguration(i).setBoolean(
        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
  }

  // Throttle SBN upload to make it hang during upload to ANN, and enable parallel
  // upload of the fsimage.
  for (int i = 1; i < NUM_NNS; i++) {
    cluster.getConfiguration(i).setLong(
        DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
    cluster.getConfiguration(i).setBoolean(
        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY, true);
  }
  for (int i = 0; i < NUM_NNS; i++) {
    cluster.restartNameNode(i);
  }

  // update references to each of the nns
  setNNs();

  cluster.transitionToActive(0);

  doEdits(0, 100);

  for (int i = 1; i < NUM_NNS; i++) {
    HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
    HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
  }
  cluster.transitionToStandby(0);
  cluster.transitionToActive(1);

  // Wait until exactly NUM_NNS - 1 live threads have a name starting with
  // "TransferFsImageUpload".
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      int transferThreadCount = 0;
      ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
      ThreadInfo[] threads = threadBean.getThreadInfo(
          threadBean.getAllThreadIds(), 1);
      for (ThreadInfo thread: threads) {
        // getThreadInfo() yields a null element for any thread that exited after
        // getAllThreadIds() returned; skip it rather than fail with an NPE.
        if (thread != null
            && thread.getThreadName().startsWith("TransferFsImageUpload")) {
          transferThreadCount++;
        }
      }
      return transferThreadCount == NUM_NNS - 1;
    }
  }, 1000, 30000);
}

/**
* Make sure that clients will receive StandbyExceptions even when a
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
Expand Down