diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index cc63d9036b2f3..febaedec21706 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -823,7 +823,7 @@ T invokeAtAvailableNsAsync(RemoteMethod method, Class clazz) asyncComplete(null); if (!nsId.isEmpty()) { asyncTry(() -> { - rpcClient.invokeSingle(nsId, method, clazz); + getRPCClient().invokeSingle(nsId, method, clazz); }); asyncCatch((AsyncCatchFunction)(res, ioe) -> { @@ -898,7 +898,7 @@ T invokeOnNsAsync(RemoteMethod method, Class clazz, IOException ioe, String nsId = fnInfo.getNameserviceId(); LOG.debug("Invoking {} on namespace {}", method, nsId); asyncTry(() -> { - rpcClient.invokeSingle(nsId, method, clazz); + getRPCClient().invokeSingle(nsId, method, clazz); asyncApply(result -> { if (result != null && isExpectedClass(clazz, result)) { foreach.breakNow(); @@ -1092,7 +1092,7 @@ private RemoteLocation getExistingLocationAsync(String src, List locations) throws IOException { RemoteMethod method = new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeConcurrent( + getRPCClient().invokeConcurrent( locations, method, true, false, HdfsFileStatus.class); asyncApply((ApplyFunction, Object>) results -> { for (RemoteLocation loc : locations) { @@ -1378,7 +1378,7 @@ public DatanodeInfo[] getDatanodeReportAsync( new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, requireResponse, false, + getRPCClient().invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); asyncApply((ApplyFunction, @@ -1460,7 +1460,7 @@ public Map getDatanodeStorageReportMapAsync( RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent( + getRPCClient().invokeConcurrent( nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class); asyncApply((ApplyFunction, @@ -2309,7 +2309,7 @@ public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long t RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, requireResponse, false, + getRPCClient().invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); asyncApply((ApplyFunction,