3737import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
3838import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
3939import static org .apache .hadoop .hdfs .server .federation .router .RouterFederationRename .RouterRenameOption ;
40+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncApply ;
41+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncCatch ;
42+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncComplete ;
43+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncForEach ;
44+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncReturn ;
45+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncTry ;
4046import static org .apache .hadoop .tools .fedbalance .FedBalanceConfigs .SCHEDULER_JOURNAL_URI ;
4147
4248import java .io .FileNotFoundException ;
4955import java .util .ArrayList ;
5056import java .util .Collection ;
5157import java .util .EnumSet ;
58+ import java .util .Iterator ;
5259import java .util .LinkedHashMap ;
5360import java .util .LinkedHashSet ;
5461import java .util .List ;
6875import org .apache .hadoop .hdfs .HAUtil ;
6976import org .apache .hadoop .hdfs .protocol .UnresolvedPathException ;
7077import org .apache .hadoop .hdfs .protocolPB .AsyncRpcProtocolPBUtil ;
78+ import org .apache .hadoop .hdfs .server .federation .router .async .ApplyFunction ;
79+ import org .apache .hadoop .hdfs .server .federation .router .async .AsyncCatchFunction ;
80+ import org .apache .hadoop .hdfs .server .federation .router .async .CatchFunction ;
7181import org .apache .hadoop .thirdparty .com .google .common .cache .CacheBuilder ;
7282import org .apache .hadoop .thirdparty .com .google .common .cache .CacheLoader ;
7383import org .apache .hadoop .thirdparty .com .google .common .cache .LoadingCache ;
@@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
791801 return invokeOnNs (method , clazz , io , nss );
792802 }
793803
804+ /**
805+ * Invokes the method at default namespace, if default namespace is not
806+ * available then at the other available namespaces.
807+ * If the namespace is unavailable, retry with other namespaces.
808+ * Asynchronous version of invokeAtAvailableNs method.
809+ * @param <T> expected return type.
810+ * @param method the remote method.
811+ * @return the response received after invoking method.
812+ * @throws IOException
813+ */
814+ <T > T invokeAtAvailableNsAsync (RemoteMethod method , Class <T > clazz )
815+ throws IOException {
816+ String nsId = subclusterResolver .getDefaultNamespace ();
817+ // If default Ns is not present return result from first namespace.
818+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
819+ // If no namespace is available, throw IOException.
820+ IOException io = new IOException ("No namespace available." );
821+
822+ asyncComplete (null );
823+ if (!nsId .isEmpty ()) {
824+ asyncTry (() -> {
825+ getRPCClient ().invokeSingle (nsId , method , clazz );
826+ });
827+
828+ asyncCatch ((AsyncCatchFunction <T , IOException >)(res , ioe ) -> {
829+ if (!clientProto .isUnavailableSubclusterException (ioe )) {
830+ LOG .debug ("{} exception cannot be retried" ,
831+ ioe .getClass ().getSimpleName ());
832+ throw ioe ;
833+ }
834+ nss .removeIf (n -> n .getNameserviceId ().equals (nsId ));
835+ invokeOnNsAsync (method , clazz , io , nss );
836+ }, IOException .class );
837+ } else {
838+ // If not have default NS.
839+ invokeOnNsAsync (method , clazz , io , nss );
840+ }
841+ return asyncReturn (clazz );
842+ }
843+
794844 /**
795845 * Invoke the method sequentially on available namespaces,
796846 * throw no namespace available exception, if no namespaces are available.
@@ -824,6 +874,61 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
824874 throw ioe ;
825875 }
826876
877+ /**
878+ * Invoke the method sequentially on available namespaces,
879+ * throw no namespace available exception, if no namespaces are available.
880+ * Asynchronous version of invokeOnNs method.
881+ * @param method the remote method.
882+ * @param clazz Class for the return type.
883+ * @param ioe IOException .
884+ * @param nss List of name spaces in the federation
885+ * @return the response received after invoking method.
886+ * @throws IOException
887+ */
888+ <T > T invokeOnNsAsync (RemoteMethod method , Class <T > clazz , IOException ioe ,
889+ Set <FederationNamespaceInfo > nss ) throws IOException {
890+ if (nss .isEmpty ()) {
891+ throw ioe ;
892+ }
893+
894+ asyncComplete (null );
895+ Iterator <FederationNamespaceInfo > nsIterator = nss .iterator ();
896+ asyncForEach (nsIterator , (foreach , fnInfo ) -> {
897+ String nsId = fnInfo .getNameserviceId ();
898+ LOG .debug ("Invoking {} on namespace {}" , method , nsId );
899+ asyncTry (() -> {
900+ getRPCClient ().invokeSingle (nsId , method , clazz );
901+ asyncApply (result -> {
902+ if (result != null ) {
903+ foreach .breakNow ();
904+ return result ;
905+ }
906+ return null ;
907+ });
908+ });
909+
910+ asyncCatch ((CatchFunction <T , IOException >)(ret , ex ) -> {
911+ LOG .debug ("Failed to invoke {} on namespace {}" , method , nsId , ex );
912+ // Ignore the exception and try on other namespace, if the tried
913+ // namespace is unavailable, else throw the received exception.
914+ if (!clientProto .isUnavailableSubclusterException (ex )) {
915+ throw ex ;
916+ }
917+ return null ;
918+ }, IOException .class );
919+ });
920+
921+ asyncApply (obj -> {
922+ if (obj == null ) {
923+ // Couldn't get a response from any of the namespace, throw ioe.
924+ throw ioe ;
925+ }
926+ return obj ;
927+ });
928+
929+ return asyncReturn (clazz );
930+ }
931+
827932 @ Override // ClientProtocol
828933 public Token <DelegationTokenIdentifier > getDelegationToken (Text renewer )
829934 throws IOException {
@@ -875,6 +980,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
875980 */
876981 RemoteLocation getCreateLocation (final String src ) throws IOException {
877982 final List <RemoteLocation > locations = getLocationsForPath (src , true );
983+ if (isAsync ()) {
984+ getCreateLocationAsync (src , locations );
985+ return asyncReturn (RemoteLocation .class );
986+ }
878987 return getCreateLocation (src , locations );
879988 }
880989
@@ -911,6 +1020,44 @@ RemoteLocation getCreateLocation(
9111020 return createLocation ;
9121021 }
9131022
1023+ /**
1024+ * Get the location to create a file. It checks if the file already existed
1025+ * in one of the locations.
1026+ * Asynchronous version of getCreateLocation method.
1027+ *
1028+ * @param src Path of the file to check.
1029+ * @param locations Prefetched locations for the file.
1030+ * @return The remote location for this file.
1031+ * @throws IOException If the file has no creation location.
1032+ */
1033+ RemoteLocation getCreateLocationAsync (
1034+ final String src , final List <RemoteLocation > locations )
1035+ throws IOException {
1036+
1037+ if (locations == null || locations .isEmpty ()) {
1038+ throw new IOException ("Cannot get locations to create " + src );
1039+ }
1040+
1041+ RemoteLocation createLocation = locations .get (0 );
1042+ if (locations .size () > 1 ) {
1043+ asyncTry (() -> {
1044+ getExistingLocationAsync (src , locations );
1045+ asyncApply ((ApplyFunction <RemoteLocation , RemoteLocation >) existingLocation -> {
1046+ if (existingLocation != null ) {
1047+ LOG .debug ("{} already exists in {}." , src , existingLocation );
1048+ return existingLocation ;
1049+ }
1050+ return createLocation ;
1051+ });
1052+ });
1053+ asyncCatch ((o , e ) -> createLocation , FileNotFoundException .class );
1054+ } else {
1055+ asyncComplete (createLocation );
1056+ }
1057+
1058+ return asyncReturn (RemoteLocation .class );
1059+ }
1060+
9141061 /**
9151062 * Gets the remote location where the file exists.
9161063 * @param src the name of file.
@@ -932,6 +1079,31 @@ private RemoteLocation getExistingLocation(String src,
9321079 return null ;
9331080 }
9341081
1082+ /**
1083+ * Gets the remote location where the file exists.
1084+ * Asynchronous version of getExistingLocation method.
1085+ * @param src the name of file.
1086+ * @param locations all the remote locations.
1087+ * @return the remote location of the file if it exists, else null.
1088+ * @throws IOException in case of any exception.
1089+ */
1090+ private RemoteLocation getExistingLocationAsync (String src ,
1091+ List <RemoteLocation > locations ) throws IOException {
1092+ RemoteMethod method = new RemoteMethod ("getFileInfo" ,
1093+ new Class <?>[] {String .class }, new RemoteParam ());
1094+ getRPCClient ().invokeConcurrent (
1095+ locations , method , true , false , HdfsFileStatus .class );
1096+ asyncApply ((ApplyFunction <Map <RemoteLocation , HdfsFileStatus >, Object >) results -> {
1097+ for (RemoteLocation loc : locations ) {
1098+ if (results .get (loc ) != null ) {
1099+ return loc ;
1100+ }
1101+ }
1102+ return null ;
1103+ });
1104+ return asyncReturn (RemoteLocation .class );
1105+ }
1106+
9351107 @ Override // ClientProtocol
9361108 public LastBlockWithStatus append (String src , final String clientName ,
9371109 final EnumSetWritable <CreateFlag > flag ) throws IOException {
@@ -1186,6 +1358,38 @@ public DatanodeInfo[] getDatanodeReport(
11861358 return toArray (datanodes , DatanodeInfo .class );
11871359 }
11881360
1361+ /**
1362+ * Get the datanode report with a timeout.
1363+ * Asynchronous version of the getDatanodeReport method.
1364+ * @param type Type of the datanode.
1365+ * @param requireResponse If we require all the namespaces to report.
1366+ * @param timeOutMs Time out for the reply in milliseconds.
1367+ * @return List of datanodes.
1368+ * @throws IOException If it cannot get the report.
1369+ */
1370+ public DatanodeInfo [] getDatanodeReportAsync (
1371+ DatanodeReportType type , boolean requireResponse , long timeOutMs )
1372+ throws IOException {
1373+ checkOperation (OperationCategory .UNCHECKED );
1374+
1375+ Map <String , DatanodeInfo > datanodesMap = new LinkedHashMap <>();
1376+ RemoteMethod method = new RemoteMethod ("getDatanodeReport" ,
1377+ new Class <?>[] {DatanodeReportType .class }, type );
1378+
1379+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
1380+ getRPCClient ().invokeConcurrent (nss , method , requireResponse , false ,
1381+ timeOutMs , DatanodeInfo [].class );
1382+
1383+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeInfo []>,
1384+ DatanodeInfo []>) results -> {
1385+ updateDnMap (results , datanodesMap );
1386+ // Map -> Array
1387+ Collection <DatanodeInfo > datanodes = datanodesMap .values ();
1388+ return toArray (datanodes , DatanodeInfo .class );
1389+ });
1390+ return asyncReturn (DatanodeInfo [].class );
1391+ }
1392+
11891393 @ Override // ClientProtocol
11901394 public DatanodeStorageReport [] getDatanodeStorageReport (
11911395 DatanodeReportType type ) throws IOException {
@@ -1204,6 +1408,11 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
12041408 return getDatanodeStorageReportMap (type , true , -1 );
12051409 }
12061410
1411+ public Map <String , DatanodeStorageReport []> getDatanodeStorageReportMapAsync (
1412+ DatanodeReportType type ) throws IOException {
1413+ return getDatanodeStorageReportMapAsync (type , true , -1 );
1414+ }
1415+
12071416 /**
12081417 * Get the list of datanodes per subcluster.
12091418 *
@@ -1236,6 +1445,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
12361445 return ret ;
12371446 }
12381447
1448+ /**
1449+ * Get the list of datanodes per subcluster.
1450+ * Asynchronous version of getDatanodeStorageReportMap method.
1451+ * @param type Type of the datanodes to get.
1452+ * @param requireResponse If true an exception will be thrown if all calls do
1453+ * not complete. If false exceptions are ignored and all data results
1454+ * successfully received are returned.
1455+ * @param timeOutMs Time out for the reply in milliseconds.
1456+ * @return nsId to datanode list.
1457+ * @throws IOException If the method cannot be invoked remotely.
1458+ */
1459+ public Map <String , DatanodeStorageReport []> getDatanodeStorageReportMapAsync (
1460+ DatanodeReportType type , boolean requireResponse , long timeOutMs )
1461+ throws IOException {
1462+
1463+ Map <String , DatanodeStorageReport []> ret = new LinkedHashMap <>();
1464+ RemoteMethod method = new RemoteMethod ("getDatanodeStorageReport" ,
1465+ new Class <?>[] {DatanodeReportType .class }, type );
1466+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
1467+ getRPCClient ().invokeConcurrent (
1468+ nss , method , requireResponse , false , timeOutMs , DatanodeStorageReport [].class );
1469+
1470+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeStorageReport []>,
1471+ Map <String , DatanodeStorageReport []>>) results -> {
1472+ for (Entry <FederationNamespaceInfo , DatanodeStorageReport []> entry :
1473+ results .entrySet ()) {
1474+ FederationNamespaceInfo ns = entry .getKey ();
1475+ String nsId = ns .getNameserviceId ();
1476+ DatanodeStorageReport [] result = entry .getValue ();
1477+ ret .put (nsId , result );
1478+ }
1479+ return ret ;
1480+ });
1481+ return asyncReturn (ret .getClass ());
1482+ }
1483+
12391484 @ Override // ClientProtocol
12401485 public boolean setSafeMode (SafeModeAction action , boolean isChecked )
12411486 throws IOException {
@@ -2051,6 +2296,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu
20512296 return toArray (datanodes , DatanodeInfo .class );
20522297 }
20532298
2299+ /**
2300+ * Get the slow running datanodes report with a timeout.
2301+ * Asynchronous version of the getSlowDatanodeReport method.
2302+ *
2303+ * @param requireResponse If we require all the namespaces to report.
2304+ * @param timeOutMs Time out for the reply in milliseconds.
2305+ * @return List of datanodes.
2306+ * @throws IOException If it cannot get the report.
2307+ */
2308+ public DatanodeInfo [] getSlowDatanodeReportAsync (boolean requireResponse , long timeOutMs )
2309+ throws IOException {
2310+ checkOperation (OperationCategory .UNCHECKED );
2311+
2312+ Map <String , DatanodeInfo > datanodesMap = new LinkedHashMap <>();
2313+ RemoteMethod method = new RemoteMethod ("getSlowDatanodeReport" );
2314+
2315+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
2316+ getRPCClient ().invokeConcurrent (nss , method , requireResponse , false ,
2317+ timeOutMs , DatanodeInfo [].class );
2318+
2319+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , DatanodeInfo []>,
2320+ DatanodeInfo []>) results -> {
2321+ updateDnMap (results , datanodesMap );
2322+ // Map -> Array
2323+ Collection <DatanodeInfo > datanodes = datanodesMap .values ();
2324+ return toArray (datanodes , DatanodeInfo .class );
2325+ });
2326+
2327+ return asyncReturn (DatanodeInfo [].class );
2328+ }
2329+
20542330 private void updateDnMap (Map <FederationNamespaceInfo , DatanodeInfo []> results ,
20552331 Map <String , DatanodeInfo > datanodesMap ) {
20562332 for (Entry <FederationNamespaceInfo , DatanodeInfo []> entry :
0 commit comments