Skip to content

Commit cf75d85

Browse files
author
lgh
committed
When the NN restarts, heartbeat and FBR are handled separately, so the re-register execution interval should be limited; change fullBlockReportLeaseId to volatile so that it stays in sync between the two threads (FBR and heartbeat)
1 parent 39efd66 commit cf75d85

File tree

1 file changed

+35
-23
lines changed
  • hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode

1 file changed

+35
-23
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ enum RunningState {
122122
private final DataNode dn;
123123
private final DNConf dnConf;
124124
private long prevBlockReportId;
125-
private long fullBlockReportLeaseId;
125+
private volatile long fullBlockReportLeaseId;
126126
private final SortedSet<Integer> blockReportSizes =
127127
Collections.synchronizedSortedSet(new TreeSet<>());
128128
private final int maxDataLength;
@@ -483,6 +483,8 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
483483
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
484484
".");
485485
}
486+
scheduler.updateLastBlockReportTime(monotonicNow());
487+
scheduler.scheduleNextBlockReport();
486488
return cmds.size() == 0 ? null : cmds;
487489
}
488490

@@ -770,10 +772,7 @@ private void offerService() throws Exception {
770772
LOG.info("Forcing a full block report to " + nnAddr);
771773
}
772774
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
773-
fbrExecutorService.submit(new FBRTaskHandler(fullBlockReportLeaseId));
774-
fullBlockReportLeaseId = 0;
775-
scheduler.updateLastBlockReportTime(monotonicNow());
776-
scheduler.scheduleNextBlockReport();
775+
fbrExecutorService.submit(new FBRTaskHandler());
777776
}
778777

779778
if (!dn.areCacheReportsDisabledForTests()) {
@@ -971,20 +970,25 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
971970

972971
void reRegister() throws IOException {
973972
if (shouldRun()) {
974-
// re-retrieve namespace info to make sure that, if the NN
975-
// was restarted, we still match its version (HDFS-2120)
976-
NamespaceInfo nsInfo = retrieveNamespaceInfo();
977-
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
978-
// for sometime.
979-
if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
980-
ibrManager.clearIBRs();
973+
if (scheduler.shouldReRegister()) {
974+
// re-retrieve namespace info to make sure that, if the NN
975+
// was restarted, we still match its version (HDFS-2120)
976+
NamespaceInfo nsInfo = retrieveNamespaceInfo();
977+
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
978+
// for sometime.
979+
if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
980+
ibrManager.clearIBRs();
981+
}
982+
// HDFS-15113, register and trigger FBR after clean IBR to avoid missing
983+
// some blocks report to Standby util next FBR.
984+
// and re-register
985+
register(nsInfo);
986+
scheduler.setReRegisterTime(monotonicNow());
987+
scheduler.scheduleHeartbeat();
988+
DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
989+
} else {
990+
LOG.info("DNA_REGISTER execution interval is too short. Skip.");
981991
}
982-
// HDFS-15113, register and trigger FBR after clean IBR to avoid missing
983-
// some blocks report to Standby util next FBR.
984-
// and re-register
985-
register(nsInfo);
986-
scheduler.scheduleHeartbeat();
987-
DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
988992
}
989993
}
990994

@@ -1195,10 +1199,7 @@ public void run() {
11951199

11961200
final class FBRTaskHandler implements Runnable {
11971201

1198-
private long fullBlockReportLeaseId;
1199-
1200-
private FBRTaskHandler(long fullBlockReportLeaseId) {
1201-
this.fullBlockReportLeaseId = fullBlockReportLeaseId;
1202+
private FBRTaskHandler() {
12021203
}
12031204

12041205
@Override
@@ -1207,10 +1208,12 @@ public void run() {
12071208
List<DatanodeCommand> cmds = null;
12081209
try {
12091210
synchronized (sendBRLock) {
1210-
cmds = blockReport(this.fullBlockReportLeaseId);
1211+
cmds = blockReport(fullBlockReportLeaseId);
12111212
}
1213+
fullBlockReportLeaseId = 0;
12121214
commandProcessingThread.enqueue(cmds);
12131215
} catch (Throwable t) {
1216+
fullBlockReportLeaseId = 0;
12141217
LOG.warn("InterruptedException in FBR Task Handler.", t);
12151218
sleepAndLogInterrupts(5000, "offering FBR service");
12161219
synchronized(ibrManager) {
@@ -1260,6 +1263,7 @@ static class Scheduler {
12601263
private final long lifelineIntervalMs;
12611264
private volatile long blockReportIntervalMs;
12621265
private volatile long outliersReportIntervalMs;
1266+
private long reRegisterTime = 0;
12631267

12641268
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
12651269
long blockReportIntervalMs, long outliersReportIntervalMs) {
@@ -1445,6 +1449,14 @@ long getOutliersReportIntervalMs() {
14451449
return this.outliersReportIntervalMs;
14461450
}
14471451

1452+
private boolean shouldReRegister() {
1453+
return monotonicNow() - reRegisterTime > this.heartbeatIntervalMs * 3;
1454+
}
1455+
1456+
public void setReRegisterTime(long reRegisterTime) {
1457+
this.reRegisterTime = reRegisterTime;
1458+
}
1459+
14481460
/**
14491461
* Wrapped for testing.
14501462
* @return

0 commit comments

Comments
 (0)