Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. #7108

Open
wants to merge 6 commits into
base: HDFS-17531
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient.isExpectedClass;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;

import java.io.FileNotFoundException;
Expand All @@ -49,6 +56,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -68,6 +76,8 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -791,6 +801,46 @@
return invokeOnNs(method, clazz, io, nss);
}

/**
* Invokes the method at default namespace, if default namespace is not
* available then at the other available namespaces.
* If the namespace is unavailable, retry with other namespaces.
* Asynchronous version of invokeAtAvailableNs method.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
// If no namespace is available, throw IOException.
IOException io = new IOException("No namespace available.");

asyncComplete(null);
if (!nsId.isEmpty()) {
asyncTry(() -> {
rpcClient.invokeSingle(nsId, method, clazz);
});

asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
nss.removeIf(n -> n.getNameserviceId().equals(nsId));
invokeOnNs(method, clazz, io, nss);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hfutatzhanghb should use invokeOnNsAsync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed.

}, IOException.class);
} else {
// If not have default NS.
invokeOnNsAsync(method, clazz, io, nss);
}
return asyncReturn(clazz);
}

/**
* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
Expand Down Expand Up @@ -824,6 +874,60 @@
throw ioe;
}

/**
* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
* Asynchronous version of invokeOnNs method.
* @param method the remote method.
* @param clazz Class for the return type.
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
if (nss.isEmpty()) {
throw ioe;
}

asyncComplete(null);
Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
asyncForEach(nsIterator, (foreach, fnInfo) -> {
String nsId = fnInfo.getNameserviceId();
LOG.debug("Invoking {} on namespace {}", method, nsId);
asyncTry(() -> {
rpcClient.invokeSingle(nsId, method, clazz);
asyncApply(result -> {
if (result != null && isExpectedClass(clazz, result)) {
foreach.breakNow();
return result;
}
return null;
});
});

asyncCatch((AsyncCatchFunction<T, IOException>)(ret, ex) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use CatchFunction.

LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex);
// Ignore the exception and try on other namespace, if the tried
// namespace is unavailable, else throw the received exception.
if (!clientProto.isUnavailableSubclusterException(ex)) {
throw ex;
}
}, IOException.class);
});

asyncApply(obj -> {
if (obj == null) {
// Couldn't get a response from any of the namespace, throw ioe.
throw ioe;
}
return obj;
});

return asyncReturn(clazz);
}

@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
Expand Down Expand Up @@ -875,6 +979,10 @@
*/
RemoteLocation getCreateLocation(final String src) throws IOException {
  final List<RemoteLocation> candidates = getLocationsForPath(src, true);
  // Synchronous routers resolve the creation location directly.
  if (!isAsync()) {
    return getCreateLocation(src, candidates);
  }
  // Asynchronous routers schedule the resolution on the async chain and
  // hand the pending result back through asyncReturn.
  getCreateLocationAsync(src, candidates);
  return asyncReturn(RemoteLocation.class);
}

Expand Down Expand Up @@ -911,6 +1019,43 @@
return createLocation;
}

/**
 * Get the location to create a file. It checks if the file already existed
 * in one of the locations.
 * Asynchronous version of getCreateLocation(String, List): the result is
 * delivered through the async call chain and retrieved via asyncReturn.
 *
 * @param src Path of the file to check.
 * @param locations Prefetched locations for the file.
 * @return The remote location for this file.
 * @throws IOException If the file has no creation location.
 */
RemoteLocation getCreateLocationAsync(
    final String src, final List<RemoteLocation> locations)
    throws IOException {

  if (locations == null || locations.isEmpty()) {
    throw new IOException("Cannot get locations to create " + src);
  }

  // Default to the first location; only probe further if it is ambiguous.
  RemoteLocation createLocation = locations.get(0);
  if (locations.size() > 1) {
    asyncTry(() -> {
      // Asynchronously check whether the file already exists somewhere.
      getExistingLocationAsync(src, locations);
      asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> {
        if (existingLocation != null) {
          LOG.debug("{} already exists in {}.", src, existingLocation);
          // Prefer the location where the file already exists.
          return existingLocation;
        }
        return createLocation;
      });
    });
    // If the existence probe fails with FileNotFoundException, fall back
    // to the default creation location.
    asyncCatch((o, e) -> createLocation, FileNotFoundException.class);
  } else {
    // Single candidate: complete the async chain with it immediately.
    asyncComplete(createLocation);
  }

  return asyncReturn(RemoteLocation.class);
}

/**
* Gets the remote location where the file exists.
* @param src the name of file.
Expand All @@ -932,6 +1077,31 @@
return null;
}

/**
 * Gets the remote location where the file exists.
 * Asynchronous version of getExistingLocation method: issues a concurrent
 * getFileInfo on all locations and resolves (via the async chain) to the
 * first location, in the given list order, that reports the file, or null
 * if none does.
 * @param src the name of file.
 * @param locations all the remote locations.
 * @return the remote location of the file if it exists, else null.
 * @throws IOException in case of any exception.
 */
private RemoteLocation getExistingLocationAsync(String src,
    List<RemoteLocation> locations) throws IOException {
  RemoteMethod method = new RemoteMethod("getFileInfo",
      new Class<?>[] {String.class}, new RemoteParam());
  // Query all candidate locations concurrently; the per-location results
  // are delivered through the async chain.
  rpcClient.invokeConcurrent(
      locations, method, true, false, HdfsFileStatus.class);
  asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
    // Iterate in the caller-provided order so the first matching location
    // wins deterministically.
    for (RemoteLocation loc : locations) {
      if (results.get(loc) != null) {
        return loc;
      }
    }
    return null;
  });
  return asyncReturn(null);
}

@Override // ClientProtocol
public LastBlockWithStatus append(String src, final String clientName,
final EnumSetWritable<CreateFlag> flag) throws IOException {
Expand Down Expand Up @@ -1186,6 +1356,38 @@
return toArray(datanodes, DatanodeInfo.class);
}

/**
* Get the datanode report with a timeout.
* Asynchronous version of the getDatanodeReport method.
* @param type Type of the datanode.
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getDatanodeReportAsync(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeReport",
new Class<?>[] {DatanodeReportType.class}, type);

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

Check failure on line 1380 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java#L1380

blanks: end of line
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {

Check failure on line 1382 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java#L1382

blanks: end of line
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
});
return asyncReturn(DatanodeInfo[].class);
}

@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
Expand Down Expand Up @@ -1236,6 +1438,42 @@
return ret;
}

/**
 * Get the list of datanodes per subcluster.
 * Asynchronous version of getDatanodeStorageReportMap method: the map is
 * delivered through the async call chain and retrieved via asyncReturn.
 * @param type Type of the datanodes to get.
 * @param requireResponse If true an exception will be thrown if all calls do
 *          not complete. If false exceptions are ignored and all data results
 *          successfully received are returned.
 * @param timeOutMs Time out for the reply in milliseconds.
 * @return nsId to datanode list.
 * @throws IOException If the method cannot be invoked remotely.
 */
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
    DatanodeReportType type, boolean requireResponse, long timeOutMs)
    throws IOException {

  Map<String, DatanodeStorageReport[]> reportsPerNs = new LinkedHashMap<>();
  RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
      new Class<?>[] {DatanodeReportType.class}, type);
  Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
  rpcClient.invokeConcurrent(
      nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);

  // Once the concurrent invocation completes, re-key the per-namespace
  // results by nameservice id.
  asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
      Map<String, DatanodeStorageReport[]>>) results -> {
    results.forEach(
        (nsInfo, report) -> reportsPerNs.put(nsInfo.getNameserviceId(), report));
    return reportsPerNs;
  });
  return asyncReturn(reportsPerNs.getClass());
}

@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
Expand Down Expand Up @@ -2051,6 +2289,37 @@
return toArray(datanodes, DatanodeInfo.class);
}

/**
* Get the slow running datanodes report with a timeout.
* Asynchronous version of the getSlowDatanodeReport method.

Check failure on line 2294 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java#L2294

blanks: end of line
*
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

Check failure on line 2311 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java#L2311

blanks: end of line
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
});

return asyncReturn(DatanodeInfo[].class);
}

private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
Map<String, DatanodeInfo> datanodesMap) {
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
Expand Down
Loading