Skip to content

Commit 09cabaa

Browse files
authored
HDFS-13274. RBF: Extend RouterRpcClient to use multiple sockets (#4531)
1 parent b28e4c6 commit 09cabaa

File tree

7 files changed

+208
-38
lines changed

7 files changed

+208
-38
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.InetSocketAddress;
2121
import java.util.concurrent.TimeUnit;
2222

23+
import org.apache.hadoop.conf.Configuration;
2324
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
2425
import org.apache.hadoop.ipc.RPC;
2526
import org.apache.hadoop.util.Time;
@@ -53,9 +54,14 @@ public class ConnectionContext {
5354
private long lastActiveTs = 0;
5455
/** The connection's active status would expire after this window. */
5556
private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
57+
/** The maximum number of requests that this connection can handle concurrently. **/
58+
private final int maxConcurrencyPerConn;
5659

57-
public ConnectionContext(ProxyAndInfo<?> connection) {
60+
public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
5861
this.client = connection;
62+
this.maxConcurrencyPerConn = conf.getInt(
63+
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
64+
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
5965
}
6066

6167
/**
@@ -93,6 +99,23 @@ public synchronized boolean isClosed() {
9399
* @return True if the connection can be used.
94100
*/
95101
public synchronized boolean isUsable() {
102+
return hasAvailableConcurrency() && !isClosed();
103+
}
104+
105+
/**
106+
* Return true if this connection context still has available concurrency,
107+
* else return false.
108+
*/
109+
private synchronized boolean hasAvailableConcurrency() {
110+
return this.numThreads < maxConcurrencyPerConn;
111+
}
112+
113+
/**
114+
* Check if the connection is idle. It checks if the connection is not used
115+
* by another thread.
116+
* @return True if the connection is not used by another thread.
117+
*/
118+
public synchronized boolean isIdle() {
96119
return !isActive() && !isClosed();
97120
}
98121

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public class ConnectionPool {
7777
private static final Logger LOG =
7878
LoggerFactory.getLogger(ConnectionPool.class);
7979

80-
8180
/** Configuration settings for the connection pool. */
8281
private final Configuration conf;
8382

@@ -94,6 +93,8 @@ public class ConnectionPool {
9493
private volatile List<ConnectionContext> connections = new ArrayList<>();
9594
/** Connection index for round-robin. */
9695
private final AtomicInteger clientIndex = new AtomicInteger(0);
96+
/** Underlying socket index. **/
97+
private final AtomicInteger socketIndex = new AtomicInteger(0);
9798

9899
/** Min number of connections per user. */
99100
private final int minSize;
@@ -105,6 +106,9 @@ public class ConnectionPool {
105106
/** The last time a connection was active. */
106107
private volatile long lastActiveTime = 0;
107108

109+
/** Enable using multiple physical socket or not. **/
110+
private final boolean enableMultiSocket;
111+
108112
/** Map for the protocols and their protobuf implementations. */
109113
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
110114
static {
@@ -149,9 +153,12 @@ protected ConnectionPool(Configuration config, String address,
149153
this.minSize = minPoolSize;
150154
this.maxSize = maxPoolSize;
151155
this.minActiveRatio = minActiveRatio;
156+
this.enableMultiSocket = conf.getBoolean(
157+
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
158+
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);
152159

153160
// Add minimum connections to the pool
154-
for (int i=0; i<this.minSize; i++) {
161+
for (int i = 0; i < this.minSize; i++) {
155162
ConnectionContext newConnection = newConnection();
156163
this.connections.add(newConnection);
157164
}
@@ -210,24 +217,23 @@ public AtomicInteger getClientIndex() {
210217
* @return Connection context.
211218
*/
212219
protected ConnectionContext getConnection() {
213-
214220
this.lastActiveTime = Time.now();
215-
216-
// Get a connection from the pool following round-robin
217-
ConnectionContext conn = null;
218221
List<ConnectionContext> tmpConnections = this.connections;
219-
int size = tmpConnections.size();
220-
// Inc and mask off sign bit, lookup index should be non-negative int
221-
int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
222-
for (int i=0; i<size; i++) {
223-
int index = (threadIndex + i) % size;
224-
conn = tmpConnections.get(index);
225-
if (conn != null && conn.isUsable()) {
226-
return conn;
222+
for (ConnectionContext tmpConnection : tmpConnections) {
223+
if (tmpConnection != null && tmpConnection.isUsable()) {
224+
return tmpConnection;
227225
}
228226
}
229227

230-
// We return a connection even if it's active
228+
ConnectionContext conn = null;
229+
// We return a connection even if it's busy
230+
int size = tmpConnections.size();
231+
if (size > 0) {
232+
// Get a connection from the pool following round-robin
233+
// Inc and mask off sign bit, lookup index should be non-negative int
234+
int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
235+
conn = tmpConnections.get(threadIndex % size);
236+
}
231237
return conn;
232238
}
233239

@@ -256,19 +262,18 @@ public synchronized List<ConnectionContext> removeConnections(int num) {
256262
int targetCount = Math.min(num, this.connections.size() - this.minSize);
257263
// Remove and close targetCount of connections
258264
List<ConnectionContext> tmpConnections = new ArrayList<>();
259-
for (int i = 0; i < this.connections.size(); i++) {
260-
ConnectionContext conn = this.connections.get(i);
265+
for (ConnectionContext conn : this.connections) {
261266
// Only pick idle connections to close
262-
if (removed.size() < targetCount && conn.isUsable()) {
267+
if (removed.size() < targetCount && conn.isIdle()) {
263268
removed.add(conn);
264269
} else {
265270
tmpConnections.add(conn);
266271
}
267272
}
268273
this.connections = tmpConnections;
269274
}
270-
LOG.debug("Expected to remove {} connection " +
271-
"and actually removed {} connections", num, removed.size());
275+
LOG.debug("Expected to remove {} connection and actually removed {} connections",
276+
num, removed.size());
272277
return removed;
273278
}
274279

@@ -303,7 +308,6 @@ protected int getNumConnections() {
303308
*/
304309
protected int getNumActiveConnections() {
305310
int ret = 0;
306-
307311
List<ConnectionContext> tmpConnections = this.connections;
308312
for (ConnectionContext conn : tmpConnections) {
309313
if (conn.isActive()) {
@@ -320,10 +324,9 @@ protected int getNumActiveConnections() {
320324
*/
321325
protected int getNumIdleConnections() {
322326
int ret = 0;
323-
324327
List<ConnectionContext> tmpConnections = this.connections;
325328
for (ConnectionContext conn : tmpConnections) {
326-
if (conn.isUsable()) {
329+
if (conn.isIdle()) {
327330
ret++;
328331
}
329332
}
@@ -393,28 +396,30 @@ public String getJSON() {
393396
* @throws IOException If it cannot get a new connection.
394397
*/
395398
public ConnectionContext newConnection() throws IOException {
396-
return newConnection(
397-
this.conf, this.namenodeAddress, this.ugi, this.protocol);
399+
return newConnection(this.conf, this.namenodeAddress,
400+
this.ugi, this.protocol, this.enableMultiSocket,
401+
this.socketIndex.incrementAndGet());
398402
}
399403

400404
/**
401405
* Creates a proxy wrapper for a client NN connection. Each proxy contains
402406
* context for a single user/security context. To maximize throughput it is
403407
* recommended to use multiple connection per user+server, allowing multiple
404408
* writes and reads to be dispatched in parallel.
405-
* @param <T>
409+
* @param <T> Input type T.
406410
*
407411
* @param conf Configuration for the connection.
408412
* @param nnAddress Address of server supporting the ClientProtocol.
409413
* @param ugi User context.
410414
* @param proto Interface of the protocol.
415+
* @param enableMultiSocket Enable multiple socket or not.
411416
* @return proto for the target ClientProtocol that contains the user's
412417
* security context.
413418
* @throws IOException If it cannot be created.
414419
*/
415420
protected static <T> ConnectionContext newConnection(Configuration conf,
416-
String nnAddress, UserGroupInformation ugi, Class<T> proto)
417-
throws IOException {
421+
String nnAddress, UserGroupInformation ugi, Class<T> proto,
422+
boolean enableMultiSocket, int socketIndex) throws IOException {
418423
if (!PROTO_MAP.containsKey(proto)) {
419424
String msg = "Unsupported protocol for connection to NameNode: "
420425
+ ((proto != null) ? proto.getName() : "null");
@@ -437,23 +442,31 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
437442
}
438443
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
439444
final long version = RPC.getProtocolVersion(classes.protoPb);
440-
Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
441-
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
445+
Object proxy;
446+
if (enableMultiSocket) {
447+
FederationConnectionId connectionId = new FederationConnectionId(
448+
socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
449+
defaultPolicy, conf, socketIndex);
450+
proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
451+
conf, factory).getProxy();
452+
} else {
453+
proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
454+
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
455+
}
456+
442457
T client = newProtoClient(proto, classes, proxy);
443458
Text dtService = SecurityUtil.buildTokenService(socket);
444459

445-
ProxyAndInfo<T> clientProxy =
446-
new ProxyAndInfo<T>(client, dtService, socket);
447-
ConnectionContext connection = new ConnectionContext(clientProxy);
448-
return connection;
460+
ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket);
461+
return new ConnectionContext(clientProxy, conf);
449462
}
450463

451464
private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
452465
Object proxy) {
453466
try {
454467
Constructor<?> constructor =
455468
classes.protoClientPb.getConstructor(classes.protoPb);
456-
Object o = constructor.newInstance(new Object[] {proxy});
469+
Object o = constructor.newInstance(proxy);
457470
if (proto.isAssignableFrom(o.getClass())) {
458471
@SuppressWarnings("unchecked")
459472
T client = (T) o;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.commons.lang3.builder.EqualsBuilder;
21+
import org.apache.commons.lang3.builder.HashCodeBuilder;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.io.retry.RetryPolicy;
24+
import org.apache.hadoop.ipc.Client;
25+
import org.apache.hadoop.security.UserGroupInformation;
26+
27+
import java.net.InetSocketAddress;
28+
29+
public class FederationConnectionId extends Client.ConnectionId {
30+
private final int index;
31+
32+
public FederationConnectionId(InetSocketAddress address, Class<?> protocol,
33+
UserGroupInformation ticket, int rpcTimeout,
34+
RetryPolicy connectionRetryPolicy, Configuration conf, int index) {
35+
super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
36+
this.index = index;
37+
}
38+
39+
@Override
40+
public int hashCode() {
41+
return new HashCodeBuilder()
42+
.append(super.hashCode())
43+
.append(this.index)
44+
.toHashCode();
45+
}
46+
47+
@Override
48+
public boolean equals(Object obj) {
49+
if (!super.equals(obj)) {
50+
return false;
51+
}
52+
if (obj instanceof FederationConnectionId) {
53+
FederationConnectionId other = (FederationConnectionId)obj;
54+
return new EqualsBuilder()
55+
.append(this.index, other.index)
56+
.isEquals();
57+
}
58+
return false;
59+
}
60+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
135135
FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
136136
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
137137
TimeUnit.SECONDS.toMillis(10);
138+
public static final String DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY =
139+
FEDERATION_ROUTER_PREFIX + "enable.multiple.socket";
140+
public static final boolean DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT = false;
141+
public static final String DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY =
142+
FEDERATION_ROUTER_PREFIX + "max.concurrency.per.connection";
143+
public static final int DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT = 1;
138144

139145
// HDFS Router RPC client
140146
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,33 @@
134134
</description>
135135
</property>
136136

137+
<property>
138+
<name>dfs.federation.router.enable.multiple.socket</name>
139+
<value>false</value>
140+
<description>
141+
If enable multiple downstream socket or not. If true, ConnectionPool
142+
will use a new socket when creating a new connection for the same user,
143+
and RouterRPCClient will get a better throughput. It's best used with
144+
dfs.federation.router.max.concurrency.per.connection together to get
145+
a better throughput with fewer sockets. Such as enable
146+
dfs.federation.router.enable.multiple.socket and
147+
set dfs.federation.router.max.concurrency.per.connection = 20.
148+
</description>
149+
</property>
150+
151+
<property>
152+
<name>dfs.federation.router.max.concurrency.per.connection</name>
153+
<value>1</value>
154+
<description>
155+
The maximum number of requests that a connection can handle concurrently.
156+
When the number of requests being processed by a socket is less than this value,
157+
new request will be processed by this socket. When enable
158+
dfs.federation.router.enable.multiple.socket, it's best
159+
set this value greater than 1, such as 20, to avoid frequent
160+
creation and idle sockets in the case of a NS with jitter requests.
161+
</description>
162+
</property>
163+
137164
<property>
138165
<name>dfs.federation.router.connection.pool.clean.ms</name>
139166
<value>60000</value>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,11 +416,13 @@ The RPC server to receive connections from the clients.
416416
The Router forwards the client requests to the NameNodes.
417417
It uses a pool of connections to reduce the latency of creating them.
418418

419-
| Property | Default | Description|
419+
| Property | Default | Description |
420420
|:---- |:---- |:---- |
421421
| dfs.federation.router.connection.pool-size | 1 | Size of the pool of connections from the router to namenodes. |
422422
| dfs.federation.router.connection.clean.ms | 10000 | Time interval, in milliseconds, to check if the connection pool should remove unused connections. |
423423
| dfs.federation.router.connection.pool.clean.ms | 60000 | Time interval, in milliseconds, to check if the connection manager should remove unused connection pools. |
424+
| dfs.federation.router.enable.multiple.socket | false | If true, ConnectionPool will use a new socket when creating a new connection for the same user. And it's best used with dfs.federation.router.max.concurrency.per.connection together. |
425+
| dfs.federation.router.max.concurrency.per.connection | 1 | The maximum number of requests that a connection can handle concurrently. |
424426

425427
### Admin server
426428

0 commit comments

Comments
 (0)