Skip to content

Commit a39af33

Browse files
committed
HDFS-17545. [ARR] router async rpc client. (apache#6871). Contributed by Jian Zhang.
Reviewed-by: hfutatzhanghb <hfutzhanghb@163.com> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
1 parent 00564f0 commit a39af33

File tree

12 files changed

+1624
-198
lines changed

12 files changed

+1624
-198
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,91 @@
1818

1919
package org.apache.hadoop.hdfs.protocolPB;
2020

21+
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
2122
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
2223
import org.apache.hadoop.io.Writable;
23-
import org.apache.hadoop.ipc.CallerContext;
2424
import org.apache.hadoop.ipc.Client;
2525
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
26-
import org.apache.hadoop.ipc.Server;
2726
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
2827
import org.apache.hadoop.util.concurrent.AsyncGet;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

3231
import java.io.IOException;
3332
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.Executor;
3434

3535
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
36-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
3736
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
3837
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
3938
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
4039

40+
/**
41+
* <p>This utility class encapsulates the logic required to initiate asynchronous RPCs,
42+
* handle responses, and propagate exceptions. It works in conjunction with
43+
* {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous
44+
* nature of the operations.
45+
*
46+
* @see ProtobufRpcEngine2
47+
* @see Client
48+
* @see CompletableFuture
49+
*/
4150
public final class AsyncRpcProtocolPBUtil {
4251
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
52+
/** The executor used for handling responses asynchronously. */
53+
private static Executor worker;
4354

4455
private AsyncRpcProtocolPBUtil() {}
4556

57+
/**
58+
* Asynchronously invokes an RPC call and applies a response transformation function
59+
* to the result. This method is generic and can be used to handle any type of
60+
* RPC call.
61+
*
62+
* <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call
63+
* and the {@link ApplyFunction} to process the response. It also handles exceptions
64+
* that may occur during the RPC call and wraps them in a user-friendly manner.
65+
*
66+
* @param call The IPC call encapsulating the RPC request.
67+
* @param response The function to apply to the response of the RPC call.
68+
* @param clazz The class object representing the type {@code R} of the response.
69+
* @param <T> Type of the call's result.
70+
* @param <R> Type of method return.
71+
* @return An object of type {@code R} that is the result of applying the response
72+
* function to the RPC call result.
73+
* @throws IOException If an I/O error occurs during the asynchronous RPC call.
74+
*/
4675
public static <T, R> R asyncIpcClient(
4776
ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
4877
Class<R> clazz) throws IOException {
4978
ipc(call);
5079
AsyncGet<T, Exception> asyncReqMessage =
5180
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
5281
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
53-
// transfer originCall & callerContext to worker threads of executor.
54-
final Server.Call originCall = Server.getCurCall().get();
55-
final CallerContext originContext = CallerContext.getCurrent();
56-
asyncCompleteWith(responseFuture);
57-
asyncApply(o -> {
82+
// transfer thread local context to worker threads of executor.
83+
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
84+
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
85+
threadLocalContext.transfer();
86+
if (e != null) {
87+
throw warpCompletionException(e);
88+
}
5889
try {
59-
Server.getCurCall().set(originCall);
60-
CallerContext.setCurrent(originContext);
6190
T res = asyncReqMessage.get(-1, null);
6291
return response.apply(res);
63-
} catch (Exception e) {
64-
throw warpCompletionException(e);
92+
} catch (Exception ex) {
93+
throw warpCompletionException(ex);
6594
}
66-
});
95+
}, worker));
6796
return asyncReturn(clazz);
6897
}
98+
99+
/**
100+
* Sets the executor used for handling responses asynchronously within
101+
* the utility class.
102+
*
103+
* @param worker The executor to be used for handling responses asynchronously.
104+
*/
105+
public static void setWorker(Executor worker) {
106+
AsyncRpcProtocolPBUtil.worker = worker;
107+
}
69108
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
5252

5353

5454
/** Time for an operation to be received in the Router. */
55-
private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
55+
private static final ThreadLocal<Long> START_TIME = ThreadLocal.withInitial(() -> -1L);
5656
/** Time for an operation to be sent to the Namenode. */
57-
private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
57+
private static final ThreadLocal<Long> PROXY_TIME = ThreadLocal.withInitial(() -> -1L);
5858

5959
/** Configuration for the performance monitor. */
6060
private Configuration conf;
@@ -141,6 +141,14 @@ public void startOp() {
141141
START_TIME.set(monotonicNow());
142142
}
143143

144+
public static long getStartOpTime() {
145+
return START_TIME.get();
146+
}
147+
148+
public static void setStartOpTime(long startOpTime) {
149+
START_TIME.set(startOpTime);
150+
}
151+
144152
@Override
145153
public long proxyOp() {
146154
PROXY_TIME.set(monotonicNow());
@@ -151,6 +159,14 @@ public long proxyOp() {
151159
return Thread.currentThread().getId();
152160
}
153161

162+
public static long getProxyOpTime() {
163+
return PROXY_TIME.get();
164+
}
165+
166+
public static void setProxyOpTime(long proxyOpTime) {
167+
PROXY_TIME.set(proxyOpTime);
168+
}
169+
154170
@Override
155171
public void proxyOpComplete(boolean success, String nsId,
156172
FederationNamenodeServiceState state) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
7272
public static final String DFS_ROUTER_RPC_ENABLE =
7373
FEDERATION_ROUTER_PREFIX + "rpc.enable";
7474
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
75+
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
76+
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
77+
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
78+
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
79+
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
80+
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
81+
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
82+
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
83+
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
7584

7685
public static final String DFS_ROUTER_METRICS_ENABLE =
7786
FEDERATION_ROUTER_PREFIX + "metrics.enable";

0 commit comments

Comments
 (0)