3737import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
3838import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
3939import static org .apache .hadoop .hdfs .server .federation .router .RouterFederationRename .RouterRenameOption ;
40+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncApply ;
41+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncCatch ;
42+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncComplete ;
43+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncForEach ;
44+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncReturn ;
45+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncTry ;
4046import static org .apache .hadoop .tools .fedbalance .FedBalanceConfigs .SCHEDULER_JOURNAL_URI ;
4147
4248import java .io .FileNotFoundException ;
4955import java .util .ArrayList ;
5056import java .util .Collection ;
5157import java .util .EnumSet ;
58+ import java .util .Iterator ;
5259import java .util .LinkedHashMap ;
5360import java .util .LinkedHashSet ;
5461import java .util .List ;
6976import org .apache .hadoop .hdfs .HAUtil ;
7077import org .apache .hadoop .hdfs .protocol .UnresolvedPathException ;
7178import org .apache .hadoop .hdfs .protocolPB .AsyncRpcProtocolPBUtil ;
79+ import org .apache .hadoop .hdfs .server .federation .router .async .ApplyFunction ;
80+ import org .apache .hadoop .hdfs .server .federation .router .async .AsyncCatchFunction ;
81+ import org .apache .hadoop .hdfs .server .federation .router .async .CatchFunction ;
7282import org .apache .hadoop .thirdparty .com .google .common .cache .CacheBuilder ;
7383import org .apache .hadoop .thirdparty .com .google .common .cache .CacheLoader ;
7484import org .apache .hadoop .thirdparty .com .google .common .cache .LoadingCache ;
@@ -793,6 +803,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
793803 return invokeOnNs (method , clazz , io , nss );
794804 }
795805
806+ /**
807+ * Invokes the method at default namespace, if default namespace is not
808+ * available then at the other available namespaces.
809+ * If the namespace is unavailable, retry with other namespaces.
810+ * Asynchronous version of invokeAtAvailableNs method.
811+ * @param <T> expected return type.
812+ * @param method the remote method.
813+ * @return the response received after invoking method.
814+ * @throws IOException
815+ */
816+ <T > T invokeAtAvailableNsAsync (RemoteMethod method , Class <T > clazz )
817+ throws IOException {
818+ String nsId = subclusterResolver .getDefaultNamespace ();
819+ // If default Ns is not present return result from first namespace.
820+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
821+ // If no namespace is available, throw IOException.
822+ IOException io = new IOException ("No namespace available." );
823+
824+ asyncComplete (null );
825+ if (!nsId .isEmpty ()) {
826+ asyncTry (() -> {
827+ getRPCClient ().invokeSingle (nsId , method , clazz );
828+ });
829+
830+ asyncCatch ((AsyncCatchFunction <T , IOException >)(res , ioe ) -> {
831+ if (!clientProto .isUnavailableSubclusterException (ioe )) {
832+ LOG .debug ("{} exception cannot be retried" ,
833+ ioe .getClass ().getSimpleName ());
834+ throw ioe ;
835+ }
836+ nss .removeIf (n -> n .getNameserviceId ().equals (nsId ));
837+ invokeOnNsAsync (method , clazz , io , nss );
838+ }, IOException .class );
839+ } else {
840+ // If not have default NS.
841+ invokeOnNsAsync (method , clazz , io , nss );
842+ }
843+ return asyncReturn (clazz );
844+ }
845+
796846 /**
797847 * Invoke the method sequentially on available namespaces,
798848 * throw no namespace available exception, if no namespaces are available.
@@ -826,6 +876,61 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
826876 throw ioe ;
827877 }
828878
879+ /**
880+ * Invoke the method sequentially on available namespaces,
881+ * throw no namespace available exception, if no namespaces are available.
882+ * Asynchronous version of invokeOnNs method.
883+ * @param method the remote method.
884+ * @param clazz Class for the return type.
885+ * @param ioe IOException .
886+ * @param nss List of name spaces in the federation
887+ * @return the response received after invoking method.
888+ * @throws IOException
889+ */
890+ <T > T invokeOnNsAsync (RemoteMethod method , Class <T > clazz , IOException ioe ,
891+ Set <FederationNamespaceInfo > nss ) throws IOException {
892+ if (nss .isEmpty ()) {
893+ throw ioe ;
894+ }
895+
896+ asyncComplete (null );
897+ Iterator <FederationNamespaceInfo > nsIterator = nss .iterator ();
898+ asyncForEach (nsIterator , (foreach , fnInfo ) -> {
899+ String nsId = fnInfo .getNameserviceId ();
900+ LOG .debug ("Invoking {} on namespace {}" , method , nsId );
901+ asyncTry (() -> {
902+ getRPCClient ().invokeSingle (nsId , method , clazz );
903+ asyncApply (result -> {
904+ if (result != null ) {
905+ foreach .breakNow ();
906+ return result ;
907+ }
908+ return null ;
909+ });
910+ });
911+
912+ asyncCatch ((CatchFunction <T , IOException >)(ret , ex ) -> {
913+ LOG .debug ("Failed to invoke {} on namespace {}" , method , nsId , ex );
914+ // Ignore the exception and try on other namespace, if the tried
915+ // namespace is unavailable, else throw the received exception.
916+ if (!clientProto .isUnavailableSubclusterException (ex )) {
917+ throw ex ;
918+ }
919+ return null ;
920+ }, IOException .class );
921+ });
922+
923+ asyncApply (obj -> {
924+ if (obj == null ) {
925+ // Couldn't get a response from any of the namespace, throw ioe.
926+ throw ioe ;
927+ }
928+ return obj ;
929+ });
930+
931+ return asyncReturn (clazz );
932+ }
933+
829934 @ Override // ClientProtocol
830935 public Token <DelegationTokenIdentifier > getDelegationToken (Text renewer )
831936 throws IOException {
@@ -877,6 +982,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
877982 */
878983 RemoteLocation getCreateLocation (final String src ) throws IOException {
879984 final List <RemoteLocation > locations = getLocationsForPath (src , true );
985+ if (isAsync ()) {
986+ getCreateLocationAsync (src , locations );
987+ return asyncReturn (RemoteLocation .class );
988+ }
880989 return getCreateLocation (src , locations );
881990 }
882991
@@ -913,6 +1022,44 @@ RemoteLocation getCreateLocation(
9131022 return createLocation ;
9141023 }
9151024
1025+ /**
1026+ * Get the location to create a file. It checks if the file already existed
1027+ * in one of the locations.
1028+ * Asynchronous version of getCreateLocation method.
1029+ *
1030+ * @param src Path of the file to check.
1031+ * @param locations Prefetched locations for the file.
1032+ * @return The remote location for this file.
1033+ * @throws IOException If the file has no creation location.
1034+ */
1035+ RemoteLocation getCreateLocationAsync (
1036+ final String src , final List <RemoteLocation > locations )
1037+ throws IOException {
1038+
1039+ if (locations == null || locations .isEmpty ()) {
1040+ throw new IOException ("Cannot get locations to create " + src );
1041+ }
1042+
1043+ RemoteLocation createLocation = locations .get (0 );
1044+ if (locations .size () > 1 ) {
1045+ asyncTry (() -> {
1046+ getExistingLocationAsync (src , locations );
1047+ asyncApply ((ApplyFunction <RemoteLocation , RemoteLocation >) existingLocation -> {
1048+ if (existingLocation != null ) {
1049+ LOG .debug ("{} already exists in {}." , src , existingLocation );
1050+ return existingLocation ;
1051+ }
1052+ return createLocation ;
1053+ });
1054+ });
1055+ asyncCatch ((o , e ) -> createLocation , FileNotFoundException .class );
1056+ } else {
1057+ asyncComplete (createLocation );
1058+ }
1059+
1060+ return asyncReturn (RemoteLocation .class );
1061+ }
1062+
9161063 /**
9171064 * Gets the remote location where the file exists.
9181065 * @param src the name of file.
@@ -934,6 +1081,31 @@ private RemoteLocation getExistingLocation(String src,
9341081 return null ;
9351082 }
9361083
1084+ /**
1085+ * Gets the remote location where the file exists.
1086+ * Asynchronous version of getExistingLocation method.
1087+ * @param src the name of file.
1088+ * @param locations all the remote locations.
1089+ * @return the remote location of the file if it exists, else null.
1090+ * @throws IOException in case of any exception.
1091+ */
1092+ private RemoteLocation getExistingLocationAsync (String src ,
1093+ List <RemoteLocation > locations ) throws IOException {
1094+ RemoteMethod method = new RemoteMethod ("getFileInfo" ,
1095+ new Class <?>[] {String .class }, new RemoteParam ());
1096+ getRPCClient ().invokeConcurrent (
1097+ locations , method , true , false , HdfsFileStatus .class );
1098+ asyncApply ((ApplyFunction <Map <RemoteLocation , HdfsFileStatus >, Object >) results -> {
1099+ for (RemoteLocation loc : locations ) {
1100+ if (results .get (loc ) != null ) {
1101+ return loc ;
1102+ }
1103+ }
1104+ return null ;
1105+ });
1106+ return asyncReturn (RemoteLocation .class );
1107+ }
1108+
9371109 @ Override // ClientProtocol
9381110 public LastBlockWithStatus append (String src , final String clientName ,
9391111 final EnumSetWritable <CreateFlag > flag ) throws IOException {
@@ -1188,6 +1360,38 @@ public DatanodeInfo[] getDatanodeReport(
11881360 return toArray (datanodes , DatanodeInfo .class );
11891361 }
11901362
1363+ /**
1364+ * Get the datanode report with a timeout.
1365+ * Asynchronous version of the getDatanodeReport method.
1366+ * @param type Type of the datanode.
1367+ * @param requireResponse If we require all the namespaces to report.
1368+ * @param timeOutMs Time out for the reply in milliseconds.
1369+ * @return List of datanodes.
1370+ * @throws IOException If it cannot get the report.
1371+ */
1372+ public DatanodeInfo [] getDatanodeReportAsync (
1373+ DatanodeReportType type , boolean requireResponse , long timeOutMs )
1374+ throws IOException {
1375+ checkOperation (OperationCategory .UNCHECKED );
1376+
1377+ Map <String , DatanodeInfo > datanodesMap = new LinkedHashMap <>();
1378+ RemoteMethod method = new RemoteMethod ("getDatanodeReport" ,
1379+ new Class <?>[] {DatanodeReportType .class }, type );
1380+
1381+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
1382+ getRPCClient ().invokeConcurrent (nss , method , requireResponse , false ,
1383+ timeOutMs , DatanodeInfo [].class );
1384+
1385+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeInfo []>,
1386+ DatanodeInfo []>) results -> {
1387+ updateDnMap (results , datanodesMap );
1388+ // Map -> Array
1389+ Collection <DatanodeInfo > datanodes = datanodesMap .values ();
1390+ return toArray (datanodes , DatanodeInfo .class );
1391+ });
1392+ return asyncReturn (DatanodeInfo [].class );
1393+ }
1394+
11911395 @ Override // ClientProtocol
11921396 public DatanodeStorageReport [] getDatanodeStorageReport (
11931397 DatanodeReportType type ) throws IOException {
@@ -1206,6 +1410,11 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
12061410 return getDatanodeStorageReportMap (type , true , -1 );
12071411 }
12081412
1413+ public Map <String , DatanodeStorageReport []> getDatanodeStorageReportMapAsync (
1414+ DatanodeReportType type ) throws IOException {
1415+ return getDatanodeStorageReportMapAsync (type , true , -1 );
1416+ }
1417+
12091418 /**
12101419 * Get the list of datanodes per subcluster.
12111420 *
@@ -1238,6 +1447,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
12381447 return ret ;
12391448 }
12401449
1450+ /**
1451+ * Get the list of datanodes per subcluster.
1452+ * Asynchronous version of getDatanodeStorageReportMap method.
1453+ * @param type Type of the datanodes to get.
1454+ * @param requireResponse If true an exception will be thrown if all calls do
1455+ * not complete. If false exceptions are ignored and all data results
1456+ * successfully received are returned.
1457+ * @param timeOutMs Time out for the reply in milliseconds.
1458+ * @return nsId to datanode list.
1459+ * @throws IOException If the method cannot be invoked remotely.
1460+ */
1461+ public Map <String , DatanodeStorageReport []> getDatanodeStorageReportMapAsync (
1462+ DatanodeReportType type , boolean requireResponse , long timeOutMs )
1463+ throws IOException {
1464+
1465+ Map <String , DatanodeStorageReport []> ret = new LinkedHashMap <>();
1466+ RemoteMethod method = new RemoteMethod ("getDatanodeStorageReport" ,
1467+ new Class <?>[] {DatanodeReportType .class }, type );
1468+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
1469+ getRPCClient ().invokeConcurrent (
1470+ nss , method , requireResponse , false , timeOutMs , DatanodeStorageReport [].class );
1471+
1472+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeStorageReport []>,
1473+ Map <String , DatanodeStorageReport []>>) results -> {
1474+ for (Entry <FederationNamespaceInfo , DatanodeStorageReport []> entry :
1475+ results .entrySet ()) {
1476+ FederationNamespaceInfo ns = entry .getKey ();
1477+ String nsId = ns .getNameserviceId ();
1478+ DatanodeStorageReport [] result = entry .getValue ();
1479+ ret .put (nsId , result );
1480+ }
1481+ return ret ;
1482+ });
1483+ return asyncReturn (ret .getClass ());
1484+ }
1485+
12411486 @ Override // ClientProtocol
12421487 public boolean setSafeMode (SafeModeAction action , boolean isChecked )
12431488 throws IOException {
@@ -2053,6 +2298,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu
20532298 return toArray (datanodes , DatanodeInfo .class );
20542299 }
20552300
2301+ /**
2302+ * Get the slow running datanodes report with a timeout.
2303+ * Asynchronous version of the getSlowDatanodeReport method.
2304+ *
2305+ * @param requireResponse If we require all the namespaces to report.
2306+ * @param timeOutMs Time out for the reply in milliseconds.
2307+ * @return List of datanodes.
2308+ * @throws IOException If it cannot get the report.
2309+ */
2310+ public DatanodeInfo [] getSlowDatanodeReportAsync (boolean requireResponse , long timeOutMs )
2311+ throws IOException {
2312+ checkOperation (OperationCategory .UNCHECKED );
2313+
2314+ Map <String , DatanodeInfo > datanodesMap = new LinkedHashMap <>();
2315+ RemoteMethod method = new RemoteMethod ("getSlowDatanodeReport" );
2316+
2317+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
2318+ getRPCClient ().invokeConcurrent (nss , method , requireResponse , false ,
2319+ timeOutMs , DatanodeInfo [].class );
2320+
2321+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeInfo []>,
2322+ DatanodeInfo []>) results -> {
2323+ updateDnMap (results , datanodesMap );
2324+ // Map -> Array
2325+ Collection <DatanodeInfo > datanodes = datanodesMap .values ();
2326+ return toArray (datanodes , DatanodeInfo .class );
2327+ });
2328+
2329+ return asyncReturn (DatanodeInfo [].class );
2330+ }
2331+
20562332 private void updateDnMap (Map <FederationNamespaceInfo , DatanodeInfo []> results ,
20572333 Map <String , DatanodeInfo > datanodesMap ) {
20582334 for (Entry <FederationNamespaceInfo , DatanodeInfo []> entry :
0 commit comments