Skip to content

Commit f910b11

Browse files
HDFS-17735. [ARR] LocalResolver#getDatanodesSubcluster adapts to async rpc. (#7422). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <keepromise@apache.org>
1 parent baf393d commit f910b11

File tree

4 files changed: +72 lines, -9 lines

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ public PathLocation getDestinationForPath(String path) throws IOException {
118118
public void addResolver(DestinationOrder order, OrderedResolver resolver) {
119119
orderedResolvers.put(order, resolver);
120120
}
121-
}
121+
122+
@VisibleForTesting
123+
public OrderedResolver getOrderedResolver(DestinationOrder order) {
124+
return orderedResolvers.get(order);
125+
}
126+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hadoop.classification.VisibleForTesting;
4646
import org.apache.hadoop.thirdparty.com.google.common.net.HostAndPort;
4747

48+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
4849

4950
/**
5051
* The local subcluster (where the writer is) should be tried first. The writer
@@ -124,9 +125,9 @@ String getClientAddr() {
124125
* needs to be done as a privileged action to use the user for the Router and
125126
* not the one from the client in the RPC call.
126127
*
127-
* @return DN IP -> Subcluster.
128+
* @return DN IP -&gt; Subcluster.
128129
*/
129-
private Map<String, String> getDatanodesSubcluster() {
130+
public Map<String, String> getDatanodesSubcluster() {
130131

131132
final RouterRpcServer rpcServer = getRpcServer();
132133
if (rpcServer == null) {
@@ -143,9 +144,16 @@ private Map<String, String> getDatanodesSubcluster() {
143144
@Override
144145
public Map<String, DatanodeStorageReport[]> run() {
145146
try {
146-
return rpcServer.getDatanodeStorageReportMap(
147-
DatanodeReportType.ALL);
148-
} catch (IOException e) {
147+
Map<String, DatanodeStorageReport[]> result;
148+
if (rpcServer.isAsync()) {
149+
rpcServer.getDatanodeStorageReportMapAsync(DatanodeReportType.ALL);
150+
result = syncReturn(Map.class);
151+
} else {
152+
result = rpcServer.getDatanodeStorageReportMap(
153+
DatanodeReportType.ALL);
154+
}
155+
return result;
156+
} catch (Exception e) {
149157
LOG.error("Cannot get the datanodes from the RPC server", e);
150158
return null;
151159
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/utils/AsyncUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public static <R> R syncReturn(Class<R> clazz)
127127
try {
128128
return (R) completableFuture.get();
129129
} catch (ExecutionException e) {
130-
throw (Exception)e.getCause();
130+
throw (Exception) e.getCause();
131131
}
132132
}
133133

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRPCMultipleDestinationMountTableResolver.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,22 @@
2525
import org.apache.hadoop.hdfs.MiniDFSCluster;
2626
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
2727
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
28+
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
2829
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
2930
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
3031
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
32+
import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
3133
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
34+
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
3235
import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
3336
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
3437
import org.apache.hadoop.hdfs.server.federation.router.TestRouterRPCMultipleDestinationMountTableResolver;
38+
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
3539
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
3640
import org.junit.BeforeClass;
3741
import org.junit.Test;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
3844

3945
import java.io.IOException;
4046
import java.util.HashMap;
@@ -52,14 +58,21 @@
5258
public class TestRouterAsyncRPCMultipleDestinationMountTableResolver extends
5359
TestRouterRPCMultipleDestinationMountTableResolver {
5460

61+
public static final Logger LOG =
62+
LoggerFactory.getLogger(TestRouterAsyncRPCMultipleDestinationMountTableResolver.class);
63+
5564
@BeforeClass
5665
public static void setUp() throws Exception {
5766

5867
// Build and start a federated cluster.
5968
cluster = new StateStoreDFSCluster(false, 3,
6069
MultipleDestinationMountTableResolver.class);
61-
Configuration routerConf =
62-
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
70+
Configuration routerConf = new RouterConfigBuilder()
71+
.stateStore()
72+
.admin()
73+
.quota()
74+
.rpc()
75+
.build();
6376
routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
6477

6578
Configuration hdfsConf = new Configuration(false);
@@ -84,6 +97,43 @@ public static void setUp() throws Exception {
8497
rpcServer =routerContext.getRouter().getRpcServer();
8598
}
8699

100+
@Test
101+
public void testLocalResolverGetDatanodesSubcluster() throws IOException {
102+
String testPath = "/testLocalResolverGetDatanodesSubcluster";
103+
Path path = new Path(testPath);
104+
Map<String, String> destMap = new HashMap<>();
105+
destMap.put("ns0", testPath);
106+
destMap.put("ns1", testPath);
107+
nnFs0.mkdirs(path);
108+
nnFs1.mkdirs(path);
109+
MountTable addEntry =
110+
MountTable.newInstance(testPath, destMap);
111+
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
112+
addEntry.setDestOrder(DestinationOrder.LOCAL);
113+
assertTrue(addMountTable(addEntry));
114+
115+
Map<String, String> datanodesSubcluster = null;
116+
try {
117+
MultipleDestinationMountTableResolver resolver =
118+
(MultipleDestinationMountTableResolver) routerContext.getRouter().getSubclusterResolver();
119+
LocalResolver localResolver =
120+
(LocalResolver) resolver.getOrderedResolver(DestinationOrder.LOCAL);
121+
datanodesSubcluster = localResolver.getDatanodesSubcluster();
122+
} catch (Exception e) {
123+
LOG.info("Exception occurs when testLocalResolverGetDatanodesSubcluster.", e);
124+
} finally {
125+
RouterClient client = routerContext.getAdminClient();
126+
MountTableManager mountTableManager = client.getMountTableManager();
127+
RemoveMountTableEntryRequest req2 =
128+
RemoveMountTableEntryRequest.newInstance(testPath);
129+
mountTableManager.removeMountTableEntry(req2);
130+
nnFs0.delete(new Path(testPath), true);
131+
nnFs1.delete(new Path(testPath), true);
132+
}
133+
assertNotNull(datanodesSubcluster);
134+
assertFalse(datanodesSubcluster.isEmpty());
135+
}
136+
87137
@Override
88138
@Test
89139
public void testInvokeAtAvailableNs() throws IOException {

0 commit comments

Comments (0)