Skip to content

Commit 79ef02b

Browse files
author
zengqiang.xu
committed
HDFS-13274. Modify patch based on comment
1 parent c271cec commit 79ef02b

File tree

3 files changed

+10
-17
lines changed

3 files changed

+10
-17
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.InetSocketAddress;
2121
import java.util.concurrent.TimeUnit;
2222

23+
import org.apache.hadoop.conf.Configuration;
2324
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
2425
import org.apache.hadoop.ipc.RPC;
2526
import org.apache.hadoop.util.Time;
@@ -56,9 +57,11 @@ public class ConnectionContext {
5657
/** The maximum number of requests that this connection can handle concurrently. **/
5758
private final int maxConcurrencyPerConn;
5859

59-
public ConnectionContext(ProxyAndInfo<?> connection, int maxConcurrencyPerConn) {
60+
public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
6061
this.client = connection;
61-
this.maxConcurrencyPerConn = maxConcurrencyPerConn;
62+
this.maxConcurrencyPerConn = conf.getInt(
63+
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
64+
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
6265
}
6366

6467
/**

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ public class ConnectionPool {
109109
/** Enable using multiple physical socket or not. **/
110110
private final boolean enableMultiSocket;
111111

112-
/** Max Concurrency of each connection. */
113-
private final int maxConcurrencyPerConn;
114-
115112
/** Map for the protocols and their protobuf implementations. */
116113
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
117114
static {
@@ -159,9 +156,6 @@ protected ConnectionPool(Configuration config, String address,
159156
this.enableMultiSocket = conf.getBoolean(
160157
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
161158
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);
162-
this.maxConcurrencyPerConn = conf.getInt(
163-
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
164-
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
165159

166160
// Add minimum connections to the pool
167161
for (int i = 0; i < this.minSize; i++) {
@@ -403,8 +397,7 @@ public String getJSON() {
403397
public ConnectionContext newConnection() throws IOException {
404398
return newConnection(this.conf, this.namenodeAddress,
405399
this.ugi, this.protocol, this.enableMultiSocket,
406-
this.socketIndex.incrementAndGet(),
407-
this.maxConcurrencyPerConn);
400+
this.socketIndex.incrementAndGet());
408401
}
409402

410403
/**
@@ -419,16 +412,13 @@ public ConnectionContext newConnection() throws IOException {
419412
* @param ugi User context.
420413
* @param proto Interface of the protocol.
421414
* @param enableMultiSocket Enable multiple socket or not.
422-
* @param maxConcurrencyPerConn The maximum number of requests that
423-
* this connection can handle concurrently.
424415
* @return proto for the target ClientProtocol that contains the user's
425416
* security context.
426417
* @throws IOException If it cannot be created.
427418
*/
428419
protected static <T> ConnectionContext newConnection(Configuration conf,
429420
String nnAddress, UserGroupInformation ugi, Class<T> proto,
430-
boolean enableMultiSocket, int socketIndex,
431-
int maxConcurrencyPerConn) throws IOException {
421+
boolean enableMultiSocket, int socketIndex) throws IOException {
432422
if (!PROTO_MAP.containsKey(proto)) {
433423
String msg = "Unsupported protocol for connection to NameNode: "
434424
+ ((proto != null) ? proto.getName() : "null");
@@ -467,7 +457,7 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
467457
Text dtService = SecurityUtil.buildTokenService(socket);
468458

469459
ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket);
470-
return new ConnectionContext(clientProxy, maxConcurrencyPerConn);
460+
return new ConnectionContext(clientProxy, conf);
471461
}
472462

473463
private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ private void testConnectionCleanup(float ratio, int totalConns,
343343
addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
344344

345345
// There are activeConn connections.
346-
// We can clean up the pool
346+
// We can cleanup the pool
347347
tmpConnManager.cleanup(pool);
348348
assertEquals(leftConns, pool.getNumConnections());
349349

@@ -356,6 +356,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception {
356356
"Unsupported protocol for connection to NameNode: "
357357
+ TestConnectionManager.class.getName(),
358358
() -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
359-
TestConnectionManager.class, false, 0, 1));
359+
TestConnectionManager.class, false, 0));
360360
}
361361
}

0 commit comments

Comments
 (0)