 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,7 +100,7 @@ public class TestObserverNode {
 
   private final Path testPath = new Path("/TestObserverNode");
 
-  @BeforeClass
+  @BeforeAll
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
     conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
@@ -110,23 +112,23 @@ public static void startUpCluster() throws Exception {
     dfsCluster = qjmhaCluster.getDfsCluster();
   }
 
-  @Before
+  @BeforeEach
   public void setUp() throws Exception {
     setObserverRead(true);
   }
 
-  @After
+  @AfterEach
   public void cleanUp() throws IOException {
     dfs.delete(testPath, true);
-    assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
-        getServiceState(dfsCluster.getNameNode(0)));
-    assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(1)));
-    assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.ACTIVE, getServiceState(dfsCluster.getNameNode(0)),
+        "NN[0] should be active");
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(1)),
+        "NN[1] should be standby");
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be observer");
   }
 
-  @AfterClass
+  @AfterAll
   public static void shutDownCluster() throws IOException {
     if (qjmhaCluster != null) {
       qjmhaCluster.shutdown();
@@ -228,8 +230,8 @@ public void testConfigStartup() throws Exception {
     }
 
     // Confirm that the namenode at nnIdx is standby
-    assertTrue("The NameNode is observer despite being transitioned to standby",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode is observer despite being transitioned to standby");
 
     // Restart the NameNode with observer startup option as false
     dfsCluster.getConfiguration(nnIdx)
@@ -238,9 +240,9 @@ public void testConfigStartup() throws Exception {
 
     // Verify that the NameNode is not in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode started as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode started as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false");
 
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
@@ -260,9 +262,9 @@ public void testConfigStartup() throws Exception {
 
     // Check that the NameNode is in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode did not start as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true",
-        dfsCluster.getNameNode(nnIdx).isObserverState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isObserverState(),
+        "The NameNode did not start as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true");
 
     dfs.mkdir(testPath2, FsPermission.getDefault());
     assertSentTo(0);
@@ -437,6 +439,43 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     dfs.open(testPath).close();
     assertSentTo(0);
 
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
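+    // The "rs" schema has 3 data units and 2 parity units; with a 1024-byte cell size,
+    // a 3 * 1024-byte block stripes across all 3 data units, while a 1-byte block fits
+    // in the first cell and therefore needs only one datanode location.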
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Small file should succeed with just the one block
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Large file should fail over to the active
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
     Mockito.reset(bmSpy);
 
     // Remove safe mode on observer, request should still go to it.
@@ -471,7 +510,62 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
         anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
         Mockito.any(), Mockito.any());
 
-    dfs.open(testPath);
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(0);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(0);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(0);
+
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
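+    // As above: with RS(3,2) and a 1024-byte cell, a 1-byte block needs only one
+    // datanode location, while a 3 * 1024-byte block stripes across all three data units.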
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // The small file should succeed on the observer, while the large file should not
+
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(2);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(2);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    dfs.open(testPath).close();
     assertSentTo(0);
 
     dfs.getClient().listPaths("/", new byte[0], true);
@@ -563,16 +657,15 @@ public void testStickyActive() throws Exception {
     dfsCluster.rollEditLogAndTail(0);
     // No Observers present, should still go to Active
     dfsCluster.transitionToStandby(2);
-    assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be standby");
     newFs.open(testFile).close();
     assertSentTo(0);
     // Restore Observer
     int newObserver = 1;
     dfsCluster.transitionToObserver(newObserver);
-    assertEquals("NN[" + newObserver + "] should be observer",
-        HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(newObserver)));
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(newObserver)),
+        "NN[" + newObserver + "] should be observer");
     long startTime = Time.monotonicNow();
     try {
       while (Time.monotonicNow() - startTime <= 5000) {
@@ -661,19 +754,19 @@ public void testMkdirsRaceWithObserverRead() throws Exception {
         LOG.warn("MkDirRunner thread failed", e.getCause());
       }
     }
-    assertTrue("Not all threads finished", finished);
+    assertTrue(finished, "Not all threads finished");
     threadPool.shutdown();
 
-    assertEquals("Active and Observer stateIds don't match",
-        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
-        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+    assertEquals(dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId(),
+        "Active and Observer stateIds don't match");
     for (int i = 0; i < numThreads; i++) {
-      assertTrue("Client #" + i
+      assertTrue(clientStates[i].lastSeenStateId >= activStateId &&
+          clientStates[i].fnfe == null,
+          "Client #" + i
           + " lastSeenStateId=" + clientStates[i].lastSeenStateId
           + " activStateId=" + activStateId
-          + "\n" + clientStates[i].fnfe,
-          clientStates[i].lastSeenStateId >= activStateId &&
-          clientStates[i].fnfe == null);
+          + "\n" + clientStates[i].fnfe);
     }
 
     // Restore edit log
@@ -707,7 +800,7 @@ public void run() {
 
         FileStatus stat = fs.getFileStatus(DIR_PATH);
         assertSentTo(fs, 2);
-        assertTrue("Should be a directory", stat.isDirectory());
+        assertTrue(stat.isDirectory(), "Should be a directory");
       } catch (FileNotFoundException ioe) {
         clientState.fnfe = ioe;
       } catch (Exception e) {
@@ -752,13 +845,13 @@ public void testSimpleReadEmptyDirOrFile() throws IOException {
 
   private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
       throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }
 
   private void assertSentTo(int nnIdx) throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }
 
   private static void setObserverRead(boolean flag) throws Exception {