Skip to content

Commit d16cffa

Browse files
committed
add UT
1 parent 1be829a commit d16cffa

File tree

6 files changed

+349
-2
lines changed

6 files changed

+349
-2
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncNamenodeProtocol.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
4040
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
4141

42+
/**
43+
* Module that implements all the asynchronous RPC calls in {@link NamenodeProtocol} in the
44+
* {@link RouterRpcServer}.
45+
*/
4246
public class RouterAsyncNamenodeProtocol extends RouterNamenodeProtocol {
4347

4448
/** RPC server to receive client calls. */

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/async/RouterAsyncUserProtocol.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@
4242
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
4343
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
4444

45+
/**
46+
* Module that implements all the asynchronous RPC calls in
47+
* {@link RefreshUserMappingsProtocol} {@link GetUserMappingsProtocol} in the
48+
* {@link RouterRpcServer}.
49+
*/
4550
public class RouterAsyncUserProtocol extends RouterUserProtocol {
4651
private static final Logger LOG =
4752
LoggerFactory.getLogger(RouterAsyncUserProtocol.class);

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ public RouterRpcServer(Configuration conf, Router router,
462462
/**
463463
* Init router async handlers and router async responders.
464464
*/
465-
protected void initAsyncThreadPool() {
465+
public void initAsyncThreadPool() {
466466
int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
467467
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
468468
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
@@ -605,7 +605,7 @@ BalanceProcedureScheduler getFedRenameScheduler() {
605605
* @return routerStateIdContext
606606
*/
607607
@VisibleForTesting
608-
protected RouterStateIdContext getRouterStateIdContext() {
608+
public RouterStateIdContext getRouterStateIdContext() {
609609
return routerStateIdContext;
610610
}
611611

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.async;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FileSystem;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.permission.FsPermission;
24+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
25+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
26+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
27+
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
28+
import org.apache.hadoop.hdfs.server.federation.router.RouterAsyncRpcClient;
29+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
30+
import org.apache.hadoop.ipc.CallerContext;
31+
import org.junit.After;
32+
import org.junit.AfterClass;
33+
import org.junit.Before;
34+
import org.junit.BeforeClass;
35+
import org.mockito.Mockito;
36+
37+
import java.io.IOException;
38+
import java.util.concurrent.TimeUnit;
39+
40+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
41+
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
42+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
43+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
44+
import static org.junit.Assert.assertTrue;
45+
46+
/**
47+
* Used to test the functionality of async router rps.
48+
*/
49+
public class RouterAsyncProtocolTestBase {
50+
private static Configuration routerConf;
51+
/** Federated HDFS cluster. */
52+
private static MiniRouterDFSCluster cluster;
53+
private static String ns0;
54+
55+
/** Random Router for this federated cluster. */
56+
private MiniRouterDFSCluster.RouterContext router;
57+
private FileSystem routerFs;
58+
private RouterRpcServer routerRpcServer;
59+
private RouterRpcServer routerAsyncRpcServer;
60+
61+
@BeforeClass
62+
public static void setUpCluster() throws Exception {
63+
cluster = new MiniRouterDFSCluster(true, 1, 2,
64+
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
65+
cluster.setNumDatanodesPerNameservice(3);
66+
67+
cluster.startCluster();
68+
69+
// Making one Namenode active per nameservice
70+
if (cluster.isHighAvailability()) {
71+
for (String ns : cluster.getNameservices()) {
72+
cluster.switchToActive(ns, NAMENODES[0]);
73+
cluster.switchToStandby(ns, NAMENODES[1]);
74+
}
75+
}
76+
// Start routers with only an RPC service
77+
routerConf = new RouterConfigBuilder()
78+
.rpc()
79+
.build();
80+
81+
// Reduce the number of RPC clients threads to overload the Router easy
82+
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
83+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
84+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
85+
// We decrease the DN cache times to make the test faster
86+
routerConf.setTimeDuration(
87+
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
88+
cluster.addRouterOverrides(routerConf);
89+
// Start routers with only an RPC service
90+
cluster.startRouters();
91+
92+
// Register and verify all NNs with all routers
93+
cluster.registerNamenodes();
94+
cluster.waitNamenodeRegistration();
95+
cluster.waitActiveNamespaces();
96+
ns0 = cluster.getNameservices().get(0);
97+
}
98+
99+
@AfterClass
100+
public static void shutdownCluster() throws Exception {
101+
if (cluster != null) {
102+
cluster.shutdown();
103+
}
104+
}
105+
106+
@Before
107+
public void setUp() throws IOException {
108+
router = cluster.getRandomRouter();
109+
routerFs = router.getFileSystem();
110+
routerRpcServer = router.getRouterRpcServer();
111+
routerRpcServer.initAsyncThreadPool();
112+
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
113+
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
114+
routerRpcServer.getRPCMonitor(),
115+
routerRpcServer.getRouterStateIdContext());
116+
routerAsyncRpcServer = Mockito.spy(routerRpcServer);
117+
Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
118+
119+
// Create mock locations
120+
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
121+
resolver.addLocation("/", ns0, "/");
122+
FsPermission permission = new FsPermission("705");
123+
routerFs.mkdirs(new Path("/testdir"), permission);
124+
}
125+
126+
@After
127+
public void tearDown() throws IOException {
128+
// clear client context
129+
CallerContext.setCurrent(null);
130+
boolean delete = routerFs.delete(new Path("/testdir"));
131+
assertTrue(delete);
132+
if (routerFs != null) {
133+
routerFs.close();
134+
}
135+
}
136+
137+
public static Configuration getRouterConf() {
138+
return routerConf;
139+
}
140+
141+
public static MiniRouterDFSCluster getCluster() {
142+
return cluster;
143+
}
144+
145+
public static String getNs0() {
146+
return ns0;
147+
}
148+
149+
public MiniRouterDFSCluster.RouterContext getRouter() {
150+
return router;
151+
}
152+
153+
public FileSystem getRouterFs() {
154+
return routerFs;
155+
}
156+
157+
public RouterRpcServer getRouterRpcServer() {
158+
return routerRpcServer;
159+
}
160+
161+
public RouterRpcServer getRouterAsyncRpcServer() {
162+
return routerAsyncRpcServer;
163+
}
164+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.async;
19+
20+
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
21+
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
22+
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
23+
import org.apache.hadoop.hdfs.server.federation.router.RouterNamenodeProtocol;
24+
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
25+
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
26+
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
31+
import static org.junit.Assert.assertEquals;
32+
import static org.junit.Assert.assertNotNull;
33+
34+
/**
35+
* Used to test the functionality of {@link RouterAsyncNamenodeProtocol}.
36+
*/
37+
public class TestRouterAsyncNamenodeProtocol extends RouterAsyncProtocolTestBase {
38+
39+
private RouterAsyncNamenodeProtocol asyncNamenodeProtocol;
40+
private RouterNamenodeProtocol namenodeProtocol;
41+
42+
@Before
43+
public void setup() throws Exception {
44+
asyncNamenodeProtocol = new RouterAsyncNamenodeProtocol(getRouterAsyncRpcServer());
45+
namenodeProtocol = new RouterNamenodeProtocol(getRouterRpcServer());
46+
}
47+
48+
@Test
49+
public void getBlocks() throws Exception {
50+
DatanodeInfo[] dns = getRouter().getClient()
51+
.getNamenode().getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
52+
53+
DatanodeInfo dn0 = dns[0];
54+
asyncNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
55+
null);
56+
BlocksWithLocations asyncRouterBlockLocations = syncReturn(BlocksWithLocations.class);
57+
assertNotNull(asyncRouterBlockLocations);
58+
59+
BlocksWithLocations syncRouterBlockLocations = namenodeProtocol.getBlocks(dn0, 1024,
60+
0, 0, null);
61+
62+
BlockWithLocations[] asyncRouterBlocks = asyncRouterBlockLocations.getBlocks();
63+
BlockWithLocations[] syncRouterBlocks = syncRouterBlockLocations.getBlocks();
64+
65+
assertEquals(asyncRouterBlocks.length, syncRouterBlocks.length);
66+
for (int i = 0; i < syncRouterBlocks.length; i++) {
67+
assertEquals(
68+
asyncRouterBlocks[i].getBlock().getBlockId(),
69+
syncRouterBlocks[i].getBlock().getBlockId());
70+
}
71+
}
72+
73+
@Test
74+
public void getBlockKeys() throws Exception {
75+
asyncNamenodeProtocol.getBlockKeys();
76+
ExportedBlockKeys asyncBlockKeys = syncReturn(ExportedBlockKeys.class);
77+
assertNotNull(asyncBlockKeys);
78+
79+
ExportedBlockKeys syncBlockKeys = namenodeProtocol.getBlockKeys();
80+
compareBlockKeys(asyncBlockKeys, syncBlockKeys);
81+
}
82+
83+
@Test
84+
public void getTransactionID() throws Exception {
85+
asyncNamenodeProtocol.getTransactionID();
86+
long asyncTransactionID = syncReturn(Long.class);
87+
assertNotNull(asyncTransactionID);
88+
89+
long transactionID = namenodeProtocol.getTransactionID();
90+
assertEquals(asyncTransactionID, transactionID);
91+
}
92+
93+
@Test
94+
public void getMostRecentCheckpointTxId() throws Exception {
95+
asyncNamenodeProtocol.getMostRecentCheckpointTxId();
96+
long asyncMostRecentCheckpointTxId = syncReturn(Long.class);
97+
assertNotNull(asyncMostRecentCheckpointTxId);
98+
99+
long mostRecentCheckpointTxId = namenodeProtocol.getMostRecentCheckpointTxId();
100+
assertEquals(asyncMostRecentCheckpointTxId, mostRecentCheckpointTxId);
101+
}
102+
103+
@Test
104+
public void versionRequest() throws Exception {
105+
asyncNamenodeProtocol.versionRequest();
106+
NamespaceInfo asyncNamespaceInfo = syncReturn(NamespaceInfo.class);
107+
assertNotNull(asyncNamespaceInfo);
108+
NamespaceInfo syncNamespaceInfo = namenodeProtocol.versionRequest();
109+
compareVersion(asyncNamespaceInfo, syncNamespaceInfo);
110+
}
111+
112+
private void compareBlockKeys(
113+
ExportedBlockKeys blockKeys, ExportedBlockKeys otherBlockKeys) {
114+
assertEquals(blockKeys.getCurrentKey(), otherBlockKeys.getCurrentKey());
115+
assertEquals(blockKeys.getKeyUpdateInterval(), otherBlockKeys.getKeyUpdateInterval());
116+
assertEquals(blockKeys.getTokenLifetime(), otherBlockKeys.getTokenLifetime());
117+
}
118+
119+
private void compareVersion(NamespaceInfo version, NamespaceInfo otherVersion) {
120+
assertEquals(version.getBlockPoolID(), otherVersion.getBlockPoolID());
121+
assertEquals(version.getNamespaceID(), otherVersion.getNamespaceID());
122+
assertEquals(version.getClusterID(), otherVersion.getClusterID());
123+
assertEquals(version.getLayoutVersion(), otherVersion.getLayoutVersion());
124+
assertEquals(version.getCTime(), otherVersion.getCTime());
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.async;
19+
20+
import org.apache.hadoop.security.UserGroupInformation;
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
24+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
25+
import static org.junit.Assert.assertArrayEquals;
26+
27+
/**
28+
* Used to test the functionality of {@link RouterAsyncUserProtocol}.
29+
*/
30+
public class TestRouterAsyncUserProtocol extends RouterAsyncProtocolTestBase {
31+
32+
private RouterAsyncUserProtocol asyncUserProtocol;
33+
34+
@Before
35+
public void setup() throws Exception {
36+
asyncUserProtocol = new RouterAsyncUserProtocol(getRouterAsyncRpcServer());
37+
}
38+
39+
@Test
40+
public void testgetGroupsForUser() throws Exception {
41+
String[] group = new String[] {"bar", "group2"};
42+
UserGroupInformation.createUserForTesting("user",
43+
new String[] {"bar", "group2"});
44+
asyncUserProtocol.getGroupsForUser("user");
45+
String[] result = syncReturn(String[].class);
46+
assertArrayEquals(group, result);
47+
}
48+
}

0 commit comments

Comments
 (0)