Skip to content

Commit

Permalink
HDFS-16139. Update BPServiceActor Scheduler's nextBlockReportTime ato…
Browse files Browse the repository at this point in the history
…mically (#3228). Contributed by Viraj Jasani.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
virajjasani authored Jul 27, 2021
1 parent 97c88c9 commit b038042
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
Expand Down Expand Up @@ -323,10 +324,10 @@ private void connectToNNAndHandshake() throws IOException {
void triggerBlockReportForTests() {
synchronized (ibrManager) {
scheduler.scheduleHeartbeat();
long oldBlockReportTime = scheduler.nextBlockReportTime;
long oldBlockReportTime = scheduler.getNextBlockReportTime();
scheduler.forceFullBlockReportNow();
ibrManager.notifyAll();
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
while (oldBlockReportTime == scheduler.getNextBlockReportTime()) {
try {
ibrManager.wait(100);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1163,8 +1164,8 @@ static class Scheduler {
// nextBlockReportTime and nextHeartbeatTime may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread.
@VisibleForTesting
volatile long nextBlockReportTime = monotonicNow();
private final AtomicLong nextBlockReportTime =
new AtomicLong(monotonicNow());

@VisibleForTesting
volatile long nextHeartbeatTime = monotonicNow();
Expand Down Expand Up @@ -1257,7 +1258,7 @@ boolean isLifelineDue(long startTime) {
}

boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
return nextBlockReportTime.get() - curTime <= 0;
}

boolean isOutliersReportDue(long curTime) {
Expand All @@ -1281,15 +1282,15 @@ void forceFullBlockReportNow() {
long scheduleBlockReport(long delay, boolean isRegistration) {
if (delay > 0) { // send BR after random delay
// Numerical overflow is possible here and is okay.
nextBlockReportTime =
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay));
nextBlockReportTime.getAndSet(
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay)));
} else { // send at next heartbeat
nextBlockReportTime = monotonicNow();
nextBlockReportTime.getAndSet(monotonicNow());
}
resetBlockReportTime = isRegistration; // reset future BRs for
// randomness, post first block report to avoid regular BRs from all
// DN's coming at one time.
return nextBlockReportTime;
return nextBlockReportTime.get();
}

/**
Expand All @@ -1302,8 +1303,8 @@ void scheduleNextBlockReport() {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
nextBlockReportTime = monotonicNow() +
ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
nextBlockReportTime.getAndSet(monotonicNow() +
ThreadLocalRandom.current().nextInt((int) (blockReportIntervalMs)));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
Expand All @@ -1313,17 +1314,16 @@ void scheduleNextBlockReport() {
* 2) unexpected like 21:35:43, next report should be at 2:20:14
* on the next day.
*/
long factor =
(monotonicNow() - nextBlockReportTime + blockReportIntervalMs)
/ blockReportIntervalMs;
long factor = (monotonicNow() - nextBlockReportTime.get()
+ blockReportIntervalMs) / blockReportIntervalMs;
if (factor != 0) {
nextBlockReportTime += factor * blockReportIntervalMs;
nextBlockReportTime.getAndAdd(factor * blockReportIntervalMs);
} else {
// If the difference between the present time and the scheduled
// time is very less, the factor can be 0, so in that case, we can
// ignore that negligible time, spent while sending the BRss and
// schedule the next BR after the blockReportInterval.
nextBlockReportTime += blockReportIntervalMs;
nextBlockReportTime.getAndAdd(blockReportIntervalMs);
}
}
}
Expand All @@ -1336,6 +1336,16 @@ long getLifelineWaitTime() {
return nextLifelineTime - monotonicNow();
}

@VisibleForTesting
long getNextBlockReportTime() {
return nextBlockReportTime.get();
}

@VisibleForTesting
void setNextBlockReportTime(long nextBlockReportTime) {
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
}

/**
* Wrapped for testing.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void testScheduleBlockReportImmediate() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleBlockReport(0, true);
assertTrue(scheduler.resetBlockReportTime);
assertThat(scheduler.nextBlockReportTime, is(now));
assertThat(scheduler.getNextBlockReportTime(), is(now));
}
}

Expand All @@ -81,8 +82,8 @@ public void testScheduleBlockReportDelayed() {
final long delayMs = 10;
scheduler.scheduleBlockReport(delayMs, true);
assertTrue(scheduler.resetBlockReportTime);
assertTrue(scheduler.nextBlockReportTime - now >= 0);
assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
assertTrue(scheduler.getNextBlockReportTime() - now >= 0);
assertTrue(scheduler.getNextBlockReportTime() - (now + delayMs) < 0);
}
}

Expand All @@ -96,7 +97,8 @@ public void testScheduleNextBlockReport() {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.resetBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
assertTrue(scheduler.getNextBlockReportTime()
- (now + BLOCK_REPORT_INTERVAL_MS) < 0);
}
}

Expand All @@ -110,7 +112,8 @@ public void testScheduleNextBlockReport2() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.resetBlockReportTime = false;
scheduler.scheduleNextBlockReport();
assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
assertThat(scheduler.getNextBlockReportTime(),
is(now + BLOCK_REPORT_INTERVAL_MS));
}
}

Expand All @@ -129,10 +132,12 @@ public void testScheduleNextBlockReport3() {
final long blockReportDelay =
BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
final long origBlockReportTime = now - blockReportDelay;
scheduler.nextBlockReportTime = origBlockReportTime;
scheduler.setNextBlockReportTime(origBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
assertTrue((scheduler.getNextBlockReportTime() - now)
< BLOCK_REPORT_INTERVAL_MS);
assertEquals(0, ((scheduler.getNextBlockReportTime() - origBlockReportTime)
% BLOCK_REPORT_INTERVAL_MS));
}
}

Expand Down Expand Up @@ -201,7 +206,7 @@ private Scheduler makeMockScheduler(long now) {
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.setNextBlockReportTime(now);
mockScheduler.nextHeartbeatTime = now;
mockScheduler.nextOutliersReportTime = now;
return mockScheduler;
Expand Down

0 comments on commit b038042

Please sign in to comment.