17
17
*/
18
18
package org .apache .hadoop .hdfs .qjournal .client ;
19
19
20
+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT ;
21
+ import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_JOURNALNODE_MAINTENANCE_NODES_KEY ;
22
+
20
23
import java .io .IOException ;
21
24
import java .net .InetSocketAddress ;
22
25
import java .net .URI ;
31
34
import java .util .concurrent .TimeUnit ;
32
35
import java .util .concurrent .TimeoutException ;
33
36
37
+ import org .apache .hadoop .hdfs .server .blockmanagement .HostSet ;
34
38
import org .apache .hadoop .util .Lists ;
35
39
import org .slf4j .Logger ;
36
40
import org .slf4j .LoggerFactory ;
@@ -108,6 +112,7 @@ public class QuorumJournalManager implements JournalManager {
108
112
private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024 ;
109
113
private int outputBufferCapacity ;
110
114
private final URLConnectionFactory connectionFactory ;
115
+ private int quorumJournalCount ;
111
116
112
117
/** Limit logging about input stream selection to every 5 seconds max. */
113
118
private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000 ;
@@ -144,7 +149,14 @@ public QuorumJournalManager(Configuration conf,
144
149
this .uri = uri ;
145
150
this .nsInfo = nsInfo ;
146
151
this .nameServiceId = nameServiceId ;
147
- this .loggers = new AsyncLoggerSet (createLoggers (loggerFactory ));
152
+ this .loggers = new AsyncLoggerSet (createLoggers (loggerFactory ), this .quorumJournalCount );
153
+
154
+ // Ensure the journal nodes left after skipping maintenance nodes can still form a quorum
155
+ int quorumThreshold = quorumJournalCount / 2 + 1 ;
156
+ Preconditions .checkArgument (
157
+ this .loggers .size () >= quorumThreshold ,
158
+ "The total journalnode minus %s the number of blacklists must be greater than or equal to"
159
+ + " %s!" , DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , quorumThreshold );
148
160
149
161
this .maxTxnsPerRpc =
150
162
conf .getInt (QJM_RPC_MAX_TXNS_KEY , QJM_RPC_MAX_TXNS_DEFAULT );
@@ -250,6 +262,9 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
250
262
251
263
@ Override
252
264
public void format (NamespaceInfo nsInfo , boolean force ) throws IOException {
265
+ if (isEnableJnMaintenance ()) {
266
+ throw new IOException ("format() does not support enabling jn maintenance mode" );
267
+ }
253
268
QuorumCall <AsyncLogger , Void > call = loggers .format (nsInfo , force );
254
269
try {
255
270
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -406,21 +421,39 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException {
406
421
logToSync .getStartTxId (),
407
422
logToSync .getEndTxId ()));
408
423
}
409
-
410
- static List <AsyncLogger > createLoggers (Configuration conf ,
424
+
425
+ List <AsyncLogger > createLoggers (Configuration conf ,
426
+ URI uri ,
427
+ NamespaceInfo nsInfo ,
428
+ AsyncLogger .Factory factory ,
429
+ String nameServiceId )
430
+ throws IOException {
431
+ String [] skipNodesHostPort = conf .getTrimmedStrings (
432
+ DFS_JOURNALNODE_MAINTENANCE_NODES_KEY , DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT );
433
+ return createLoggers (conf , uri , nsInfo , factory , nameServiceId , skipNodesHostPort );
434
+ }
435
+
436
+ private List <AsyncLogger > createLoggers (Configuration conf ,
411
437
URI uri ,
412
438
NamespaceInfo nsInfo ,
413
439
AsyncLogger .Factory factory ,
414
- String nameServiceId )
440
+ String nameServiceId ,
441
+ String [] skipNodesHostPort )
415
442
throws IOException {
416
443
List <AsyncLogger > ret = Lists .newArrayList ();
417
444
List <InetSocketAddress > addrs = Util .getAddressesList (uri , conf );
418
445
if (addrs .size () % 2 == 0 ) {
419
446
LOG .warn ("Quorum journal URI '" + uri + "' has an even number " +
420
447
"of Journal Nodes specified. This is not recommended!" );
421
448
}
449
+ setQuorumJournalCount (addrs .size ());
450
+ HostSet skipSet = DFSUtil .getHostSet (skipNodesHostPort );
422
451
String jid = parseJournalId (uri );
423
452
for (InetSocketAddress addr : addrs ) {
453
+ if (skipSet .match (addr )) {
454
+ LOG .info ("The node {} is a maintenance node and will skip initialization." , addr );
455
+ continue ;
456
+ }
424
457
ret .add (factory .createLogger (conf , nsInfo , jid , nameServiceId , addr ));
425
458
}
426
459
return ret ;
@@ -667,6 +700,9 @@ AsyncLoggerSet getLoggerSetForTests() {
667
700
668
701
@ Override
669
702
public void doPreUpgrade () throws IOException {
703
+ if (isEnableJnMaintenance ()) {
704
+ throw new IOException ("doPreUpgrade() does not support enabling jn maintenance mode" );
705
+ }
670
706
QuorumCall <AsyncLogger , Void > call = loggers .doPreUpgrade ();
671
707
try {
672
708
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -684,6 +720,9 @@ public void doPreUpgrade() throws IOException {
684
720
685
721
@ Override
686
722
public void doUpgrade (Storage storage ) throws IOException {
723
+ if (isEnableJnMaintenance ()) {
724
+ throw new IOException ("doUpgrade() does not support enabling jn maintenance mode" );
725
+ }
687
726
QuorumCall <AsyncLogger , Void > call = loggers .doUpgrade (storage );
688
727
try {
689
728
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -701,6 +740,9 @@ public void doUpgrade(Storage storage) throws IOException {
701
740
702
741
@ Override
703
742
public void doFinalize () throws IOException {
743
+ if (isEnableJnMaintenance ()) {
744
+ throw new IOException ("doFinalize() does not support enabling jn maintenance mode" );
745
+ }
704
746
QuorumCall <AsyncLogger , Void > call = loggers .doFinalize ();
705
747
try {
706
748
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -719,6 +761,9 @@ public void doFinalize() throws IOException {
719
761
@ Override
720
762
public boolean canRollBack (StorageInfo storage , StorageInfo prevStorage ,
721
763
int targetLayoutVersion ) throws IOException {
764
+ if (isEnableJnMaintenance ()) {
765
+ throw new IOException ("canRollBack() does not support enabling jn maintenance mode" );
766
+ }
722
767
QuorumCall <AsyncLogger , Boolean > call = loggers .canRollBack (storage ,
723
768
prevStorage , targetLayoutVersion );
724
769
try {
@@ -753,6 +798,9 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
753
798
754
799
@ Override
755
800
public void doRollback () throws IOException {
801
+ if (isEnableJnMaintenance ()) {
802
+ throw new IOException ("doRollback() does not support enabling jn maintenance mode" );
803
+ }
756
804
QuorumCall <AsyncLogger , Void > call = loggers .doRollback ();
757
805
try {
758
806
call .waitFor (loggers .size (), loggers .size (), 0 , timeoutMs ,
@@ -770,6 +818,9 @@ public void doRollback() throws IOException {
770
818
771
819
@ Override
772
820
public void discardSegments (long startTxId ) throws IOException {
821
+ if (isEnableJnMaintenance ()) {
822
+ throw new IOException ("discardSegments() does not support enabling jn maintenance mode" );
823
+ }
773
824
QuorumCall <AsyncLogger , Void > call = loggers .discardSegments (startTxId );
774
825
try {
775
826
call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -789,6 +840,9 @@ public void discardSegments(long startTxId) throws IOException {
789
840
790
841
@ Override
791
842
public long getJournalCTime () throws IOException {
843
+ if (isEnableJnMaintenance ()) {
844
+ throw new IOException ("getJournalCTime() does not support enabling jn maintenance mode" );
845
+ }
792
846
QuorumCall <AsyncLogger , Long > call = loggers .getJournalCTime ();
793
847
try {
794
848
call .waitFor (loggers .size (), loggers .size (), 0 ,
@@ -819,4 +873,12 @@ public long getJournalCTime() throws IOException {
819
873
820
874
throw new AssertionError ("Unreachable code." );
821
875
}
876
+
877
+ public void setQuorumJournalCount (int quorumJournalCount ) {
878
+ this .quorumJournalCount = quorumJournalCount ;
879
+ }
880
+
881
+ private boolean isEnableJnMaintenance () {
882
+ return this .loggers .size () < quorumJournalCount ;
883
+ }
822
884
}
0 commit comments