Skip to content

HDFS-16369. RBF: Fix the retry logic of RouterRpcServer#invokeAtAvailableNs. #3745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -671,8 +670,8 @@ static String getMethodName() {

/**
* Invokes the method at default namespace, if default namespace is not
* available then at the first available namespace.
* If the namespace is unavailable, retry once with other namespace.
* available then at the other available namespaces.
* If the namespace is unavailable, retry with other namespaces.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
Expand All @@ -681,28 +680,29 @@ static String getMethodName() {
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
try {
if (!nsId.isEmpty()) {
// If no namespace is available, then throw this IOException.
IOException io = new IOException("No namespace available.");
// If default Ns is present return result from that namespace.
if (!nsId.isEmpty()) {
try {
return rpcClient.invokeSingle(nsId, method, clazz);
} catch (IOException ioe) {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
// Remove the already tried namespace.
nss.removeIf(n -> n.getNameserviceId().equals(nsId));
return invokeOnNs(method, clazz, io, nss);
}
// If no namespace is available, throw IOException.
IOException io = new IOException("No namespace available.");
return invokeOnNs(method, clazz, io, nss);
} catch (IOException ioe) {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nss, nsId);
return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
}
return invokeOnNs(method, clazz, io, nss);
}

/**
* Invoke the method on first available namespace,
* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
* @param method the remote method.
* @param clazz Class for the return type.
Expand All @@ -716,26 +716,22 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
if (nss.isEmpty()) {
throw ioe;
}
String nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
}

/**
* Get set of namespace info's removing the already invoked namespaceinfo.
* @param nss List of namespaces in the federation.
* @param nsId Already invoked namespace id.
* @return List of name spaces in the federation on
* removing the already invoked namespaceinfo.
*/
private static Set<FederationNamespaceInfo> getNameSpaceInfo(
final Set<FederationNamespaceInfo> nss, final String nsId) {
Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
for (FederationNamespaceInfo ns : nss) {
if (!nsId.equals(ns.getNameserviceId())) {
namespaceInfos.add(ns);
for (FederationNamespaceInfo fnInfo : nss) {
String nsId = fnInfo.getNameserviceId();
LOG.debug("Invoking {} on namespace {}", method, nsId);
try {
return rpcClient.invokeSingle(nsId, method, clazz);
} catch (IOException e) {
LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e);
// Ignore the exception and try on other namespace, if the tried
// namespace is unavailable, else throw the received exception.
if (!clientProto.isUnavailableSubclusterException(e)) {
throw e;
}
}
}
return namespaceInfos;
// Couldn't get a response from any of the namespace, throw ioe.
throw ioe;
}

@Override // ClientProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,22 @@
* Tests router rpc with multiple destination mount table resolver.
*/
public class TestRouterRPCMultipleDestinationMountTableResolver {
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1");
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2");

private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableResolver resolver;
private static DistributedFileSystem nnFs0;
private static DistributedFileSystem nnFs1;
private static DistributedFileSystem nnFs2;
private static DistributedFileSystem routerFs;
private static RouterRpcServer rpcServer;

@BeforeClass
public static void setUp() throws Exception {

// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2,
cluster = new StateStoreDFSCluster(false, 3,
MultipleDestinationMountTableResolver.class);
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
Expand All @@ -111,6 +112,8 @@ public static void setUp() throws Exception {
.getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
nnFs1 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
nnFs2 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
rpcServer =routerContext.getRouter().getRpcServer();
}
Expand Down Expand Up @@ -668,14 +671,16 @@ public void testInvokeAtAvailableNs() throws IOException {
// Make one subcluster unavailable.
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
dfsCluster.shutdownNameNode(1);
try {
// Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
RemoteMethod method = new RemoteMethod("getServerDefaults");
FsServerDefaults serverDefaults =
rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
assertNotNull(serverDefaults);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there something else we can assert? Any metrics?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of using the NamenodeMetrics, but it doesn't track getServerDefaults. Second is RBFClientMetrics. It tracks all the invokes for getServerDefault, irrespective of success and it isn't per namespace as well. So, in case the invocation order by any chance changes, from ns0->ns1->n2 to ns0->ns2->ns1 in that case the test will become flaky.

In general without the fix, the getServerDefault call will fail only if 2 NS is down, so I thought if the call is successful we can conclude things work. Let me know if you have any ideas around what we can assert

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the flakiness is not ideal.
Let's go with this.

} finally {
dfsCluster.restartNameNode(0);
dfsCluster.restartNameNode(0, false);
dfsCluster.restartNameNode(1);
}
}

Expand Down Expand Up @@ -893,6 +898,9 @@ private static FileSystem getFileSystem(final String nsId) {
if (nsId.equals("ns1")) {
return nnFs1;
}
if (nsId.equals("ns2")) {
return nnFs2;
}
return null;
}

Expand Down