 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1159,9 +1160,9 @@ public void testRecoveryWithDecommission() throws Exception {
   }
 
   @Test(timeout = 120000)
-  public void testDecommissionBusyNodeWithErasureCodeWorkBackOff() throws Exception {
+  public void testDecommissionBusyNodeWithECReconstruction1() throws Exception {
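+    // Start with EC-reconstruction-based decommissioning disabled; the busy node
+    // set up in step 2 should then stall decommissioning until the flag is enabled.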
     bm.setDecommissionECReconstruction(false);
-    byte index = 6;
+    byte[] indices = new byte[]{6};
     // 1 create EC file
     final Path ecFile = new Path(ecDir, "testDecommission2NodeWithBusyNode");
     int writeBytes = cellSize * dataBlocks;
@@ -1175,16 +1176,13 @@ public void testDecommissionBusyNodeWithErasureCodeWorkBackOff() throws Exception {
         cluster.getNamesystem().getFSDirectory().getINode4Write(ecFile.toString()).asFile();
     BlockInfo firstBlock = fileNode.getBlocks()[0];
     List<DatanodeStorageInfo> storageInfos =
-        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, index);
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
     assertEquals(1, storageInfos.size());
     DatanodeDescriptor busyNode = storageInfos.get(0).getDatanodeDescriptor();
     for (int j = 0; j < replicationStreamsHardLimit; j++) {
       busyNode.incrementPendingReplicationWithoutTargets();
     }
-    List<DatanodeStorageInfo> datanodeStorageInfos =
-        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, index);
-    assertEquals(1, datanodeStorageInfos.size());
-    DatanodeStorageInfo toDecommissionStorage = datanodeStorageInfos.get(0);
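+    // Reuse the lookup from step 2; this storage should end up as the
+    // DECOMMISSIONED replica.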
+    DatanodeStorageInfo toDecommissionStorage = storageInfos.get(0);
 
     // 3 decommissioning one datanode
     List<DatanodeInfo> decommissionNodes = new ArrayList<>();
@@ -1203,17 +1201,22 @@ public void testDecommissionBusyNodeWithErasureCodeWorkBackOff() throws Exception {
     decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
     assertEquals(9, bm.countNodes(firstBlock).liveReplicas());
     assertEquals(1, bm.countNodes(firstBlock).decommissioned());
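+    // No DataNode should have performed plain replication; the decommissioned
+    // index must have been recovered through EC reconstruction only.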
+    assertEquals(0, cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getBlocksReplicated()).sum());
     assertTrue(cluster.getDataNodes().stream()
         .mapToLong(dn -> dn.getMetrics().getECReconstructionTasks()).sum() > 0);
 
+    // 6 Get the blocks again, verify that the decommissioned index now has a LIVE
+    // replica, and that the DECOMMISSIONED replica is the node we decommissioned.
     fileNode = cluster.getNamesystem().getFSDirectory().getINode(ecFile.toString()).asFile();
     firstBlock = fileNode.getBlocks()[0];
-    datanodeStorageInfos = getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, index);
-    assertTrue(datanodeStorageInfos.size() >= 2);
+    List<DatanodeStorageInfo> newStorageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertTrue(newStorageInfos.size() >= 2);
     DatanodeStorageInfo decommissionedNode = null;
     int alive = 0;
-    for (int i = 0; i < datanodeStorageInfos.size(); i++) {
-      DatanodeStorageInfo datanodeStorageInfo = datanodeStorageInfos.get(i);
+    for (int i = 0; i < newStorageInfos.size(); i++) {
+      DatanodeStorageInfo datanodeStorageInfo = newStorageInfos.get(i);
       if (datanodeStorageInfo.getDatanodeDescriptor().isDecommissioned()) {
         decommissionedNode = datanodeStorageInfo;
       } else if (datanodeStorageInfo.getDatanodeDescriptor().isAlive()) {
@@ -1224,24 +1227,98 @@ public void testDecommissionBusyNodeWithErasureCodeWorkBackOff() throws Exception {
     assertEquals(toDecommissionStorage, decommissionedNode);
     assertTrue(alive >= 1);
 
-    // 6 check the checksum of a file
+    // 7 check the checksum of a file
     FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
     Assert.assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);
 
+    // 8 check the data is correct
+    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
+        null, blockGroupSize);
+  }
+
+  @Test(timeout = 120000)
+  public void testDecommissionBusyNodeWithECReconstruction2() throws Exception {
+    bm.setDecommissionECReconstruction(false);
+    byte[] indices = new byte[]{5, 6};
+    // 1 create EC file
+    final Path ecFile = new Path(ecDir, "testDecommission2NodeWithBusyNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    // 2 make one datanode busy
+    INodeFile fileNode =
+        cluster.getNamesystem().getFSDirectory().getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> storageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertEquals(2, storageInfos.size());
+    DatanodeDescriptor busyNode = storageInfos.get(0).getDatanodeDescriptor();
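+    // Drive pending replication up to the hard limit so this DataNode is treated
+    // as busy and is skipped as a plain-replication source.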
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    // 3 decommissioning two datanodes
+    List<DatanodeInfo> decommissionNodes = new ArrayList<>();
+    decommissionNodes.add(storageInfos.get(0).getDatanodeDescriptor());
+    decommissionNodes.add(storageInfos.get(1).getDatanodeDescriptor());
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // 4 Verify that the non-busy replica is copied and the busy replica is
+    // reconstructed once decommissionECReconstruction is enabled.
+    bm.setDecommissionECReconstruction(true);
+    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(9, bm.countNodes(firstBlock).liveReplicas());
+    assertEquals(2, bm.countNodes(firstBlock).decommissioned());
+    assertTrue(cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getBlocksReplicated()).sum() > 0);
+    assertTrue(cluster.getDataNodes().stream()
+        .mapToLong(dn -> dn.getMetrics().getECReconstructionTasks()).sum() > 0);
+
+    // 5 Get the blocks again, verify that each decommissioned index now has a LIVE
+    // replica, and that the DECOMMISSIONED replicas are the nodes we decommissioned.
+    fileNode = cluster.getNamesystem().getFSDirectory().getINode(ecFile.toString()).asFile();
+    firstBlock = fileNode.getBlocks()[0];
+    List<DatanodeStorageInfo> newStorageInfos =
+        getStorageInfoForBlockIndex((BlockInfoStriped) firstBlock, indices);
+    assertTrue(newStorageInfos.size() >= 4);
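+    // Tally live vs. decommissioned storages across both indices; the decommissioned
+    // ones must be exactly the two nodes selected above.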
+    int alive = 0;
+    int decommissioned = 0;
+    for (int i = 0; i < newStorageInfos.size(); i++) {
+      DatanodeStorageInfo newDatanodeStorageInfo = newStorageInfos.get(i);
+      if (newDatanodeStorageInfo.getDatanodeDescriptor().isDecommissioned()) {
+        assertTrue(newDatanodeStorageInfo.equals(storageInfos.get(0)) ||
+            newDatanodeStorageInfo.equals(storageInfos.get(1)));
+        decommissioned++;
+      } else if (newDatanodeStorageInfo.getDatanodeDescriptor().isAlive()) {
+        alive++;
+      }
+    }
+    assertTrue(alive >= 2);
+    assertEquals(2, decommissioned);
+
+    // 6 check the checksum of a file
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);
+
     // 7 check the data is correct
     StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
         null, blockGroupSize);
   }
 
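+  // Collect every storage whose reported block index appears in blockIndices. After
+  // reconstruction a single index can map to more than one storage (for example the
+  // old DECOMMISSIONED replica plus the newly reconstructed LIVE one).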
   private List<DatanodeStorageInfo> getStorageInfoForBlockIndex(BlockInfoStriped block,
-      int blockIndex) {
+      byte[] blockIndices) {
     List<DatanodeStorageInfo> storageInfos = new ArrayList<>();
     Iterator<BlockInfoStriped.StorageAndBlockIndex> iterator =
         block.getStorageAndIndexInfos().iterator();
     while (iterator.hasNext()) {
       BlockInfoStriped.StorageAndBlockIndex storageAndBlockIndex = iterator.next();
-      if (storageAndBlockIndex.getBlockIndex() == blockIndex) {
-        storageInfos.add(storageAndBlockIndex.getStorage());
+      for (int blockIndex : blockIndices) {
+        if (storageAndBlockIndex.getBlockIndex() == blockIndex) {
+          storageInfos.add(storageAndBlockIndex.getStorage());
+        }
       }
     }
     return storageInfos;