Skip to content

Commit

Permalink
HDFS-15113. Addendum: Missing IBR when NameNode restart if open proce…
Browse files Browse the repository at this point in the history
…ssCommand async feature. Contributed by Xiaoqiao He.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
  • Loading branch information
Hexiaoqiao authored and RogPodge committed Mar 25, 2020
1 parent 80e0bb1 commit a59e642
Showing 1 changed file with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,23 +281,30 @@ public void testBasicFunctionality() throws Exception {
public void testMissBlocksWhenReregister() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
int totalTestBlocks = 4000;
Thread addNewBlockThread = null;
final AtomicInteger count = new AtomicInteger(0);

try {
waitForBothActors(bpos);
waitForInitialization(bpos);

DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void blockUtilSendFullBlockReport() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
GenericTestUtils.waitFor(() -> {
if(count.get() > 2000) {
return true;
}
return false;
}, 100, 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
});

countBlockReportItems(FAKE_BLOCK, mockNN1);
int totalTestBlocks = 4000;
Thread addNewBlockThread = new Thread(() -> {
addNewBlockThread = new Thread(() -> {
for (int i = 0; i < totalTestBlocks; i++) {
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
Expand All @@ -307,6 +314,7 @@ public void blockUtilSendFullBlockReport() {
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
bpos.notifyNamenodeReceivingBlock(b, storageId);
fsDataset.finalizeBlock(b, false);
count.addAndGet(1);
Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -316,7 +324,13 @@ public void blockUtilSendFullBlockReport() {
addNewBlockThread.start();

// Make sure that generate blocks for DataNode and IBR not empty now.
Thread.sleep(200);
GenericTestUtils.waitFor(() -> {
if(count.get() > 0) {
return true;
}
return false;
}, 100, 1000);

// Trigger re-register using DataNode Command.
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
bpos.triggerHeartbeatForTests();
Expand All @@ -335,6 +349,7 @@ public void blockUtilSendFullBlockReport() {
assertTrue(fullBlockReportCount == totalTestBlocks ||
incrBlockReportCount == totalTestBlocks);
} finally {
addNewBlockThread.join();
bpos.stop();
bpos.join();

Expand Down Expand Up @@ -695,12 +710,17 @@ private void setTimeForSynchronousBPOSCalls() {
}
}

/**
* Record blocks counts of block report and total adding blocks count of IBR
* which assume no deleting blocks here.
*/
private void countBlockReportItems(final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);

// Record blocks count about the last time block report.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
Expand All @@ -713,6 +733,7 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
Mockito.any()
);

// Record total adding blocks count and assume no deleting blocks here.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageReceivedDeletedBlocks[] list =
Expand Down

0 comments on commit a59e642

Please sign in to comment.