
Commit e2fa107

顾鹏 authored and committed
HDFS-17223. Add journalnode maintenance node list
1 parent 81edbeb commit e2fa107

File tree

9 files changed: +268 -12 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 4 additions & 0 deletions
@@ -1445,6 +1445,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.journalnode.edit-cache-size.fraction";
   public static final float DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT = 0.5f;
 
+  public static final String DFS_JOURNALNODE_MAINTENANCE_NODES_KEY =
+      "dfs.journalnode.maintenance.nodes";
+  public static final String[] DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT = {};
+
   // Journal-node related configs for the client side.
   public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
   public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

Lines changed: 36 additions & 7 deletions
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_KEY;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
@@ -44,9 +48,11 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolFakeTranslatorPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -220,17 +226,40 @@ protected QJournalProtocol createProxy() throws IOException {
     // Need to set NODELAY or else batches larger than MTU can trigger
     // 40ms nailing delays.
     confCopy.setBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true);
+
+    String[] skipNodesHostPort = conf.getTrimmedStrings(
+        DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT);
+    HostSet excludes = new HostSet();
+    for (String hostPort : skipNodesHostPort) {
+      try {
+        URI uri = new URI("dummy", hostPort, null, null, null);
+        int port = uri.getPort() == -1 ? 0 : uri.getPort();
+        InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
+        if (addr.isUnresolved()) {
+          QuorumJournalManager.LOG.warn(String.format("Failed to resolve address `%s`", hostPort));
+          continue;
+        }
+        excludes.add(addr);
+      } catch (URISyntaxException e) {
+        QuorumJournalManager.LOG.warn(String.format("Failed to parse `%s`", hostPort));
+      }
+    }
+
     RPC.setProtocolEngine(confCopy,
         QJournalProtocolPB.class, ProtobufRpcEngine2.class);
     return SecurityUtil.doAsLoginUser(
         (PrivilegedExceptionAction<QJournalProtocol>) () -> {
-          RPC.setProtocolEngine(confCopy,
-              QJournalProtocolPB.class, ProtobufRpcEngine2.class);
-          QJournalProtocolPB pbproxy = RPC.getProxy(
-              QJournalProtocolPB.class,
-              RPC.getProtocolVersion(QJournalProtocolPB.class),
-              addr, confCopy);
-          return new QJournalProtocolTranslatorPB(pbproxy);
+          if (excludes.match(addr)) {
+            return new QJournalProtocolFakeTranslatorPB(addr);
+          } else {
+            RPC.setProtocolEngine(confCopy,
+                QJournalProtocolPB.class, ProtobufRpcEngine2.class);
+            QJournalProtocolPB pbproxy = RPC.getProxy(
+                QJournalProtocolPB.class,
+                RPC.getProtocolVersion(QJournalProtocolPB.class),
+                addr, confCopy);
+            return new QJournalProtocolTranslatorPB(pbproxy);
+          }
         });
   }
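The createProxy() change above leans on a small parsing trick: wrapping each configured "host:port" entry in a dummy URI so that java.net.URI does the splitting of the authority component. Below is a minimal standalone sketch of that behavior; the hostnames are hypothetical placeholders, not values from the patch.

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;

public class HostPortParseDemo {
  public static void main(String[] args) throws URISyntaxException {
    for (String hostPort : new String[] {"jn1.example.com:8485", "jn2.example.com"}) {
      // The "dummy" scheme is never used; URI is only borrowed to split the authority.
      URI uri = new URI("dummy", hostPort, null, null, null);
      // getPort() returns -1 when no port was given; it is mapped to 0,
      // which HostSet treats as a wildcard matching every port on the host.
      int port = uri.getPort() == -1 ? 0 : uri.getPort();
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      System.out.println(hostPort + " -> host=" + uri.getHost() + ", port=" + port
          + (addr.isUnresolved() ? " (unresolved, would be skipped)" : ""));
    }
  }
}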

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

Lines changed: 13 additions & 0 deletions
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_KEY;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -146,6 +149,16 @@ public QuorumJournalManager(Configuration conf,
     this.nameServiceId = nameServiceId;
     this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
+    // Check that the JournalNode maintenance list still leaves enough nodes for a quorum.
+    String[] skipNodesHostPort = conf.getTrimmedStrings(
+        DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, DFS_JOURNALNODE_MAINTENANCE_NODES_DEFAULT);
+
+    int quorumThreshold = (this.loggers.size() / 2) + 1;
+    Preconditions.checkArgument(
+        (this.loggers.size() - skipNodesHostPort.length) >= quorumThreshold,
+        "The total number of JournalNodes minus the nodes listed in %s must be greater than or equal to %s!",
+        DFS_JOURNALNODE_MAINTENANCE_NODES_KEY, quorumThreshold);
+
     this.maxTxnsPerRpc =
         conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
     Preconditions.checkArgument(maxTxnsPerRpc > 0,
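The precondition above is plain majority-quorum arithmetic: with n JournalNodes, a write needs floor(n/2) + 1 acknowledgements, so at most n minus that threshold may sit on the maintenance list. A small sketch (illustrative cluster sizes only) that tabulates the limit:

public class QuorumMathDemo {
  public static void main(String[] args) {
    for (int n : new int[] {3, 5, 7}) {
      // Same formula as the QuorumJournalManager check above.
      int quorumThreshold = (n / 2) + 1;
      int maxMaintenance = n - quorumThreshold;
      System.out.printf("%d JNs: quorum=%d, maintenance nodes allowed=%d%n",
          n, quorumThreshold, maxMaintenance);
    }
  }
}

So on a typical 3-node cluster, exactly one JournalNode may be placed in maintenance; listing two would fail construction immediately rather than silently losing quorum.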
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolFakeTranslatorPB.java

Lines changed: 159 additions & 0 deletions

@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2022, SensorsData and/or its affiliates. All rights reserved.
+ * SENSORSDATA PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
+ */
+
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * A mock client that pretends to access the JournalNode but performs no
+ * actual operations. It provides the hook for blacklisting JournalNodes.
+ */
+public class QJournalProtocolFakeTranslatorPB implements QJournalProtocol, Closeable {
+
+  private final InetSocketAddress addr;
+
+  public QJournalProtocolFakeTranslatorPB(InetSocketAddress addr) {
+    this.addr = addr;
+  }
+
+  @Override
+  public void close() throws IOException {
+    return;
+  }
+
+  @Override
+  public boolean isFormatted(String journalId, String nameServiceId) throws IOException {
+    return false;
+  }
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(String journalId, String nameServiceId)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void format(String journalId, String nameServiceId, NamespaceInfo nsInfo, boolean force)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the format "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(String journalId, String nameServiceId,
+      NamespaceInfo nsInfo, long epoch) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns,
+      byte[] records) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(String jid, String nameServiceId,
+      long sinceTxId, boolean inProgressOk) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid, String nameServiceId,
+      long sinceTxId, int maxTxns) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, long segmentTxId)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto stateToAccept, URL fromUrl)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, skip this node.");
+  }
+
+  @Override
+  public void doPreUpgrade(String journalId) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the doPreUpgrade "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the doUpgrade "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public void doFinalize(String journalId, String nameServiceid) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the doFinalize "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public Boolean canRollBack(String journalId, String nameServiceid, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the canRollBack "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public void doRollback(String journalId, String nameServiceid) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the doRollback "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public void discardSegments(String journalId, String nameServiceId, long startTxId)
+      throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the discardSegments "
+        + "operation does not support blacklisting");
+  }
+
+  @Override
+  public Long getJournalCTime(String journalId, String nameServiceId) throws IOException {
+    throw new IOException("The journalnode " + addr + " is a blacklisted node, the getJournalCTime "
+        + "operation does not support blacklisting");
+  }
+}
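Once createProxy() selects this fake translator for a masked node, every protocol call fails immediately with an IOException instead of entering the RPC retry machinery, which is what lets a restarting NameNode write the node off quickly. A hedged sketch of that fail-fast behavior; the address is a placeholder, not one from the patch:

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolFakeTranslatorPB;

public class FakeTranslatorDemo {
  public static void main(String[] args) {
    QJournalProtocol proxy = new QJournalProtocolFakeTranslatorPB(
        new InetSocketAddress("jn3.example.com", 8485));
    try {
      proxy.getJournalState("my-journal", null); // fails fast, no network I/O
    } catch (IOException expected) {
      System.out.println(expected.getMessage());
    }
  }
}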

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java

Lines changed: 5 additions & 5 deletions
@@ -45,7 +45,7 @@ public class HostSet implements Iterable<InetSocketAddress> {
    * The function that checks whether there exists an entry foo in the set
    * so that foo &lt;= addr.
    */
-  boolean matchedBy(InetSocketAddress addr) {
+  public boolean matchedBy(InetSocketAddress addr) {
     Collection<Integer> ports = addrs.get(addr.getAddress());
     return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr
         .getPort());
@@ -55,23 +55,23 @@ boolean matchedBy(InetSocketAddress addr) {
    * The function that checks whether there exists an entry foo in the set
    * so that addr &lt;= foo.
    */
-  boolean match(InetSocketAddress addr) {
+  public boolean match(InetSocketAddress addr) {
     int port = addr.getPort();
     Collection<Integer> ports = addrs.get(addr.getAddress());
     boolean exactMatch = ports.contains(port);
     boolean genericMatch = ports.contains(0);
     return exactMatch || genericMatch;
   }
 
-  boolean isEmpty() {
+  public boolean isEmpty() {
     return addrs.isEmpty();
   }
 
-  int size() {
+  public int size() {
     return addrs.size();
   }
 
-  void add(InetSocketAddress addr) {
+  public void add(InetSocketAddress addr) {
     Preconditions.checkArgument(!addr.isUnresolved());
     addrs.put(addr.getAddress(), addr.getPort());
   }
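These methods are widened to public so that IPCLoggerChannel can reuse HostSet's matching semantics: an entry added with port 0 acts as a wildcard for every port on that host, while a non-zero entry matches exactly. A small sketch of those semantics, using loopback addresses as placeholders:

import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.server.blockmanagement.HostSet;

public class HostSetDemo {
  public static void main(String[] args) {
    HostSet excludes = new HostSet();
    excludes.add(new InetSocketAddress("127.0.0.1", 0));     // wildcard port
    excludes.add(new InetSocketAddress("127.0.0.2", 8485));  // exact port

    // Wildcard entry matches any port on that host.
    System.out.println(excludes.match(new InetSocketAddress("127.0.0.1", 8485))); // true
    // Exact entry does not match a different port.
    System.out.println(excludes.match(new InetSocketAddress("127.0.0.2", 8486))); // false
  }
}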

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 14 additions & 0 deletions
@@ -6287,6 +6287,20 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.journalnode.maintenance.nodes</name>
+  <value></value>
+  <description>
+    If one out of three JournalNodes goes down, the service can theoretically continue.
+    In practice, however, the downed node may not recover quickly, and a restarting
+    NameNode will still try to reach it through the lengthy RPC retry mechanism, so the
+    NameNode takes a long time to initialize and serve requests. Adding the downed
+    JournalNode to the maintenance node list lets the NameNode skip it and initialize
+    much faster in such scenarios.
+  </description>
+</property>
+
+
 <property>
   <name>dfs.namenode.lease-hard-limit-sec</name>
   <value>1200</value>
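As a usage sketch, the new key takes a comma-separated list of host:port entries; the hostname below is a placeholder. The same key can equally be set in hdfs-site.xml on the NameNode:

import org.apache.hadoop.conf.Configuration;

public class MaintenanceConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Comma-separated host:port entries; an entry without a port masks the
    // host on every port (it is parsed as the port-0 wildcard).
    conf.set("dfs.journalnode.maintenance.nodes", "jn3.example.com:8485");
    for (String entry : conf.getTrimmedStrings("dfs.journalnode.maintenance.nodes")) {
      System.out.println(entry);
    }
  }
}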

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ public abstract class QJMTestUtil {
   public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
       12345, "mycluster", "my-bp", 0L);
   public static final String JID = "test-journal";
+  public static final String FAKE_HOSTNAME = "jn";
 
   public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     DataOutputBuffer buf = new DataOutputBuffer();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java

Lines changed: 16 additions & 0 deletions
@@ -17,13 +17,15 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_MAINTENANCE_NODES_KEY;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolFakeTranslatorPB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -178,4 +180,18 @@ public void testStopSendingEditsWhenOutOfSync() throws Exception {
 
     ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();
   }
+
+  /**
+   * Test that, if the journalnode maintenance node list contains the FAKE_ADDR address,
+   * the node is treated as masked and a QJournalProtocolFakeTranslatorPB is returned.
+   */
+  @Test
+  public void testCreateProxyReturnEmptyTranslatorPB() throws Exception {
+    conf.set(DFS_JOURNALNODE_MAINTENANCE_NODES_KEY,
+        FAKE_ADDR.getHostName() + ":" + FAKE_ADDR.getPort());
+    IPCLoggerChannel ipcChannel = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR);
+    QJournalProtocol protocol = ipcChannel.createProxy();
+    assertTrue(protocol instanceof QJournalProtocolFakeTranslatorPB);
+  }
 }
